import base64
import datetime
import json
import os
import re
import sys
import time
from typing import Generator, Optional, Union

import click
from langcodes import Language

from devine.core.config import config
from devine.core.constants import AnyTrack
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapters, Tracks, Subtitle
from devine.core.tracks.attachment import Attachment
from devine.core.search_result import SearchResult


class VIKI(Service):
    """
    Service code for Viki
    Written by ToonsHub, improved by @sp4rk.y

    Authorization: None (Free SD) | Cookies (Free and Paid Titles)
    Security: FHD@L3
    """

    # NOTE: the class docstring above is also the CLI help text (help=__doc__ below),
    # so it is runtime text, not just documentation.

    # Accepts a full viki.com /tv/ or /movies/ URL and captures the title id in group "id".
    # A bare id does not match; callers fall back to using self.title verbatim.
    TITLE_RE = r"^(?:https?://(?:www\.)?viki\.com/(?:tv|movies)/)(?P<id>[a-z0-9]+)(?:-.+)?$"
    # GEOFENCE = ("ca",)

    # Maps the language codes seen in Viki payloads to script-qualified Chinese tags.
    _CHINESE_LANGUAGE_MAP = {
        "zh": "zh-Hans",
        "zt": "zh-Hant",
        "zh-tw": "zh-Hant",
        "zh-hk": "zh-Hant",
        "zh-hans": "zh-Hans",
        "zh-hant": "zh-Hant",
    }

    @staticmethod
    @click.command(name="VIKI", short_help="https://www.viki.com", help=__doc__)
    @click.argument("title", type=str)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed CLI arguments."""
        return VIKI(ctx, **kwargs)

    def __init__(self, ctx, title: str, movie: bool):
        """
        Store the requested title and set the browser-like session headers.

        Parameters:
            ctx: Click context forwarded to the Service base class.
            title: Title id or viki.com URL as given on the command line.
            movie: True when the user passed -m/--movie.
        """
        self.title = title
        # A /movies/ URL always means a movie, regardless of the flag.
        self.is_movie = "/movies/" in title or movie

        super().__init__(ctx)

        browser = self.config["browser"]
        self.session.headers.update(
            {
                "user-agent": browser["user-agent"],
                "x-client-user-agent": browser["user-agent"],
                "x-viki-app-ver": browser["x-viki-app-ver"],
                "x-viki-as-id": browser["x-viki-as-id"],
            }
        )

    @staticmethod
    def _first_distribution_date(result: dict) -> Optional[str]:
        """Return the "from" value of the first distributor entry of a search result, if any."""
        distributors = result.get("distributors")
        if not distributors:
            return None
        return distributors[0].get("from") or None

    def search(self) -> Generator[SearchResult, None, None]:
        """
        Query the search endpoint with self.title and yield one SearchResult per hit.

        The label is "TV" or "Movie" plus the year when one can be read from the
        first distributor entry; for titles whose viki_air_time is still in the
        future the label is replaced by a countdown.
        """
        response = self.session.get(
            self.config["endpoints"]["search_endpoint_url"],
            params={
                "term": self.title,
                "app": "100000a",
                "per_page": 10,
                "blocked": "true",
            },
        )
        response.raise_for_status()

        for result in response.json()["response"]:
            label = "TV" if result["type"] == "series" else "Movie"

            from_date = self._first_distribution_date(result)
            year_match = re.match(r"^\d{4}", from_date) if from_date else None
            if year_match:
                label += f" ({year_match.group()})"

            if "viki_air_time" in result:
                release_time = datetime.datetime.fromtimestamp(result["viki_air_time"], datetime.timezone.utc)
                # Take "now" once so the comparison and the difference agree.
                now = datetime.datetime.now(datetime.timezone.utc)
                if release_time > now:
                    time_diff = release_time - now
                    hours = time_diff.days * 24 + time_diff.seconds // 3600
                    minutes = (time_diff.seconds % 3600) // 60
                    if hours > 0:
                        label = f"In {hours} hours"
                    elif minutes > 0:
                        label = f"In {minutes} minutes"
                    else:
                        label = "In less than a minute"

            yield SearchResult(
                id_=result["id"],
                title=result["titles"]["en"],
                description=result.get("descriptions", {}).get("en", "")[:200] + "...",
                label=label,
                url=f"https://www.viki.com/tv/{result['id']}",
            )

    def get_titles(self) -> Union[Movies, Series]:
        """
        Resolve self.title into a Movies or Series collection.

        The title id is taken from the URL when self.title matches TITLE_RE,
        otherwise self.title itself is used as the id.
        """
        match = re.match(self.TITLE_RE, self.title)
        title_id = match.group("id") if match else self.title

        if self.is_movie:
            return self._get_movie(title_id)
        return self._get_series(title_id)

    def _get_series(self, title_id: str) -> Optional[Series]:
        """Page through the episode metadata endpoint and build a Series of Episodes."""
        episodes = []
        page_number = 1
        special_episode_number = 1
        show_year = None

        while True:
            # Fetch series metadata from the endpoint in config, for each page
            series_metadata_url = self.config["endpoints"]["episode_metadata"].format(
                id=title_id, pagenumber=page_number
            )
            series_metadata = self.session.get(series_metadata_url).json()
            # Kept on the instance because get_show_year_from_search() reads it.
            self.series_metadata = series_metadata

            if not series_metadata["response"] and not series_metadata["more"]:
                break

            # The year lookup costs a search request; resolve it once, not per page.
            if show_year is None:
                show_year = self.get_show_year_from_search()

            for episode in series_metadata["response"]:
                episode_id = episode["id"]
                if len(episode_id) < 4:
                    # Short ids get a "5349" suffix; get_tracks() strips it again.
                    episode_id += "5349"

                show_title = episode["container"]["titles"]["en"]
                episode_season = 1
                episode_number = episode["number"]

                current_time = int(time.time())  # Current Unix time in seconds
                geo_blocking = episode.get("blocking", {}).get("geo", False)
                viki_air_time = episode.get("viki_air_time", 0)

                # Only perform the time check if geo_blocking is True
                if geo_blocking:
                    if current_time < viki_air_time:
                        # NOTE(review): an unaired, geo-blocked episode aborts the whole
                        # listing and yields None rather than a Series - confirm intended.
                        return None
                    # Episode is geo-blocked and its air time has passed
                    self.log.info(f"Episode {episode.get('number')} is blocked due to Geo-Location.\n")
                    sys.exit(1)

                # Check for season number or year at the end of the show title
                title_match = re.match(r"^(.*?)(?: (\d{4})| (\d+))?$", show_title)
                if title_match:
                    base_title, _year, season = title_match.groups()
                    if season:
                        episode_season = int(season)
                    # Group 1 already excludes a trailing year or season number.
                    show_title = base_title

                episode_title_en = episode.get("titles", {}).get("en", "")
                if "Special" in episode_title_en or "Extra" in episode_title_en:
                    # Specials/extras go to season 0 with their own running counter.
                    episode_season = 0
                    episode_number = special_episode_number
                    special_episode_number += 1

                episode_class = Episode(
                    id_=episode_id,
                    title=f"{show_title}.{show_year}",
                    season=episode_season,
                    number=episode_number,
                    name=None,
                    year=show_year,
                    service=self.__class__,
                )

                # Store the thumbnail URL only if it exists
                if "images" in episode and "poster" in episode["images"] and "url" in episode["images"]["poster"]:
                    episode_class.thumbnail_url = episode["images"]["poster"]["url"]

                episodes.append(episode_class)

            page_number += 1

        return Series(episodes)

    def _get_movie(self, title_id: str) -> Movies:
        """Scrape the movie page for its video id, fetch its metadata and build a Movies collection."""
        movie_page = self.session.get(f"https://www.viki.com/movies/{title_id}").text
        video_match = re.search(r"https://api.viki.io/v4/videos/(.*?).json", movie_page)
        if not video_match:
            # Without this guard the failure surfaced as an opaque AttributeError.
            self.log.error(f"Could not find a video id on the movie page for '{title_id}'.")
            sys.exit(1)
        video_id = video_match.group(1)

        movie_metadata = self.session.get(self.config["endpoints"]["video_metadata"].format(id=video_id)).json()
        # Kept on the instance because get_show_year_from_search() reads it.
        self.movie_metadata = movie_metadata

        movie_name = movie_metadata["titles"]["en"]
        # Check for year at the end of the movie name and strip it
        title_match = re.match(r"^(.*?)(?: (\d{4}))?$", movie_name)
        if title_match and title_match.group(2):
            movie_name = title_match.group(1)

        movie_class = Movie(
            id_=movie_metadata["id"],
            name=movie_name,
            year=self.get_show_year_from_search(),
            service=self.__class__,
        )
        movie_class.thumbnail_url = movie_metadata["images"]["poster"]["url"]

        return Movies([movie_class])

    def get_show_year_from_search(self) -> Optional[str]:
        """
        Look up the release year of the current title through the search endpoint.

        The container title from the stored movie or series metadata is used as
        the search term. The first result whose id equals self.title (or the id
        extracted from it via TITLE_RE) and that has a distributor "from" date
        supplies the year (its first four characters). Falls back to "2024".
        """
        # Determine the query based on metadata
        if getattr(self, "movie_metadata", None):
            query = self.movie_metadata["container"]["titles"]["en"]
        else:
            query = self.series_metadata["response"][0]["container"]["titles"]["en"]

        # Make the search request
        response = self.session.get(
            self.config["endpoints"]["search_endpoint_url"],
            params={
                "term": query,
                "app": "100000a",
                "per_page": 50,
                "blocked": "true",
            },
        )
        response.raise_for_status()

        # self.title may be a bare id or a URL; accept either form of the id.
        wanted_ids = {self.title}
        match = re.match(self.TITLE_RE, self.title)
        if match:
            wanted_ids.add(match.group("id"))

        for result in response.json().get("response", []):
            if result.get("id") not in wanted_ids:
                continue
            from_date = self._first_distribution_date(result)
            if from_date:
                return from_date[:4]

        # Default to "2024" if no year is found
        return "2024"

    @staticmethod
    def _clean_language_label(label: str) -> str:
        """Strip markup tags and a trailing "(NN%)" completion marker from a subtitle label."""
        label = re.sub(r"<[^>]+>", "", label)
        label = re.sub(r"\s*\(\d+%\)", "", label)
        return label.strip()

    def _attach_thumbnail(self, title: Union[Movie, Episode], tracks: Tracks) -> None:
        """Download the title's thumbnail into the temp directory and attach it to the tracks."""
        thumbnail_url = getattr(title, "thumbnail_url", None)
        if not thumbnail_url:
            self.log.warning("Thumbnail URL not available for this title.")
            return

        thumbnail_response = self.session.get(thumbnail_url)
        if thumbnail_response.status_code != 200:
            self.log.warning("Thumbnail download failed.")
            return

        thumbnail_filename = f"{title.id}_thumbnail.jpg"
        thumbnail_path = config.directories.temp / thumbnail_filename

        os.makedirs(config.directories.temp, exist_ok=True)
        with open(thumbnail_path, "wb") as f:
            f.write(thumbnail_response.content)

        thumbnail_attachment = Attachment(
            path=thumbnail_path,
            name=thumbnail_filename,
            mime_type="image/jpeg",
            description="Thumbnail",
        )

        if not hasattr(tracks, "attachments"):
            tracks.attachments = []
        tracks.attachments.append(thumbnail_attachment)

    def _add_subtitles(self, title: Union[Movie, Episode], tracks: Tracks, mpd_data: dict, original_lang: str) -> None:
        """Add a WebVTT Subtitle track for each "subtitles" entry that is more than 95% complete."""
        stream_subtitles = mpd_data.get("streamSubtitles", {}).get("dash", [])
        if not stream_subtitles:
            self.log.warning("No subtitles available in 'streamSubtitles.dash'.")
            return

        for sub in stream_subtitles:
            if not (sub.get("percentage", 0) > 95 and sub.get("kind") == "subtitles"):
                continue

            language_code_raw = sub.get("srclang", "").lower()
            language_name = self._clean_language_label(sub.get("label", language_code_raw))

            if language_code_raw.startswith("z"):
                language_code_mapped = self._CHINESE_LANGUAGE_MAP.get(language_code_raw, language_code_raw)
                script = "Simplified" if language_code_mapped == "zh-Hans" else "Traditional"
                language_name = f"Chinese ({script})"
            else:
                language_code_mapped = language_code_raw

            subtitle_track = Subtitle(
                id_=f"{title.id}_{sub.get('id', '')}_{language_code_mapped}",
                url=sub["src"],
                codec=Subtitle.Codec.WebVTT,
                language=language_code_mapped,
                is_original_lang=language_code_mapped == original_lang,
                forced=False,
                sdh=False,
                name=language_name,
            )

            if sub.get("default"):
                subtitle_track.default = True

            tracks.add(subtitle_track, warn_only=True)

    def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
        """
        Build the track list for a title from the MPD API response.

        Video/audio tracks come from the DASH manifest; every track gets the
        license URL decoded from the "drm" field. Manifest subtitles are dropped
        and replaced by the entries in "streamSubtitles.dash". The title's
        thumbnail is downloaded and attached when available. Exits the process
        when the response carries no manifest URL.
        """
        # Undo the "5349" suffix that the episode listing appends to short ids.
        original_id = title.id[:-4] if title.id.endswith("5349") else title.id

        mpd_data = self.session.get(self.config["endpoints"]["mpd_api"].format(id=original_id)).json()

        try:
            mpd_url = mpd_data["queue"][1]["url"]
        except (KeyError, IndexError):
            self.log.info("Episode not yet available\n")
            sys.exit(1)

        mpd_lang = mpd_data["video"]["origin"]["language"].lower()
        mpd_lang_mapped = self._CHINESE_LANGUAGE_MAP.get(mpd_lang, mpd_lang)

        # The "drm" field is base64-encoded JSON; "dt3" holds the license URL.
        license_url = json.loads(base64.b64decode(mpd_data["drm"]).decode("utf-8", "ignore"))["dt3"]

        tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)

        for track in tracks:
            track.data["license_url"] = license_url

        for track in tracks.audio:
            track.data["original_language"] = track.language
            # Language.get() parses a full tag ("zh-Hans" -> language + script);
            # Language.make(language=...) would store the whole tag as the language subtag.
            track.language = Language.get(mpd_lang_mapped)

        # Subtitles are rebuilt below from the API payload instead of the manifest.
        tracks.subtitles.clear()

        self._attach_thumbnail(title, tracks)
        self._add_subtitles(title, tracks, mpd_data, mpd_lang_mapped)

        return tracks

    def get_chapters(self, *_, **__) -> Chapters:
        """Return an empty Chapters collection; this service provides no chapter data."""
        return Chapters()

    def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> Union[bytes, str]:
        """Send the certificate challenge to the track's license URL and return the response."""
        return self.get_widevine_license(challenge, track)

    def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes:
        """POST the challenge to the license URL stored on the track and return the raw response body."""
        return self.session.post(url=track.data["license_url"], data=challenge).content