diff --git a/services/DROP/__init__.py b/services/DROP/__init__.py new file mode 100644 index 0000000..30592ac --- /dev/null +++ b/services/DROP/__init__.py @@ -0,0 +1,232 @@ +import re +import os +import json +import click +from typing import Optional, Union +from http.cookiejar import CookieJar +from bs4 import BeautifulSoup + +from devine.core.config import config +from devine.core.service import Service +from devine.core.titles import Episode, Series +from devine.core.tracks import Tracks +from devine.core.credential import Credential +from devine.core.manifests import HLS +from devine.core.tracks.attachment import Attachment + + +class DROP(Service): + """ + Service code for DROPOUT.tv + Author: @sp4rk.y + + Authorization: Cookies or Credentials + Security: None + """ + + TITLE_RE = r"^(?:https?://(?:www\.)?dropout\.tv/)([^/]+)(?:/.*)?$" + SERIES_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)(?:/season:(\d+))?/?$" + EPISODE_RE = ( + r"https?://(?:www\.)?dropout\.tv/([^/]+)/season:(\d+)/videos/([^/]+)/?$" + ) + + LOGIN_URL = "https://www.dropout.tv/login" + + @staticmethod + @click.command(name="DROP", short_help="https://www.dropout.tv", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx, **kwargs): + return DROP(ctx, **kwargs) + + def __init__(self, ctx, title: str): + self.title = title + super().__init__(ctx) + + def authenticate( + self, + cookies: Optional[CookieJar] = None, + credential: Optional[Credential] = None, + ) -> None: + self.credentials = credential + + if cookies: + self.session.cookies.update(cookies) + elif self.credentials: + login_data = { + "email": self.credentials.username, + "password": self.credentials.password, + "authenticity_token": self._get_authenticity_token(), + "utf8": "true", + } + + # Use the URL from the config + response = self.session.post( + self.config["endpoints"]["login_url"], + data=login_data, + allow_redirects=False, + ) + + if '
Union[Series]: + match = re.match(self.SERIES_RE, self.title) + if match: + title_id = match.group(1) + else: + title_id = self.title + + url = self.config["endpoints"]["episode_metadata_url"].format(title_id=title_id) + response = self.session.get(url) + soup = BeautifulSoup(response.text, "html.parser") + + episodes = [] + season_urls = [] + + season_select = soup.find("select", class_="js-switch-season") + if season_select: + for option in season_select.find_all("option"): + season_urls.append(option["value"]) + + for season_url in season_urls: + season_response = self.session.get(season_url) + season_soup = BeautifulSoup(season_response.text, "html.parser") + + season_number = int(re.search(r"/season:(\d+)", season_url).group(1)) + + for item in season_soup.find_all("div", class_="browse-item-card"): + episode_link = item.find("a", class_="browse-item-link") + if episode_link: + episode_url = episode_link["href"] + episode_data = json.loads( + episode_link["data-track-event-properties"] + ) + + episode_id = episode_data["id"] + episode_title = episode_data["label"] + + episode_number_elem = item.find( + "span", class_="media-identifier media-episode" + ) + if episode_number_elem: + episode_number_match = re.search( + r"Episode (\d+)", episode_number_elem.text + ) + if episode_number_match: + episode_number = int(episode_number_match.group(1)) + else: + continue + else: + continue + + show_title = self.title.split("/")[-1].replace("-", " ").title() + + episode = Episode( + id_=str(episode_id), + service=self.__class__, + title=show_title, + season=season_number, + number=episode_number, + name=episode_title, + year=None, + data={"url": episode_url}, + ) + episodes.append(episode) + + return Series(episodes) + + def get_tracks(self, title: Union[Episode]) -> Tracks: + tracks = Tracks() + + episode_url = title.data["url"] + episode_page = self.session.get(episode_url).text + + embed_url_match = re.search( + self.config["endpoints"]["embed_url_regex"], episode_page 
+ ) + if not embed_url_match: + raise ValueError("Could not find embed_url in the episode page") + embed_url = embed_url_match.group(1).replace("&", "&") + + headers = { + k: v.format(episode_url=episode_url) + for k, v in self.config["headers"].items() + } + + # Fetch the embed page content + embed_page = self.session.get(embed_url, headers=headers).text + + # Extract the config URL using regex + config_url_match = re.search( + self.config["endpoints"]["config_url_regex"], embed_page + ) + if config_url_match: + config_url = config_url_match.group(1).replace("\\u0026", "&") + else: + raise ValueError("Config URL not found on the embed page.") + + config_data = self.session.get(config_url, headers=headers).json() + + # Retrieve the CDN information from the config data + cdns = config_data["request"]["files"]["hls"]["cdns"] + default_cdn = config_data["request"]["files"]["hls"]["default_cdn"] + + # Select the default CDN or fall back to the first available one + cdn = cdns.get(default_cdn) or next(iter(cdns.values())) + + # Generate the MPD URL by replacing 'playlist.json' with 'playlist.mpd' + mpd_url = cdn["avc_url"].replace("playlist.json", "playlist.mpd") + + tracks = HLS.from_url(url=mpd_url).to_tracks(language="en") + + # Extract thumbnail URL from config_data + thumbnail_base_url = config_data["video"]["thumbs"]["base"] + thumbnail_url = f"{thumbnail_base_url}" + thumbnail_response = self.session.get(thumbnail_url) + if thumbnail_response.status_code == 200: + thumbnail_filename = f"{title.id}_thumbnail.jpg" + thumbnail_path = config.directories.temp / thumbnail_filename + + # Ensure the directory exists + os.makedirs(config.directories.temp, exist_ok=True) + + # Save the thumbnail file + with open(thumbnail_path, "wb") as f: + f.write(thumbnail_response.content) + + # Create an Attachment object + thumbnail_attachment = Attachment( + path=thumbnail_path, + name=thumbnail_filename, + mime_type="image/jpeg", + description="Thumbnail", + ) + + # Add the 
attachment to the tracks + tracks.attachments.append(thumbnail_attachment) + + return tracks + + def get_chapters(self, title): + return [] + + def get_widevine_license(self, challenge: bytes, title: Union[Episode], track): + # No DRM + pass diff --git a/services/DROP/config.yaml b/services/DROP/config.yaml new file mode 100644 index 0000000..afbeba4 --- /dev/null +++ b/services/DROP/config.yaml @@ -0,0 +1,15 @@ +endpoints: + login_url: "https://www.dropout.tv/login" + episode_metadata_url: "https://www.dropout.tv/{title_id}" + embed_url_regex: 'embed_url:\s*"([^"]+)"' + config_url_regex: 'config_url":"([^"]+)"' + +headers: + referer: "{episode_url}" + user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + accept: "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8" + accept_language: "en-US,en;q=0.5" + upgrade_insecure_requests: "1" + sec_fetch_dest: "iframe" + sec_fetch_mode: "navigate" + sec_fetch_site: "cross-site" diff --git a/services/TFC/__init__.py b/services/TFC/__init__.py new file mode 100644 index 0000000..901ccb7 --- /dev/null +++ b/services/TFC/__init__.py @@ -0,0 +1,336 @@ +import json +import re +import time +import sys +from datetime import datetime, timedelta +from typing import Union, Generator, Optional +from urllib.parse import urljoin +from http.cookiejar import CookieJar + +import click +import requests + +from devine.core.constants import AnyTrack +from devine.core.service import Service +from devine.core.titles import Episode, Movie, Movies, Series +from devine.core.tracks import Tracks, Chapters, Subtitle, Chapter +from devine.core.credential import Credential +from devine.core.search_result import SearchResult +from devine.core.downloaders import curl_impersonate +from devine.core.utilities import get_ip_info +from devine.core.config import config +from devine.core.manifests.dash import DASH +import warnings + +# Weird chunk error from 
# The search endpoint's streamed response makes requests emit a
# "chunk_size is ignored" warning on every call; suppress it globally.
warnings.filterwarnings("ignore", message="chunk_size is ignored")


class TFC(Service):
    """
    Service code for iWantTFC
    Written by @sp4rk.y

    Authorization: Cookies (Free and Paid Titles)
    Security: FHD@L3
    """

    @staticmethod
    @click.command(name="TFC", short_help="https://www.iwanttfc.com", help=__doc__)
    @click.argument("title", type=str)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
    @click.pass_context
    def cli(ctx, **kwargs):
        return TFC(ctx, **kwargs)

    def __init__(self, ctx, title: str, movie: bool):
        # Title ID (or URL) from the CLI, and whether it refers to a movie.
        self.title = title
        self.is_movie = movie

        # Authentication state, populated lazily by authenticate().
        self.credential = None
        self.token = None
        self.refresh_token = None
        self.token_expiry = None

        super().__init__(ctx)

        self.session.headers.update(
            {
                "user-agent": self.config["browser"]["headers"]["user-agent"],
            }
        )

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Log in with email/password and store the bearer token on the session.

        Cookies, when provided, are loaded into the session as well. The login
        POST only happens while self.token is unset; later calls are no-ops.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:92.0) Gecko/20100101 Firefox/92.0',
            'Accept': 'application/json, text/plain, */*',
            'Content-Type': 'application/x-www-form-urlencoded',
        }

        self.session.headers.update(headers)

        if cookies:
            self.session.cookies.update(cookies)

        if self.credential is None:
            self.credential = credential

        if self.token is None:
            # NOTE(review): this reads the `credential` parameter directly, so a
            # cookies-only call (credential=None) would raise AttributeError
            # here — confirm callers always pass a credential when no token is cached.
            auth_response = self.session.post(
                url=self.config['endpoints']['api_login'],
                data=f"password={credential.password}&email={credential.username}&deviceID={self.config['UUID']}",
                headers=headers
            )

            # Parse the authentication response
            response_json = auth_response.json()

            # Check if authentication was successful
            if response_json.get('status') == 'OK' and 'UserAuthentication' in response_json:
                # Extract token from UserAuthentication
                self.token = response_json['UserAuthentication']
                self.refresh_token = response_json['refreshToken']
                # Token treated as short-lived; expiry is tracked but nothing
                # refreshes on it yet.
                self.token_expiry = (datetime.now() + timedelta(minutes=4)).timestamp()

                # Update session headers with the Authorization token
                self.session.headers.update({'Authorization': f'Bearer {self.token}'})
            else:
                # Retry login if the first attempt fails
                # NOTE(review): recursing with identical arguments can loop
                # indefinitely if the server keeps answering 401 — a bounded
                # retry count would be safer.
                if auth_response.status_code == 401:  # Assuming 401 for unauthorized
                    print("First login attempt failed, retrying...")
                    return self.authenticate(cookies, credential)  # Recursive retry
                else:
                    raise ValueError("Failed to authenticate. Response was not as expected.")

        return self.token

    def search(self) -> Generator[SearchResult, None, None]:
        """Query the Algolia search indexes and yield one SearchResult per hit."""
        query = self.title
        headers = self.config["search"]["headers"]
        # Template payload for the two Algolia indexes (items + cast tags);
        # the "blabla" placeholder queries are overwritten just below.
        data = '{"requests":[{"query":"blabla","indexName":"www_iwanttfc_com_items","params":"hitsPerPage=200"},{"query":"blabla","indexName":"www_iwanttfc_com_tag_id_cast","params":"hitsPerPage=200"}]}'
        parsed_data = json.loads(data)
        parsed_data["requests"][0]["query"] = query
        parsed_data["requests"][1]["query"] = query
        response = requests.post(
            self.config["endpoints"]["api_search"],
            headers=headers,
            data=json.dumps(parsed_data),
        )
        response.raise_for_status()

        results = response.json()["results"]
        for result in results[0]["hits"]:
            title = result.get("title", {}).get("en", "")
            if not title:
                continue

            # Get detailed metadata for each hit from the catalog API.
            detail_url = self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=result["objectID"])
            detail_response = self.session.get(detail_url)
            detail_data = detail_response.json()

            # Extract description and media type ("children" marks a series).
            description = detail_data.get("description", {}).get("en", "")[:200] + "..."
            media_type = "TV" if "children" in detail_data else "Movie"

            # Extract year and episode count for TV shows
            year = detail_data.get("release_year")
            episode_count = 0

            if media_type == "TV":
                # Children with "-tlr" in the id are trailers, not episodes.
                episode_count = len(
                    [episode for episode in detail_data.get("children", []) if "-tlr" not in episode["id"]]
                )

            # Construct label with episode count for TV shows
            label = media_type
            if year:
                label += f" ({year})"
            if media_type == "TV":
                label += f" {episode_count} Episode{'' if episode_count == 1 else 's'}"

            # Create SearchResult with additional details
            yield SearchResult(
                id_=result["objectID"],
                title=title,
                description=description,
                label=label,
            )

    def get_js_value(self) -> Optional[str]:
        """
        Fetch the browse page and extract the catalog build hash (the "js"
        value) used in item metadata URLs. Returns None when not found.
        """
        # Simulate browsing to the page and download the HTML file
        for _ in curl_impersonate(
            urls="https://www.iwanttfc.com/#!/browse",
            output_dir=config.directories.temp,
            filename="browse_page.html",
        ):
            pass

        # Read the downloaded HTML file
        html_path = config.directories.temp / "browse_page.html"
        with html_path.open("r", encoding="utf8") as f:
            html_content = f.read()

        # Find the script tag with the catalog URL and extract the 'js' value
        match = re.search(r'src="https://absprod-static.iwanttfc.com/c/6/catalog/(.*?)/script.js', html_content)
        if match:
            return match.group(1)

        return None

    def get_titles(self) -> Union[Movies, Series]:
        """
        Fetch the title's catalog metadata and return either a Series of
        Episodes (when the item has "children") or a single-entry Movies.

        Exits the process when the title does not exist or an active geofence
        rule excludes the current region.
        """
        # Get title metadata
        try:
            title_metadata = self.session.get(
                self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=self.title)
            ).json()
        except ValueError:
            self.log.warning("Show title does not exist.")
            sys.exit(1)

        # Check for GEOFENCE rules: each rule carries a millisecond time window
        # and a country whitelist.
        rules = title_metadata.get("rules", {}).get("rules", [])
        for rule in rules:
            if rule.get("start") <= time.time() * 1000 <= rule.get("end"):  # Check if rule is active
                required_countries = rule.get("countries", [])
                if required_countries:
                    current_region = get_ip_info(self.session)["country"].lower()
                    if not any(x.lower() == current_region for x in required_countries):
                        self.log.warning(
                            f"Show '{title_metadata['id']}' requires a proxy in {', '.join(required_countries)} "
                            f"but your current region is {current_region.upper()}. "
                        )
                        sys.exit(0)

        if "children" in title_metadata:
            # TV Show - Extract episodes with correct season info
            episodes = []
            for episode in title_metadata.get("children", []):
                episode_id = episode["id"]

                # Extract season and episode number from IDs shaped like "...-s2e10".
                match = re.match(r".*-s(\d+)e(\d+)$", episode_id, re.IGNORECASE)
                if not match:
                    continue  # Skip if unable to parse season and episode

                season, number = map(int, match.groups())

                # Create Episode object with season and episode number
                episode_obj = Episode(
                    id_=episode_id,
                    title=title_metadata.get("title", {}).get("en"),
                    season=season,
                    number=number,
                    year=title_metadata.get("release_year"),
                    service=self.__class__,
                )
                episodes.append(episode_obj)

            return Series(episodes)

        else:
            # Movie - Extract movie details
            movie_name = title_metadata.get("title", {}).get("en")
            movie_year = title_metadata.get("release_year")

            # Create Movie object
            movie_class = Movie(
                id_=self.title,
                name=movie_name,
                year=movie_year,
                service=self.__class__,
            )

            return Movies([movie_class])

    def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
        """
        Build video/audio tracks from the title's MPD manifests plus subtitle
        tracks from its captions, and attach chapters.
        """
        if isinstance(title, Episode) and not title.data:
            # Fetch detailed episode data if needed
            episode_data = self.session.get(
                self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=title.id)
            ).json()
            title.data = episode_data

        # NOTE(review): episode_data is only bound inside the guard above, so a
        # title that already has .data set (or a Movie) would raise NameError
        # below — confirm this path is only reached for freshly-created Episodes.
        # Extract MPD URLs
        mpd_urls = episode_data.get("media", {}).get("mpds", [])

        # Extract subtitle URLs and languages
        subtitle_data = [
            (
                urljoin(self.config["endpoints"]["api_subtitle"], caption.get("id")) + ".vtt",
                caption.get("lang"),
            )
            for caption in episode_data.get("media", {}).get("captions", [])
        ]

        tracks = Tracks()

        # Create Video and Audio Tracks from MPDs, avoiding duplicates and storing episode_id
        for mpd_url in mpd_urls:
            # "fil" (Filipino) is the service's default original language.
            mpd_tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language or "fil")
            for track in mpd_tracks:
                if not tracks.exists(by_id=track.id):
                    track.data["episode_id"] = episode_data.get("id")  # Store episode_id in track.data
                    tracks.add(track)

        for track in tracks.audio:
            mpd_lang = language = title.language or "fil"
            # Force the audio language metadata to the MPD language.
            # NOTE(review): pokes langcodes' private attributes — fragile across
            # langcodes versions; verify against the pinned dependency.
            track.language.language = mpd_lang
            track.language._broader = [mpd_lang, "und"]
            track.language._dict = {"language": mpd_lang}
            track.language._str_tag = mpd_lang

        # Create Subtitle Tracks for all languages, avoiding duplicates
        for subtitle_url, language in subtitle_data:
            subtitle_track = Subtitle(
                id_=subtitle_url.split("/")[-1].split(".")[0],
                url=subtitle_url,
                codec=Subtitle.Codec.WebVTT,
                language=language,
                is_original_lang=language == title.language,
            )

            if not tracks.exists(by_id=subtitle_track.id):
                tracks.add(subtitle_track)

        chapters = self.get_chapters(title)
        tracks.chapters = Chapters(chapters)

        return tracks

    def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:
        """
        Convert the title's cuepoints ("HH:MM:SS.fff" strings) into Chapters,
        always prepending one at 00:00:00.000. Malformed cuepoints are logged
        and skipped.
        """
        if isinstance(title, Episode) and not title.data:
            episode_data = self.session.get(
                self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=title.id)
            ).json()
            title.data = episode_data

        cuepoints = title.data.get("cuepoints", [])

        # Sort the cuepoints chronologically before numbering chapters.
        sorted_cuepoints = sorted(cuepoints, key=lambda x: datetime.strptime(x, "%H:%M:%S.%f"))

        chapters = [
            Chapter(name="Chapter 1", timestamp="00:00:00.000")
        ]

        for i, cuepoint in enumerate(sorted_cuepoints, start=2):
            try:
                timestamp = datetime.strptime(cuepoint, "%H:%M:%S.%f").time()
                # [:-3] trims microseconds down to milliseconds.
                chapters.append(Chapter(name=f"Chapter {i}", timestamp=timestamp.strftime("%H:%M:%S.%f")[:-3]))
            except ValueError:
                self.log.warning(f"Invalid cuepoint format: {cuepoint}")

        return chapters

    def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes | str:
        # TODO: Cache the returned service cert
        return self.get_widevine_license(challenge, track)

    def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes:
        """POST the challenge to the license endpoint; returns the raw license."""
        # The endpoint takes the item ID, the auth cookie, and a build hash as
        # query parameters rather than headers.
        episode_id = track.data.get("episode_id")
        license_url = self.config["endpoints"]["api_license"]
        license_url += f"?itemID={episode_id}"
        license_url += f"&UserAuthentication={self.session.cookies.get('UserAuthentication')}"
        license_url += "&build=52b61137ff3af37f55e0"
        return self.session.post(url=license_url, data=challenge).content
class VIKI(Service):
    """
    Service code for Viki
    Written by ToonsHub, improved by @sp4rk.y

    Authorization: None (Free SD) | Cookies (Free and Paid Titles)
    Security: FHD@L3
    """

    # Named group "id" is what get_titles()/get_show_year_from_search() read
    # via match.group("id").
    TITLE_RE = r"^(?:https?://(?:www\.)?viki\.com/(?:tv|movies)/)(?P<id>[a-z0-9]+)(?:-.+)?$"
    GEOFENCE = ("ca",)

    @staticmethod
    @click.command(name="VIKI", short_help="https://www.viki.com", help=__doc__)
    @click.argument("title", type=str)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
    @click.pass_context
    def cli(ctx, **kwargs):
        return VIKI(ctx, **kwargs)

    def __init__(self, ctx, title: str, movie: bool):
        self.title = title

        # A /movies/ URL implies a movie even without the -m flag.
        if "/movies/" in self.title:
            self.is_movie = True
        else:
            self.is_movie = movie

        super().__init__(ctx)

        # Viki's API requires these app-identification headers on every call.
        self.session.headers.update(
            {
                "user-agent": self.config["browser"]["user-agent"],
                "x-client-user-agent": self.config["browser"]["user-agent"],
                "x-viki-app-ver": self.config["browser"]["x-viki-app-ver"],
                "x-viki-as-id": self.config["browser"]["x-viki-as-id"],
            }
        )

    def search(self) -> Generator[SearchResult, None, None]:
        """Query the Viki search API and yield one SearchResult per hit."""
        query = self.title
        response = self.session.get(
            self.config["endpoints"]["search_endpoint_url"],
            params={
                "term": query,
                "app": "100000a",
                "per_page": 10,
                "blocked": "true",
            },
        )
        response.raise_for_status()

        search_data = response.json()

        for result in search_data["response"]:
            media_type = "TV" if result["type"] == "series" else "Movie"

            # Release year comes from the first distributor's "from" date.
            year = None
            distributors = result.get("distributors")
            if distributors:
                from_date = distributors[0].get("from")
                if from_date:
                    year_match = re.match(r"^\d{4}", from_date)
                    if year_match:
                        year = year_match.group()
            label = media_type
            if year:
                label += f" ({year})"
            if "viki_air_time" in result:
                release_time = datetime.datetime.fromtimestamp(result["viki_air_time"], datetime.timezone.utc)
                if release_time > datetime.datetime.now(
                    datetime.timezone.utc
                ):  # Check if release time is in the future
                    # Unreleased titles get a countdown label instead of type/year.
                    time_diff = release_time - datetime.datetime.now(datetime.timezone.utc)
                    days, seconds = time_diff.days, time_diff.seconds
                    hours = days * 24 + seconds // 3600
                    minutes = (seconds % 3600) // 60
                    if hours > 0:
                        label = f"In {hours} hours"
                    elif minutes > 0:
                        label = f"In {minutes} minutes"
                    else:
                        label = "In less than a minute"
            yield SearchResult(
                id_=result["id"],
                title=result["titles"]["en"],
                description=result.get("descriptions", {}).get("en", "")[:200] + "...",
                label=label,
                url=f"https://www.viki.com/tv/{result['id']}",
            )

    def get_titles(self) -> Union[Movies, Series]:
        """
        Return a Series of Episodes (paging through the episodes API) or a
        single-entry Movies, depending on whether the title is a movie.
        """
        match = re.match(self.TITLE_RE, self.title)
        if match:
            title_id = match.group("id")
        else:
            title_id = self.title

        if not self.is_movie:
            self.is_movie = False
            episodes = []
            pagenumber = 1
            # Specials/Extras are renumbered sequentially into season 0.
            special_episode_number = 1
            while True:
                series_metadata = self.session.get(
                    f"https://api.viki.io/v4/containers/{title_id}/episodes.json?direction=asc&with_upcoming=false&sort=number&page={pagenumber}&per_page=10&app=100000a"
                ).json()

                # Kept for get_show_year_from_search() to read later.
                self.series_metadata = series_metadata

                # Stop once a page is empty and the API reports no more pages.
                if not series_metadata["response"] and not series_metadata["more"]:
                    break
                show_year = self.get_show_year_from_search()

                for episode in series_metadata["response"]:
                    episode_id = episode["id"]
                    show_title = episode["container"]["titles"]["en"]
                    episode_season = 1
                    episode_number = episode["number"]

                    # Check for season number or year at the end of the show title
                    title_match = re.match(r"^(.*?)(?: (\d{4})| (\d+))?$", show_title)
                    if title_match:
                        base_title = title_match.group(1)
                        year = title_match.group(2)
                        season = title_match.group(3)

                        if season:
                            episode_season = int(season)
                        elif year:
                            base_title = show_title[:-5]  # Strip the year

                        show_title = base_title

                    episode_title_with_year = f"{show_title}.{show_year}"
                    if "Special" in episode.get("titles", {}).get("en", "") or "Extra" in episode.get("titles", {}).get(
                        "en", ""
                    ):
                        episode_season = 0
                        episode_number = special_episode_number
                        special_episode_number += 1

                    episode_name = None
                    episode_class = Episode(
                        id_=episode_id,
                        title=episode_title_with_year,
                        season=episode_season,
                        number=episode_number,
                        name=episode_name,
                        year=show_year,
                        service=self.__class__,
                    )
                    episodes.append(episode_class)
                pagenumber += 1

            return Series(episodes)

        else:
            # The movie page embeds its video-metadata API URL; scrape the id out.
            movie_metadata = self.session.get(f"https://www.viki.com/movies/{title_id}").text
            video_id = re.search(r"https://api.viki.io/v4/videos/(.*?).json", movie_metadata).group(1)

            movie_metadata = self.session.get(self.config["endpoints"]["video_metadata"].format(id=video_id)).json()
            # Kept for get_show_year_from_search() to read later.
            self.movie_metadata = movie_metadata
            movie_id = movie_metadata["id"]
            movie_name = movie_metadata["titles"]["en"]

            # Check for year at the end of the movie name and strip it
            title_match = re.match(r"^(.*?)(?: (\d{4}))?$", movie_name)
            if title_match:
                base_title = title_match.group(1)
                year = title_match.group(2)

                if year:
                    movie_name = base_title

            movie_year = self.get_show_year_from_search()
            movie_class = Movie(id_=movie_id, name=movie_name, year=movie_year, service=self.__class__)

            return Movies([movie_class])

    def get_show_year_from_search(self) -> Optional[str]:
        """
        Look the current title up in the search API and return its release year
        (first distributor's "from" date), or None when unavailable.

        Relies on self.movie_metadata / self.series_metadata having been set by
        get_titles() first.
        """
        if hasattr(self, "movie_metadata") and self.movie_metadata:
            query = self.movie_metadata["container"]["titles"]["en"]
        else:
            query = self.series_metadata["response"][0]["container"]["titles"]["en"]

        response = self.session.get(
            self.config["endpoints"]["search_endpoint_url"],
            params={
                "term": query,
                "app": "100000a",
                "per_page": 50,
                "blocked": "true",
            },
        )
        response.raise_for_status()

        search_data = response.json()

        for result in search_data["response"]:
            # Match either the raw title argument or the id extracted from a URL.
            if result["id"] == self.title or re.match(self.TITLE_RE, self.title).group("id") == result["id"]:
                distributors = result.get("distributors")
                if distributors:
                    from_date = distributors[0].get("from")
                    if from_date:
                        return from_date[:4]
        return None

    def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
        """
        Fetch the playback manifest for a title, normalise its language tags,
        and rebuild the subtitle list from the playback response.
        """
        # Viki reports Chinese variants under several codes; normalise them to
        # the standard Hans/Hant tags.
        CHINESE_LANGUAGE_MAP = {
            "zh": "zh-Hans",  # Simplified Chinese
            "zt": "zh-Hant",  # Traditional Chinese
            "zh-TW": "zh-Hant",  # Traditional Chinese (Taiwan)
            "zh-HK": "zh-Hant",  # Traditional Chinese (Hong Kong)
        }
        mpd_info = self.session.get(self.config["endpoints"]["mpd_api"].format(id=title.id))
        mpd_data = mpd_info.json()
        # queue[1] carries the MPD stream entry; origin.language is the show's
        # original audio language.
        mpd_url = mpd_data["queue"][1]["url"]
        mpd_lang = mpd_data["video"]["origin"]["language"]
        if mpd_lang in CHINESE_LANGUAGE_MAP:
            mpd_lang = CHINESE_LANGUAGE_MAP[mpd_lang]

        # The license URL hides inside the base64-encoded "drm" blob, key "dt3".
        license_url = json.loads(base64.b64decode(mpd_data["drm"]).decode("utf-8", "ignore"))["dt3"]
        tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)

        for track in tracks:
            track.data["license_url"] = license_url

        for track in tracks.audio:
            track.language = Language.make(language=mpd_lang)

        # Discard manifest subtitles; the playback API's "subtitles" list is
        # authoritative and carries completion percentages.
        tracks.subtitles.clear()

        def strip_percentage(name: str) -> str:
            # Labels look like "English (98%)"; drop the percentage suffix.
            return re.sub(r"\s*\(\d+%\)", "", name).strip()

        if "subtitles" in mpd_data:
            for sub in mpd_data["subtitles"]:
                # Only keep near-complete subtitle tracks.
                if sub.get("percentage", 0) > 95:
                    language_code = sub["srclang"]
                    language_name = sub.get("label", language_code)
                    language_name = strip_percentage(language_name)

                    if language_code.startswith("zh"):
                        language_code = CHINESE_LANGUAGE_MAP.get(language_code, language_code)

                    is_original = language_code == mpd_lang

                    subtitle_track = Subtitle(
                        id_=f"{sub.get('id', '')}_{language_code}",
                        url=sub["src"],
                        codec=Subtitle.Codec.WebVTT,
                        language=language_code,
                        is_original_lang=is_original,
                        forced=False,
                        sdh=False,
                        name=language_name,
                    )

                    if sub.get("default"):
                        subtitle_track.default = True

                    tracks.add(subtitle_track, warn_only=True)

        return tracks

    def get_chapters(self, *_, **__) -> Chapters:
        """Viki exposes no chapter data; always returns empty Chapters."""
        return Chapters()

    def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes | str:
        # TODO: Cache the returned service cert
        return self.get_widevine_license(challenge, track)

    def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes:
        """POST the challenge to the license URL stashed on the track by get_tracks()."""
        return self.session.post(url=track.data["license_url"], data=challenge).content