devine-services/services/VIU/__init__.py
Sp4rk.y dab35ef7d5 ♻️ (services): refactor code for improved readability and maintainability
Refactor services TFC, VIKI, and VIU to enhance code readability and maintainability.
- Update import statements for clarity.
- Adjust function and variable formatting for consistency.
- Simplify conditional logic and loops.
- Improve error handling and logging.
- Add support for additional configurations and language mappings.
- Ensure consistent use of language and region settings across services.
2024-10-10 19:02:24 -06:00

621 lines
23 KiB
Python

import json
import uuid
import re
import urllib.parse
import click
import random
import sys
import datetime
import langcodes
from http.cookiejar import CookieJar
from typing import Any, Optional
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.credential import Credential
from devine.core.tracks import Tracks, Subtitle
from devine.core.utilities import is_close_match
from devine.core.manifests import HLS
class VIU(Service):
"""
Service code for VIU streaming service (https://viu.com).
Authorization: Username-Password, None
Security: HD@L3, NonDRM doesn't seem to care about releases.
VIU has some regions supported:
- 1: ID, MY
- 2: SG, HK, TH, PH
1 & 2 has different api
Author: unnamed improved by @sp4rk.y
last update: 18/09/2024
"""
# GEOFENCE = ("sg",)
# Declared as a static method: this is the click entry point, it takes no
# `self` and builds the service instance itself.
@staticmethod
# The command name must match the service tag (and by extension the service folder).
# `help=__doc__` is evaluated inside the class body, so the class docstring
# doubles as the command's help text.
@click.command(name="VIU", short_help="https://viu.com", help=__doc__)
@click.argument("title", type=str)
@click.option("-l", "--lang", default="kor", help="Specify language for metadata")
@click.option("-nt", "--notitle", is_flag=True, default=False, help="Dont grab episode title.")
@click.pass_context
def cli(ctx, **kwargs):
    # kwargs holds the parsed click parameters declared above (title, lang, notitle).
    return VIU(ctx, **kwargs)
def __init__(self, ctx, title, lang: str, notitle: bool):
    """
    Store the CLI arguments and prepare empty authentication state.

    Parameters:
        ctx: click context, forwarded untouched to the base Service constructor.
        title: URL (or id-bearing string) given on the command line; kept as
            ``self.url`` and reduced to an id by ``parse_input``.
        lang: default metadata language.
        notitle: flag from ``--notitle``.
    """
    # Authentication state starts empty: per-region playback tokens and the
    # cached identity returned by a successful login.
    self._user_token = None
    self._auth_codes = dict()
    self.token = ""

    # Plain CLI options.
    self.lang, self.notitle = lang, notitle

    # Keep the raw input and the id extracted from it (parse_input also
    # records the detected input kind on self.jenis).
    self.url = title
    self.title = self.parse_input(title)

    # Base-class initialisation runs last, once all attributes above exist.
    super().__init__(ctx)
def authenticate(
    self,
    cookies: Optional[CookieJar] = None,
    credential: Optional[Credential] = None,
) -> None:
    """
    Remember the supplied credential and prepare the session headers.

    No request is made here. The credential is only stored on
    ``self.credentials``; ``cookies`` is accepted but not used by this method.
    """
    self.credentials = credential

    # Only a Referer is sent: adding an Origin header makes the API answer 403.
    referer_header = {"Referer": "https://viu.com/"}
    self.session.headers.update(referer_header)

    for message in (
        " + Downloading without an account...",
        f" + Detected using: {self.jenis}",
    ):
        self.log.info(message)
def get_titles(self) -> Titles_T:
    """
    Resolve the requested URL into a ``Movies`` or ``Series`` collection.

    Steps:
      1. Fetch ``self.url`` and read region / language_flag_id / area_id from
         the page, falling back to ``get_region()`` when the link is absent.
      2. For regions "my" and "id", query the gateway endpoint by series_id;
         for every other region, query the per-region ott endpoint by product id.
      3. Return one ``Movie`` when ``current_product.is_movie == 1``, otherwise
         a ``Series`` of ``Episode`` objects sorted by episode number.

    Side effects: sets ``self.region``, ``self.language_flag_id`` and
    ``self.area_id``, may overwrite ``self.lang``, and sets an
    ``X-Forwarded-For`` header on the session.

    Failures are reported through ``self.log.exit`` or ``sys.exit()``.
    """
    res = self.session.get(url=self.url, allow_redirects=False)
    try:
        # The "connect with us" link embedded in the page carries all three identifiers.
        match = re.search(
            r"href=\"\/ott\/(.+)\/index\.php\?r=campaign\/connectwithus\&language_flag_id=(\d+)\&area_id=(\d+)\"",
            res.text,
        )
        if match:
            self.region = match.group(1)
            self.language_flag_id = match.group(2)
            self.area_id = match.group(3)
        else:
            # No link on the page: derive the values from the URL path instead.
            self.region, self.area_id, self.language_flag_id = self.get_region()
        self.log.info(f" + Region: {self.region}")
        self.log.debug(f" + Area_id: {self.area_id}")
        self.log.debug(f" + Language_flag_id: {self.language_flag_id}")
    except Exception:
        self.log.exit(f" - Error, response: {res.text}")
    if self.region in ["my", "id"]:
        try:
            # Define a regex pattern to find "series_id":"51611" or "series_id":51611
            series_id_match = re.search(r'"series_id"\s*:\s*["]?(\d+)["]?', res.text)
            if series_id_match:
                series_id = series_id_match.group(1)
                self.log.debug(f"Extracted series_id: {series_id}")
            else:
                raise ValueError("series_id not found in the response content.")
        except Exception as e:
            self.log.exit(f" - Error extracting series_id: {e}, response: {res.text}")
        # Define the category language mapping
        category_to_language = {
            "Drama Cina": "zh",
            "Siri Melayu": "ms",
            "Drama Korea": "ko",
        }
        # Search for language categories within <h2> tags in the HTML
        category_pattern = r'<h2 class="Typography-root Typography-body1[^"]*"[^>]*id="category"[^>]*>([^<]+)</h2>'
        categories = re.findall(category_pattern, res.text)
        # Iterate through the categories found and set language based on mapping
        # (the first recognised category wins and replaces the --lang value).
        for category in categories:
            if category in category_to_language:
                self.lang = category_to_language[category]
                self.log.info(f" + Language: {self.lang}")
                break
        # NOTE(review): hard-coded client IP, presumably to satisfy a geo check — confirm.
        self.session.headers.update({"X-Forwarded-For": "139.195.232.194"})
        meta_res = self.session.get(
            url=self.config["endpoints"]["gateway"],
            params={
                "platform_flag_label": "web",
                "area_id": self.area_id,
                "language_flag_id": self.language_flag_id,
                "platformFlagLabel": "web",
                "areaId": self.area_id,
                "languageFlagId": self.language_flag_id,
                "countryCode": self.region.upper(),
                "ut": "0",
                "r": "/vod/product-list",
                "os_flag_id": "1",
                "series_id": series_id,
            },
        )
        try:
            data = meta_res.json()["data"]
        except Exception:
            self.log.info(f" - Error, response: {meta_res.text}")
            sys.exit()
        if not data.get("product_list"):
            # Empty list: ask again for the full, ascending list of the series.
            # NOTE(review): the result is stored under data["series"]["product"],
            # but the "my"/"id" episode branch below reads data["product_list"] —
            # verify which key is intended.
            meta_res2 = self.session.get(
                url=self.config["endpoints"]["gateway"],
                params={
                    "platform_flag_label": "web",
                    "area_id": self.area_id,
                    "language_flag_id": self.language_flag_id,
                    "platformFlagLabel": "web",
                    "areaId": self.area_id,
                    "languageFlagId": self.language_flag_id,
                    "countryCode": self.region.upper(),
                    "ut": "0",
                    "r": "/vod/product-list",
                    "os_flag_id": "1",
                    "series_id": data["current_product"]["series_id"],
                    "size": "-1",
                    "sort": "asc",
                },
            )
            try:
                product_list = meta_res2.json()["data"]["product_list"]
                data["series"]["product"] = product_list
            except Exception:
                self.log.info(f" - Error, response: {meta_res2.text}")
                sys.exit()
    else:
        # NOTE(review): hard-coded client IP, presumably to satisfy a geo check — confirm.
        self.session.headers.update({"X-Forwarded-For": "103.62.48.237"})
        meta_res = self.session.get(
            url=self.config["endpoints"]["ott"].format(region=self.region),
            params={
                "area_id": self.area_id,
                "language_flag_id": self.language_flag_id,
                "r": "vod/ajax-detail",
                "platform_flag_label": "web",
                "product_id": self.title,
            },
        )
        try:
            data = meta_res.json()["data"]
        except Exception:
            self.log.info(f" - Error, response: {meta_res.text}")
            sys.exit()
    # A missing current_product / is_movie is treated as a series.
    product_type = "movie" if data.get("current_product", {}).get("is_movie", 0) == 1 else "series"
    self.log.info(f" + Product type: {product_type}")
    # Check if the region is 'my' or 'id' where the API structure is different
    if self.region in ["my", "id"]:
        if product_type == "movie":
            try:
                # The year is taken from the first four-digit run in the series name.
                year = re.search(r"(\d{4})", data["series"]["name"]).group(1)
            except Exception:
                year = None
            return Movies(
                [
                    Movie(
                        id_=self.title,
                        service=self.__class__,
                        year=year,
                        name=data["series"]["name"],
                        language=data["series"].get("series_language", self.lang),
                        data=data,
                    )
                ]
            )
        else:
            titles_ = []
            for x in sorted(data["product_list"], key=lambda x: int(x.get("number", 0))):
                # The series name is whatever precedes " - Episode " in the synopsis;
                # when the synopsis is empty the parsed id is used instead.
                series_name = x.get("synopsis", "").split(" - Episode ")[0] or self.title
                # fromtimestamp() converts using the local timezone of the machine.
                year = (
                    datetime.datetime.fromtimestamp(int(x["schedule_start_time"])).year
                    if x.get("schedule_start_time")
                    else ""
                )
                # With no start time the year is "" and the title ends in a bare ".".
                episode_title_with_year = f"{series_name}.{year}"
                titles_.append(
                    Episode(
                        id_=x["product_id"],
                        title=episode_title_with_year,
                        season=1,
                        number=int(x.get("number", 0)),
                        service=self.__class__,
                        language=self.lang,
                        data=x,
                    )
                )
            return Series(titles_)
    else:
        if product_type == "movie":
            # Handle movie case for other regions
            try:
                year = re.search(r"(\d{4})", data["series"]["name"]).group(1)
            except Exception:
                year = None
            return Movies(
                [
                    Movie(
                        id_=self.title,
                        service=self.__class__,
                        year=year,
                        name=data["series"]["name"],
                        language=data["series"].get("series_language", self.lang),
                        data=data,
                    )
                ]
            )
        else:
            titles_ = []
            # The "ph" region forces the default language to "tl".
            self.lang = "tl" if self.region == "ph" else self.lang
            # Override 'te' with 'tl' in the series language if present
            series_language = data["series"].get("series_language", self.lang)
            if series_language == "te":
                series_language = "tl"
            for x in sorted(data["series"]["product"], key=lambda x: int(x.get("number", 0))):
                # Title is "<series name without parentheses>.<release_of_year>".
                episode_title_with_year = f"{data['series']['name'].replace('(', '').replace(')', '')}.{data['series'].get('release_of_year', '')}"
                titles_.append(
                    Episode(
                        id_=x["product_id"],
                        title=episode_title_with_year,
                        season=1,  # If available, set the correct season number
                        number=int(x.get("number", 0)),
                        service=self.__class__,
                        language=series_language,
                        data=x,
                    )
                )
            return Series(titles_)
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Build the video/audio and subtitle tracks for one title.

    Flow:
      1. Obtain product info: for regions "id"/"my" the data attached to the
         title is used as-is, otherwise the ott ``vod/ajax-detail`` endpoint
         is queried for ``title.id``.
      2. Request playback info with the per-region bearer token. If that
         fails, retry once, either with the identity returned by ``_login``
         or, when no login is possible, with a 180-second preview request
         whose stream URLs are afterwards rewritten to the full duration.
      3. Load every stream URL as an HLS manifest and add its tracks.
      4. Add the subtitles (and optional second "annotation" subtitles) listed
         in the product info; for "id"/"my" the product detail is re-fetched
         first because it is fetched per episode.

    Parameters:
        title: the Movie/Episode produced by ``get_titles``.

    Returns:
        The populated ``Tracks`` collection.

    Raises:
        Re-raises the playback error when the retry fails and no login token
        was available.
    """
    tracks = Tracks()
    data = title.data

    if self.region in ["id", "my"]:
        stream_info = {
            "current_product": data,
            "time_duration": data.get("time_duration", ""),
        }
    else:
        stream_info = self.session.get(
            url=self.config["endpoints"]["ott"].format(region=self.region),
            params={
                "area_id": self.area_id,
                "language_flag_id": self.language_flag_id,
                "r": "vod/ajax-detail",
                "platform_flag_label": "web",
                "product_id": title.id,
            },
        ).json()["data"]

    duration_limit = False
    query = {
        "ccs_product_id": stream_info["current_product"]["ccs_product_id"],
        "language_flag_id": self.language_flag_id or "3",
    }

    def download_playback():
        # Reads `query` at call time, so entries added before a retry are sent too.
        stream_data = self.session.get(
            url=self.config["endpoints"]["playback"],
            params=query,
            headers={"Authorization": f"Bearer {self._auth_codes[self.region]}"},
        ).json()
        return self.check_error(stream_data).get("stream")

    if not self._auth_codes.get(self.region):
        self._auth_codes[self.region] = self._get_token(self.region)
    self.log.debug(f" + Token play: {self._auth_codes[self.region]}")

    stream_data = None
    try:
        stream_data = download_playback()
    except Exception:
        token = self._login(self.region)
        self.log.debug(f" + Token login: {token}")
        if token is not None:
            query["identity"] = token
        else:
            # The content is Preview or for VIP only.
            # We can try to bypass the duration which is limited to 3mins only
            duration_limit, query["duration"] = True, "180"
        try:
            stream_data = download_playback()
        except Exception:
            if token is None:
                raise
            # Never write the password to the log; the account name is enough
            # to tell which credential was rejected. The credential may also
            # be missing altogether, so read it defensively.
            username = getattr(getattr(self, "credentials", None), "username", None)
            self.log.exit(f" - Login required, needs password, detected:\nuser: {username}")

    if not stream_data:
        self.log.exit(" - Cannot get stream info")

    stream_urls = []
    for vid_format, stream_url in (stream_data.get("airplayurl") or {}).items():
        # Only keys shaped like "s<height>p" are renditions; anything else is
        # skipped instead of crashing on a failed match.
        if not re.search(r"s(\d+)p", vid_format):
            self.log.warning(f" - Skipping unrecognised stream format: {vid_format}")
            continue
        # bypass preview duration limit
        if duration_limit:
            old_stream_url = urllib.parse.urlparse(stream_url)
            # Separate name: `query` is still the dict captured by download_playback.
            url_query = dict(urllib.parse.parse_qsl(old_stream_url.query, keep_blank_values=True))
            url_query.update(
                {
                    "duration": stream_info.get("time_duration") or "9999999",
                    "duration_start": "0",
                }
            )
            stream_url = old_stream_url._replace(query=urllib.parse.urlencode(url_query)).geturl()
        stream_urls.append(stream_url)

    for stream_url in stream_urls:
        tracks.add(
            HLS.from_url(url=stream_url, session=self.session).to_tracks(language=title.language),
            warn_only=True,
        )

    if self.region in ["id", "my"]:
        # obtain subs - get per eps again
        meta_res = self.session.get(
            url=self.config["endpoints"]["gateway"],
            params={
                "platform_flag_label": "web",
                "area_id": self.area_id,
                "language_flag_id": self.language_flag_id,
                "platformFlagLabel": "web",
                "areaId": self.area_id,
                "languageFlagId": self.language_flag_id,
                "countryCode": self.region.upper(),
                "ut": "0",
                "r": "/vod/detail",
                "product_id": title.id,
                "os_flag_id": "1",
            },
        )
        try:
            detail = meta_res.json()["data"]
        except Exception as exc:
            # Best effort: keep the product info we already have, but say so.
            self.log.warning(f" - Could not refresh product detail for subtitles: {exc}")
        else:
            # Only switch over when the reply actually carries product info.
            if isinstance(detail, dict) and detail.get("current_product"):
                stream_info = detail

    for x in stream_info["current_product"].get("subtitle") or []:
        tracks.add(
            Subtitle(
                id_="{}_{}".format(x["product_subtitle_id"], x["code"]),
                url=x["url"],
                codec=Subtitle.Codec.SubRip,
                language=x["code"],
                is_original_lang=is_close_match(x["code"], [title.language]),
                forced=False,
                sdh=False,
            ),
            warn_only=True,
        )
        if x.get("second_subtitle_url"):
            tracks.add(
                Subtitle(
                    id_="{}_{}_annotation".format(x["product_subtitle_id"], x["code"]),
                    url=x["second_subtitle_url"],
                    codec=Subtitle.Codec.SubRip,
                    language=x["code"],
                    is_original_lang=is_close_match(x["code"], [title.language]),
                    forced=False,
                    sdh=False,
                ),
                warn_only=True,
            )

    for video in tracks.videos:
        video.needs_repack = True
        # Fall back to the service language when the manifest gave no valid one.
        if not video.language.is_valid():
            video.language = langcodes.Language.get(self.lang)

    return tracks
def get_chapters(self, title):
    """Return the chapters of *title*; none are produced here, so the list is always empty."""
    no_chapters = list()
    return no_chapters
def get_widevine_license(self, challenge: bytes, title: Title_T, **_: Any) -> bytes:
    """
    Send a Widevine challenge to the license endpoint and return the raw reply.

    Parameters:
        challenge: license request bytes, posted as the request body.
        title: title being licensed; its ``id`` is formatted into the URL.

    Returns:
        The response body as bytes.
    """
    # Nothing in this file assigns `token_lic`, and `sessionid` is only set by
    # get_token(). Reading them directly raises AttributeError when they are
    # absent, so fall back to the configured auth value and a fresh session id.
    # NOTE(review): confirm the base class does not provide these attributes.
    authorization = getattr(self, "token_lic", None) or self.config["auth"]
    session_id = getattr(self, "sessionid", None)
    if not session_id:
        session_id = self.sessionid = str(uuid.uuid4())

    return self.session.post(
        url=self.config["endpoints"]["license"].format(id=title.id),
        headers={
            "authorization": authorization,
            "actiontype": "s",
            "drm_level": "l3",
            "hdcp_level": "null",
            "lang_id": "en",
            "languageid": "en",
            "os_ver": "10",
            "x-client": "browser",
            "x-request-id": str(uuid.uuid4()),
            "x-session-id": session_id,
        },
        data=challenge,  # expects bytes
    ).content
# Upper-case region code -> numeric area_id. get_region() looks regions up
# here when it has no dedicated branch for them.
_AREA_ID = dict(HK=1, SG=2, TH=4, PH=5, MY=1001)

# Numeric id -> locale tag. Not referenced elsewhere in the visible code;
# presumably keyed by language_flag_id — TODO confirm.
_LANGUAGE_FLAG = dict(enumerate(("zh-hk", "zh-cn", "en-us"), start=1))
def parse_input(self, input_):
    """
    Extract the numeric id from a URL and record what kind of id it is.

    ``self.jenis`` is set to:
      * "product_id" when the input contains ``vod/<digits>/``;
      * "playlist_id" when it contains ``playlist-<digits>`` or ``containerId=<digits>``;
      * "playlist_id_eps" otherwise, in which case the text after the last
        "-" (or the whole input when there is none) is returned.

    Returns:
        The extracted id as a string.
    """
    product_hit = re.search(r"vod\/(\d+)\/", input_)
    if product_hit is not None:
        self.jenis = "product_id"
        return product_hit.group(1)

    # The "playlist-<id>" form takes precedence over "containerId=<id>".
    # re_playlist2 = r".+video.+-(\d+)"
    playlist_hit = re.search(r".+playlist-(\d+)", input_) or re.search(r"containerId=(\d+)", input_)
    if playlist_hit is not None:
        self.jenis = "playlist_id"
        return playlist_hit.group(1)

    self.jenis = "playlist_id_eps"
    return input_.rsplit("-", 1)[-1]
def get_language_code(self, lang):
    """
    Translate a language code through a fixed lookup table.

    Returns:
        The mapped code, or ``None`` when *lang* is not in the table.
    """
    known_codes = {
        "en": "en",
        "zh": "zh-Hans",
        "zh-CN": "zh-Hans",
        "zh-Hant": "zh-Hant",
        "ms": "ms",
        "th": "th",
        "id": "id",
        "my": "my",
        "mya": "mya",
    }
    # Every table value is a non-empty string, so a falsy lookup means "unknown".
    return known_codes.get(lang) or None
def get_region(self):
    """
    Derive region, area_id and language_flag_id from ``self.url``.

    The URL is expected to contain ``/ott/<region>/<language>/``. Region "sg"
    and "id" have fixed area ids; every other region is looked up in
    ``_AREA_ID`` and falls back to Hong Kong's numeric id when unknown.

    Returns:
        ``(region, area_id, language_flag_id)``. All three are empty strings
        when the URL does not match the expected shape.
    """
    region = ""
    area_id = ""
    language_flag_id = ""

    region_search = re.search(r"\/ott\/(.+?)\/(.+?)\/", self.url)
    if not region_search:
        return region, area_id, language_flag_id

    region, language = region_search.group(1), region_search.group(2)

    if region == "sg":
        area_id = 2
        language_flag_id = "2" if "zh" in language else "3"
    elif region == "id":
        area_id = 1000
        language_flag_id = "8"
    else:
        # Unknown regions previously received the literal string "hk" as their
        # area_id; use Hong Kong's numeric id so area_id is always a number.
        area_id = self._AREA_ID.get(str(region).upper()) or self._AREA_ID["HK"]
        if "zh" in language:
            language_flag_id = "1"
        elif "th" in language:
            language_flag_id = "4"
        else:
            language_flag_id = "3"

    return region, area_id, language_flag_id
def check_error(self, response):
    """
    Validate an API reply and return its payload.

    Parameters:
        response: decoded JSON reply containing ``status.code`` and,
            on failure, ``status.message``.

    Returns:
        ``response["data"]``, or an empty dict when it is missing or empty.

    Raises:
        Exception: when ``status.code`` is greater than zero. The exception
            carries the same text that is logged, so callers can see the API
            code and message.
    """
    code = response["status"]["code"]
    if code > 0:
        message = response["status"]["message"]
        # Build the text once: the logger call returns None, so it must not be
        # used as the exception argument.
        error_text = f" - Got an error, code: {code} - message {message} - Trying to bypass it..."
        self.log.warning(error_text)
        raise Exception(error_text)
    return response.get("data") or {}
def get_token(self):
    """
    Request a token from the ``token`` endpoint for a freshly generated device.

    Side effects: stores new random UUIDs on ``self.sessionid`` and
    ``self.deviceid``.

    Returns:
        The ``token`` field of the JSON reply. When the response is not OK the
        reply text is passed to ``self.log.exit`` and ``None`` is returned.
    """
    self.sessionid = str(uuid.uuid4())
    self.deviceid = str(uuid.uuid4())

    endpoint = self.config["endpoints"]["token"]
    query_params = {
        "ver": "1.0",
        "fmt": "json",
        "aver": "5.0",
        "appver": "2.0",
        "appid": "viu_desktop",
        "platform": "desktop",
        "iid": str(uuid.uuid4()),
    }
    request_headers = {
        "accept": "application/json; charset=utf-8",
        "content-type": "application/json; charset=UTF-8",
        "x-session-id": self.sessionid,
        "Sec-Fetch-Mode": "cors",
        "x-client": "browser",
    }
    res = self.session.post(
        url=endpoint,
        params=query_params,
        headers=request_headers,
        json={"deviceId": self.deviceid},
    )

    if not res.ok:
        self.log.exit(f" - Cannot get token, response: {res.text}")
        return None
    return res.json()["token"]
def _get_token(self, country_code):
    """
    Request an anonymous playback token for a country from the ``token2`` endpoint.

    Side effects: stores a new random UUID on ``self.uuid``.

    Parameters:
        country_code: region code; sent upper-cased as ``countryCode``.

    Returns:
        The ``token`` field of the JSON reply.
    """
    # Ten random decimal digits, sent (with a fixed suffix) as the "v" query value.
    digits = []
    for _ in range(10):
        digits.append(random.choice("0123456789"))
    rand = "".join(digits)

    self.uuid = str(uuid.uuid4())

    endpoint = self.config["endpoints"]["token2"]
    body = {
        "countryCode": country_code.upper(),
        "platform": "browser",
        "platformFlagLabel": "web",
        "language": "en",
        "uuid": self.uuid,
        "carrierId": "0",
    }
    reply = self.session.post(
        url=endpoint,
        params={"v": f"{rand}000&"},
        headers={"Content-Type": "application/json"},
        data=json.dumps(body).encode("utf-8"),
    )
    return reply.json()["token"]
def _login(self, country_code):
    """
    Log in with the stored credential and return the user's identity token.

    The identity is cached on ``self._user_token``, so the login requests are
    sent at most once. On success the bearer token kept for *country_code* in
    ``self._auth_codes`` is replaced with the one issued for the user.

    Parameters:
        country_code: region key into ``self._auth_codes``.

    Returns:
        The identity string, or ``None`` when no usable username/password is
        available.

    Raises:
        Exception: propagated from ``check_error`` when the login reply
            reports an error.
    """
    if not self._user_token:
        # Read the credential given to authenticate(); it may be missing
        # entirely when the user runs without an account.
        credentials = getattr(self, "credentials", None)
        user = getattr(credentials, "username", None)
        pwd = getattr(credentials, "password", None)
        # "empty" is treated as a placeholder meaning "no value".
        if not user or user == "empty":
            return None
        if not pwd or pwd == "empty":
            return None

        self.log.debug(f" + auth: {self._auth_codes[country_code]}")
        headers = {
            "Authorization": f"Bearer {self._auth_codes[country_code]}",
            "Content-Type": "application/json",
        }

        data = self.session.post(
            url=self.config["endpoints"]["validate"],
            headers=headers,
            data=json.dumps({"principal": user, "provider": "email"}).encode(),
        ).json()
        if not data.get("exists"):
            self.log.exit(" - Invalid email address")

        data = self.session.post(
            url=self.config["endpoints"]["login"],
            headers=headers,
            data=json.dumps(
                {
                    "email": user,
                    "password": pwd,
                    "provider": "email",
                }
            ).encode(),
        ).json()
        self.check_error(data)
        self._user_token = data.get("identity")

        # need to update with valid user's token else will throw an error again
        user_bearer = data.get("token")
        if user_bearer:
            self._auth_codes[country_code] = user_bearer

    return self._user_token