import datetime
import json
import random
import re
import sys
import urllib.parse
import uuid
from http.cookiejar import CookieJar
from typing import Any, Optional

import click
import langcodes
from devine.core.credential import Credential
from devine.core.manifests import HLS
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Subtitle, Tracks
from devine.core.utilities import is_close_match


class VIU(Service):
    """
    Service code for VIU streaming service (https://viu.com).

    Authorization: Username-Password, None
    Security: HD@L3, NonDRM doesn't seem to care about releases.

    VIU has some regions supported:
    - 1: ID, MY
    - 2: SG, HK, TH, PH

    1 & 2 has different api

    Author: unnamed improved by @sp4rk.y
    last update: 18/09/2024
    """

    # NOTE: the class docstring above doubles as the click help text
    # (``help=__doc__``), so maintainer notes live in comments instead.

    # GEOFENCE = ("sg",)

    # Regions that are served through the "gateway" endpoint; every other
    # region goes through the per-region "ott" endpoint.
    _GATEWAY_REGIONS = ("my", "id")

    _AREA_ID = {
        "HK": 1,
        "SG": 2,
        "TH": 4,
        "PH": 5,
        "MY": 1001,
    }

    _LANGUAGE_FLAG = {
        1: "zh-hk",
        2: "zh-cn",
        3: "en-us",
    }

    # Static method, this method belongs to the class
    @staticmethod
    # The command name, must match the service tag (and by extension the service folder)
    @click.command(name="VIU", short_help="https://viu.com", help=__doc__)
    @click.argument("title", type=str)
    @click.option("-l", "--lang", default="kor", help="Specify language for metadata")
    @click.option("-nt", "--notitle", is_flag=True, default=False, help="Dont grab episode title.")
    @click.pass_context
    def cli(ctx, **kwargs):
        return VIU(ctx, **kwargs)

    def __init__(self, ctx, title, lang: str, notitle: bool):
        """Store the CLI arguments and derive the numeric id from the title/URL."""
        self.url = title
        # parse_input() also records which kind of id was detected in self.jenis.
        self.title = self.parse_input(title)
        self.notitle = notitle
        self.lang = lang
        self.token = ""
        self._auth_codes = {}
        self._user_token = None
        # Initialised here so later methods never hit an AttributeError when
        # authenticate()/get_token() have not been called yet.
        self.credentials: Optional[Credential] = None
        self.token_lic = None
        self.sessionid = None

        # Overriding the constructor
        super().__init__(ctx)

    def authenticate(
        self,
        cookies: Optional[CookieJar] = None,
        credential: Optional[Credential] = None,
    ) -> None:
        """Remember the credential for a later login attempt and set the Referer header.

        No request is made here; the actual login happens lazily in
        ``_login`` when playback is refused.
        """
        self.credentials = credential
        self.session.headers.update(
            {"Referer": "https://viu.com/"}  # headers Origin make 403 error
        )
        if credential is None:
            self.log.info(" + Downloading without an account...")
        else:
            self.log.info(" + Credential supplied, login is attempted only if playback requires it")
        self.log.info(f" + Detected using: {self.jenis}")

    # ------------------------------------------------------------------ #
    # Request helpers
    # ------------------------------------------------------------------ #

    def _gateway_params(self, route: str, **extra: Any) -> dict:
        """Build the query parameters shared by every gateway request."""
        params = {
            "platform_flag_label": "web",
            "area_id": self.area_id,
            "language_flag_id": self.language_flag_id,
            "platformFlagLabel": "web",
            "areaId": self.area_id,
            "languageFlagId": self.language_flag_id,
            "countryCode": self.region.upper(),
            "ut": "0",
            "r": route,
            "os_flag_id": "1",
        }
        params.update(extra)
        return params

    def _ott_detail(self, product_id):
        """Request ``vod/ajax-detail`` for *product_id* from the regional ott endpoint."""
        return self.session.get(
            url=self.config["endpoints"]["ott"].format(region=self.region),
            params={
                "area_id": self.area_id,
                "language_flag_id": self.language_flag_id,
                "r": "vod/ajax-detail",
                "platform_flag_label": "web",
                "product_id": product_id,
            },
        )

    def _data_or_exit(self, response) -> dict:
        """Return ``response.json()["data"]`` or log the raw body and exit with status 1."""
        try:
            return response.json()["data"]
        except Exception:
            self.log.error(f" - Error, response: {response.text}")
            sys.exit(1)

    def _movie_titles(self, data: dict) -> Movies:
        """Wrap a movie payload in a ``Movies`` container (same shape for all regions)."""
        name = data["series"]["name"]
        # The year is taken from the first 4-digit run in the name, if any.
        year_match = re.search(r"(\d{4})", name) if isinstance(name, str) else None
        return Movies(
            [
                Movie(
                    id_=self.title,
                    service=self.__class__,
                    year=year_match.group(1) if year_match else None,
                    name=name,
                    language=data["series"].get("series_language", self.lang),
                    data=data,
                )
            ]
        )

    # ------------------------------------------------------------------ #
    # Titles
    # ------------------------------------------------------------------ #

    def get_titles(self) -> Titles_T:
        """Resolve region settings from the title page and return its movie/episodes.

        Sets ``self.region``, ``self.area_id`` and ``self.language_flag_id`` as a
        side effect; ``get_tracks`` relies on them.
        """
        res = self.session.get(url=self.url, allow_redirects=False)
        try:
            match = re.search(
                r"href=\"\/ott\/(.+)\/index\.php\?r=campaign\/connectwithus\&language_flag_id=(\d+)\&area_id=(\d+)\"",
                res.text,
            )
            if match:
                self.region = match.group(1)
                self.language_flag_id = match.group(2)
                self.area_id = match.group(3)
            else:
                # Page did not expose the values: fall back to parsing the URL.
                self.region, self.area_id, self.language_flag_id = self.get_region()
            self.log.info(f" + Region: {self.region}")
            self.log.debug(f" + Area_id: {self.area_id}")
            self.log.debug(f" + Language_flag_id: {self.language_flag_id}")
        except Exception:
            self.log.exit(f" - Error, response: {res.text}")

        if self.region in self._GATEWAY_REGIONS:
            data = self._get_gateway_metadata(res.text)
        else:
            self.session.headers.update({"X-Forwarded-For": "103.62.48.237"})
            data = self._data_or_exit(self._ott_detail(self.title))

        product_type = "movie" if data.get("current_product", {}).get("is_movie", 0) == 1 else "series"
        self.log.info(f" + Product type: {product_type}")

        if product_type == "movie":
            return self._movie_titles(data)

        # Check if the region is 'my' or 'id' where the API structure is different
        if self.region in self._GATEWAY_REGIONS:
            titles_ = []
            for x in sorted(data["product_list"], key=lambda x: int(x.get("number", 0))):
                series_name = x.get("synopsis", "").split(" - Episode ")[0] or self.title
                year = (
                    datetime.datetime.fromtimestamp(int(x["schedule_start_time"])).year
                    if x.get("schedule_start_time")
                    else ""
                )
                episode_title_with_year = f"{series_name}.{year}"
                titles_.append(
                    Episode(
                        id_=x["product_id"],
                        title=episode_title_with_year,
                        season=1,
                        number=int(x.get("number", 0)),
                        service=self.__class__,
                        language=self.lang,
                        data=x,
                    )
                )
            return Series(titles_)

        titles_ = []
        self.lang = "tl" if self.region == "ph" else self.lang
        # Override 'te' with 'tl' in the series language if present
        series_language = data["series"].get("series_language", self.lang)
        if series_language == "te":
            series_language = "tl"
        # Loop-invariant: every episode shares the same "<name>.<year>" title.
        episode_title_with_year = (
            f"{data['series']['name'].replace('(', '').replace(')', '')}"
            f".{data['series'].get('release_of_year', '')}"
        )
        for x in sorted(data["series"]["product"], key=lambda x: int(x.get("number", 0))):
            titles_.append(
                Episode(
                    id_=x["product_id"],
                    title=episode_title_with_year,
                    season=1,  # If available, set the correct season number
                    number=int(x.get("number", 0)),
                    service=self.__class__,
                    language=series_language,
                    data=x,
                )
            )
        return Series(titles_)

    def _get_gateway_metadata(self, page_html: str) -> dict:
        """Fetch the product list for the 'my'/'id' regions via the gateway endpoint."""
        # Matches "series_id":"51611" as well as "series_id":51611
        series_id_match = re.search(r'"series_id"\s*:\s*["]?(\d+)["]?', page_html)
        if not series_id_match:
            self.log.exit(
                " - Error extracting series_id: series_id not found in the response content., "
                f"response: {page_html}"
            )
        series_id = series_id_match.group(1)
        self.log.debug(f"Extracted series_id: {series_id}")

        # Define the category language mapping
        category_to_language = {
            "Drama Cina": "zh",
            "Siri Melayu": "ms",
            "Drama Korea": "ko",
        }

        # Search for language categories within the element carrying id="category".
        # NOTE(review): the element name was lost in the source as received, so the
        # pattern accepts any tag name - confirm against the live page markup.
        category_pattern = r'<\w+[^>]*id="category"[^>]*>([^<]+)</\w+>'
        for category in re.findall(category_pattern, page_html):
            language = category_to_language.get(category.strip())
            if language:
                self.lang = language
                self.log.info(f" + Language: {self.lang}")
                break

        self.session.headers.update({"X-Forwarded-For": "139.195.232.194"})
        meta_res = self.session.get(
            url=self.config["endpoints"]["gateway"],
            params=self._gateway_params("/vod/product-list", series_id=series_id),
        )
        data = self._data_or_exit(meta_res)

        if not data.get("product_list"):
            # Retry with the series id reported by the API and no page limit.
            meta_res2 = self.session.get(
                url=self.config["endpoints"]["gateway"],
                params=self._gateway_params(
                    "/vod/product-list",
                    series_id=data["current_product"]["series_id"],
                    size="-1",
                    sort="asc",
                ),
            )
            try:
                product_list = meta_res2.json()["data"]["product_list"]
            except Exception:
                self.log.error(f" - Error, response: {meta_res2.text}")
                sys.exit(1)
            # get_titles() iterates data["product_list"] for these regions, so the
            # retried list must be stored there (previously only under "series").
            data["product_list"] = product_list
            if isinstance(data.get("series"), dict):
                data["series"]["product"] = product_list

        return data

    # ------------------------------------------------------------------ #
    # Tracks
    # ------------------------------------------------------------------ #

    def get_tracks(self, title: Title_T) -> Tracks:
        """Return the HLS video/audio tracks and subtitles for *title*.

        Playback is first requested anonymously. If that fails a login is
        attempted; without a usable login the request is retried with a short
        ``duration`` and the stream URLs are rewritten to lift the preview limit.
        """
        tracks = Tracks()
        data = title.data

        if self.region in self._GATEWAY_REGIONS:
            stream_info = {
                "current_product": data,
                "time_duration": data.get("time_duration", ""),
            }
        else:
            stream_info = self._ott_detail(title.id).json()["data"]

        duration_limit = False
        query = {
            "ccs_product_id": stream_info["current_product"]["ccs_product_id"],
            "language_flag_id": self.language_flag_id or "3",
        }

        def download_playback():
            # Reads query and the auth code at call time, so a retry picks up
            # whatever the login step changed.
            stream_data = self.session.get(
                url=self.config["endpoints"]["playback"],
                params=query,
                headers={"Authorization": f"Bearer {self._auth_codes[self.region]}"},
            ).json()
            return self.check_error(stream_data).get("stream")

        if not self._auth_codes.get(self.region):
            self._auth_codes[self.region] = self._get_token(self.region)
            self.log.debug(f" + Token play: {self._auth_codes[self.region]}")

        stream_data = None
        try:
            stream_data = download_playback()
        except Exception:
            token = self._login(self.region)
            self.log.debug(f" + Token login: {token}")
            if token is not None:
                query["identity"] = token
            else:
                # The content is Preview or for VIP only.
                # We can try to bypass the duration which is limited to 3mins only
                duration_limit, query["duration"] = True, "180"
            try:
                stream_data = download_playback()
            except Exception:
                if token is None:
                    raise
                username = getattr(self.credentials, "username", None)
                password = getattr(self.credentials, "password", None)
                # Never write the plaintext password to the log.
                self.log.exit(
                    " - Login required, needs password, detected:"
                    f"\nuser: {username}\npwd: {'<hidden>' if password else '<empty>'}"
                )

        if not stream_data:
            self.log.exit(" - Cannot get stream info")

        formats = []
        for vid_format, stream_url in (stream_data.get("airplayurl") or {}).items():
            height_match = re.search(r"s(\d+)p", vid_format)
            height = int(height_match.group(1)) if height_match else None

            # bypass preview duration limit
            if duration_limit:
                old_stream_url = urllib.parse.urlparse(stream_url)
                url_query = dict(urllib.parse.parse_qsl(old_stream_url.query, keep_blank_values=True))
                url_query.update(
                    {
                        "duration": stream_info.get("time_duration") or "9999999",
                        "duration_start": "0",
                    }
                )
                stream_url = old_stream_url._replace(query=urllib.parse.urlencode(url_query)).geturl()

            formats.append({"format_id": vid_format, "url": stream_url, "height": height})

        for x in formats:
            tracks.add(
                HLS.from_url(url=x["url"], session=self.session).to_tracks(language=title.language),
                warn_only=True,
            )

        if self.region in self._GATEWAY_REGIONS:
            # obtain subs - get per eps again
            meta_res = self.session.get(
                url=self.config["endpoints"]["gateway"],
                params=self._gateway_params("/vod/detail", product_id=title.id),
            )
            try:
                detail = meta_res.json()["data"]
            except Exception as error:
                # Best effort: keep the data we already have, but leave a trace.
                self.log.debug(f" - Could not refresh product detail for subtitles: {error}")
            else:
                if isinstance(detail, dict) and "current_product" in detail:
                    stream_info = detail

        for x in stream_info["current_product"].get("subtitle") or []:
            subtitle_sources = [("{}_{}", x["url"])]
            if x.get("second_subtitle_url"):
                subtitle_sources.append(("{}_{}_annotation", x["second_subtitle_url"]))
            for id_template, subtitle_url in subtitle_sources:
                tracks.add(
                    Subtitle(
                        id_=id_template.format(x["product_subtitle_id"], x["code"]),
                        url=subtitle_url,
                        codec=Subtitle.Codec.SubRip,
                        language=x["code"],
                        is_original_lang=is_close_match(x["code"], [title.language]),
                        forced=False,
                        sdh=False,
                    ),
                    warn_only=True,
                )

        for video in tracks.videos:
            video.needs_repack = True
            if not video.language.is_valid():
                video.language = langcodes.Language.get(self.lang)

        return tracks

    def get_chapters(self, title):
        """This service provides no chapter information."""
        return []

    def get_widevine_license(self, challenge: bytes, title: Title_T, **_: Any) -> bytes:
        """POST the Widevine *challenge* to the license endpoint and return the raw reply."""
        if not self.sessionid:
            # get_token() normally sets this; create one when it was never called.
            self.sessionid = str(uuid.uuid4())
        return self.session.post(
            url=self.config["endpoints"]["license"].format(id=title.id),
            headers={
                "authorization": self.token_lic or self.config["auth"],
                "actiontype": "s",
                "drm_level": "l3",
                "hdcp_level": "null",
                "lang_id": "en",
                "languageid": "en",
                "os_ver": "10",
                "x-client": "browser",
                "x-request-id": str(uuid.uuid4()),
                "x-session-id": self.sessionid,
            },
            data=challenge,  # expects bytes
        ).content

    # ------------------------------------------------------------------ #
    # Input / region parsing
    # ------------------------------------------------------------------ #

    def parse_input(self, input_):
        """Extract the numeric id from a VIU URL and record its kind in ``self.jenis``.

        Recognised forms, in priority order: ``vod/<id>/`` (product id),
        ``playlist-<id>`` or ``containerId=<id>`` (playlist id). Anything else
        is treated as ``<slug>-<id>`` and the text after the last ``-`` is used.
        """
        re_product = r"vod\/(\d+)\/"
        re_playlist = r".+playlist-(\d+)"
        # re_playlist2 = r".+video.+-(\d+)"
        re_playlist2 = r"containerId=(\d+)"

        product_id = re.search(re_product, input_)
        if product_id:
            self.jenis = "product_id"
            return product_id.group(1)

        playlist_match = re.search(re_playlist, input_) or re.search(re_playlist2, input_)
        if playlist_match:
            self.jenis = "playlist_id"
            return playlist_match.group(1)

        self.jenis = "playlist_id_eps"
        return input_.split("-")[-1]

    def get_language_code(self, lang):
        """Map a language tag to the code used by this service, or ``None`` if unknown."""
        language_code = {
            "en": "en",
            "zh": "zh-Hans",
            "zh-CN": "zh-Hans",
            "zh-Hant": "zh-Hant",
            "ms": "ms",
            "th": "th",
            "id": "id",
            "my": "my",
            "mya": "mya",
        }
        return language_code.get(lang)

    def get_region(self):
        """Derive ``(region, area_id, language_flag_id)`` from ``/ott/<region>/<lang>/`` in the URL.

        Returns three empty strings when the URL does not contain that path.
        """
        region = ""
        area_id = ""
        language_flag_id = ""

        region_search = re.search(r"\/ott\/(.+?)\/(.+?)\/", self.url)
        if not region_search:
            return region, area_id, language_flag_id

        region = region_search.group(1)
        language = region_search.group(2)
        if region == "sg":
            area_id = 2
            language_flag_id = "2" if "zh" in language else "3"
        elif region == "id":
            area_id = 1000
            language_flag_id = "8"
        else:
            # Unknown regions fall back to the HK area id (previously the string
            # "hk", which is not a valid value for a numeric area id).
            area_id = self._AREA_ID.get(str(region).upper()) or self._AREA_ID["HK"]
            if "zh" in language:
                language_flag_id = "1"
            elif "th" in language:
                language_flag_id = "4"
            else:
                language_flag_id = "3"
        return region, area_id, language_flag_id

    # ------------------------------------------------------------------ #
    # API helpers
    # ------------------------------------------------------------------ #

    def check_error(self, response):
        """Return ``response["data"]`` (or ``{}``), raising when the status code is > 0.

        Raises:
            Exception: carrying the code and message reported by the API.
            KeyError: when the response has no ``status``/``code`` entry.
        """
        code = response["status"]["code"]
        if code > 0:
            message = response["status"].get("message")
            error_text = f" - Got an error, code: {code} - message {message} - Trying to bypass it..."
            # Log first, then raise with the same text; the logger call returns
            # None, so it must not be used as the exception argument.
            self.log.warning(error_text)
            raise Exception(error_text)
        return response.get("data") or {}

    def get_token(self):
        """Request a device token, creating fresh session and device ids."""
        self.sessionid = str(uuid.uuid4())
        self.deviceid = str(uuid.uuid4())
        res = self.session.post(
            url=self.config["endpoints"]["token"],
            params={
                "ver": "1.0",
                "fmt": "json",
                "aver": "5.0",
                "appver": "2.0",
                "appid": "viu_desktop",
                "platform": "desktop",
                "iid": str(uuid.uuid4()),
            },
            headers={
                "accept": "application/json; charset=utf-8",
                "content-type": "application/json; charset=UTF-8",
                "x-session-id": self.sessionid,
                "Sec-Fetch-Mode": "cors",
                "x-client": "browser",
            },
            json={"deviceId": self.deviceid},
        )
        if res.ok:
            return res.json()["token"]
        self.log.exit(f" - Cannot get token, response: {res.text}")

    def _get_token(self, country_code):
        """Request an anonymous playback token for *country_code*."""
        rand = "".join(random.choice("0123456789") for _ in range(10))
        self.uuid = str(uuid.uuid4())
        return self.session.post(
            url=self.config["endpoints"]["token2"],
            params={"v": f"{rand}000&"},
            headers={"Content-Type": "application/json"},
            data=json.dumps(
                {
                    "countryCode": country_code.upper(),
                    "platform": "browser",
                    "platformFlagLabel": "web",
                    "language": "en",
                    "uuid": self.uuid,
                    "carrierId": "0",
                }
            ).encode("utf-8"),
        ).json()["token"]

    def _login(self, country_code):
        """Log in with the stored credential and return the user identity token.

        Returns ``None`` when no usable username/password is available. The
        identity is cached in ``self._user_token`` so the login runs only once.
        """
        if self._user_token:
            return self._user_token

        # Read the credential instance stored by authenticate(), not the class.
        user = getattr(self.credentials, "username", None)
        pwd = getattr(self.credentials, "password", None)
        if user == "empty" or not user:
            return None
        if pwd == "empty" or not pwd:
            return None

        self.log.debug(f" + auth: {self._auth_codes[country_code]}")
        headers = {
            "Authorization": f"Bearer {self._auth_codes[country_code]}",
            "Content-Type": "application/json",
        }
        data = self.session.post(
            url=self.config["endpoints"]["validate"],
            headers=headers,
            data=json.dumps({"principal": user, "provider": "email"}).encode(),
        ).json()
        if not data.get("exists"):
            self.log.exit(" - Invalid email address")

        data = self.session.post(
            url=self.config["endpoints"]["login"],
            headers=headers,
            data=json.dumps(
                {
                    "email": user,
                    "password": pwd,
                    "provider": "email",
                }
            ).encode(),
        ).json()
        self.check_error(data)
        self._user_token = data.get("identity")
        # need to update with valid user's token else will throw an error again
        if data.get("token"):
            self._auth_codes[country_code] = data["token"]
        return self._user_token