devine-services/services/VIKI/__init__.py
Sp4rk.y dab35ef7d5 ♻️ (services): refactor code for improved readability and maintainability
Refactor services TFC, VIKI, and VIU to enhance code readability and maintainability.
- Update import statements for clarity.
- Adjust function and variable formatting for consistency.
- Simplify conditional logic and loops.
- Improve error handling and logging.
- Add support for additional configurations and language mappings.
- Ensure consistent use of language and region settings across services.
2024-10-10 19:02:24 -06:00

385 lines
15 KiB
Python

import base64
import os
import datetime
import time
import json
import re
import click
import sys
from langcodes import Language
from typing import Optional, Union, Generator
from devine.core.config import config
from devine.core.constants import AnyTrack
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapters, Tracks, Subtitle
from devine.core.tracks.attachment import Attachment
from devine.core.search_result import SearchResult
class VIKI(Service):
"""
Service code for Viki
Written by ToonsHub, improved by @sp4rk.y
Authorization: None (Free SD) | Cookies (Free and Paid Titles)
Security: FHD@L3
"""
TITLE_RE = r"^(?:https?://(?:www\.)?viki\.com/(?:tv|movies)/)(?P<id>[a-z0-9]+)(?:-.+)?$"
# GEOFENCE = ("ca",)
@staticmethod
@click.command(name="VIKI", short_help="https://www.viki.com", help=__doc__)
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
@click.pass_context
def cli(ctx, **kwargs):
return VIKI(ctx, **kwargs)
def __init__(self, ctx, title: str, movie: bool):
self.title = title
if "/movies/" in self.title:
self.is_movie = True
else:
self.is_movie = movie
super().__init__(ctx)
self.session.headers.update(
{
"user-agent": self.config["browser"]["user-agent"],
"x-client-user-agent": self.config["browser"]["user-agent"],
"x-viki-app-ver": self.config["browser"]["x-viki-app-ver"],
"x-viki-as-id": self.config["browser"]["x-viki-as-id"],
}
)
def search(self) -> Generator[SearchResult, None, None]:
query = self.title
response = self.session.get(
self.config["endpoints"]["search_endpoint_url"],
params={
"term": query,
"app": "100000a",
"per_page": 10,
"blocked": "true",
},
)
response.raise_for_status()
search_data = response.json()
for result in search_data["response"]:
media_type = "TV" if result["type"] == "series" else "Movie"
year = None
distributors = result.get("distributors")
if distributors:
from_date = distributors[0].get("from")
if from_date:
year_match = re.match(r"^\d{4}", from_date)
if year_match:
year = year_match.group()
label = media_type
if year:
label += f" ({year})"
if "viki_air_time" in result:
release_time = datetime.datetime.fromtimestamp(result["viki_air_time"], datetime.timezone.utc)
if release_time > datetime.datetime.now(
datetime.timezone.utc
):
time_diff = release_time - datetime.datetime.now(datetime.timezone.utc)
days, seconds = time_diff.days, time_diff.seconds
hours = days * 24 + seconds // 3600
minutes = (seconds % 3600) // 60
if hours > 0:
label = f"In {hours} hours"
elif minutes > 0:
label = f"In {minutes} minutes"
else:
label = "In less than a minute"
yield SearchResult(
id_=result["id"],
title=result["titles"]["en"],
description=result.get("descriptions", {}).get("en", "")[:200] + "...",
label=label,
url=f"https://www.viki.com/tv/{result['id']}",
)
def get_titles(self) -> Union[Movies, Series]:
match = re.match(self.TITLE_RE, self.title)
if match:
title_id = match.group("id")
else:
title_id = self.title
if not self.is_movie:
self.is_movie = False
episodes = []
pagenumber = 1
special_episode_number = 1
while True:
# Fetch series metadata from the endpoint in config, for each page
series_metadata_url = self.config["endpoints"]["episode_metadata"].format(
id=title_id, pagenumber=pagenumber
)
series_metadata = self.session.get(series_metadata_url).json()
self.series_metadata = series_metadata
if not series_metadata["response"] and not series_metadata["more"]:
break
show_year = self.get_show_year_from_search()
for episode in series_metadata["response"]:
episode_id = episode["id"]
if len(episode_id) < 4:
episode_id += "5349"
show_title = episode["container"]["titles"]["en"]
episode_season = 1
episode_number = episode["number"]
current_time = int(time.time()) # Get current Unix time in seconds
geo_blocking = episode.get("blocking", {}).get("geo", False)
viki_air_time = episode.get("viki_air_time", 0)
if geo_blocking:
# Only perform the time check if geo_blocking is True
if current_time < viki_air_time:
return
else:
# Episode is geo-blocked and its air time has passed
self.log.info(f"Episode {episode.get('number')} is blocked due to Geo-Location.\n")
sys.exit(1)
# Check for season number or year at the end of the show title
title_match = re.match(r"^(.*?)(?: (\d{4})| (\d+))?$", show_title)
if title_match:
base_title = title_match.group(1)
year = title_match.group(2)
season = title_match.group(3)
if season:
episode_season = int(season)
elif year:
base_title = show_title[:-5] # Strip the year
show_title = base_title
episode_title_with_year = f"{show_title}.{show_year}"
if "Special" in episode.get("titles", {}).get("en", "") or "Extra" in episode.get("titles", {}).get(
"en", ""
):
episode_season = 0
episode_number = special_episode_number
special_episode_number += 1
episode_name = None
episode_class = Episode(
id_=episode_id,
title=episode_title_with_year,
season=episode_season,
number=episode_number,
name=episode_name,
year=show_year,
service=self.__class__,
)
# Store the thumbnail URL only if it exists
if "images" in episode and "poster" in episode["images"] and "url" in episode["images"]["poster"]:
episode_class.thumbnail_url = episode["images"]["poster"]["url"]
episodes.append(episode_class)
pagenumber += 1
return Series(episodes)
else:
movie_metadata = self.session.get(f"https://www.viki.com/movies/{title_id}").text
video_id = re.search(r"https://api.viki.io/v4/videos/(.*?).json", movie_metadata).group(1)
movie_metadata = self.session.get(self.config["endpoints"]["video_metadata"].format(id=video_id)).json()
self.movie_metadata = movie_metadata
movie_id = movie_metadata["id"]
movie_name = movie_metadata["titles"]["en"]
# Check for year at the end of the movie name and strip it
title_match = re.match(r"^(.*?)(?: (\d{4}))?$", movie_name)
if title_match:
base_title = title_match.group(1)
year = title_match.group(2)
if year:
movie_name = base_title
movie_year = self.get_show_year_from_search()
movie_class = Movie(id_=movie_id, name=movie_name, year=movie_year, service=self.__class__)
movie_class.thumbnail_url = movie_metadata["images"]["poster"]["url"]
return Movies([movie_class])
def get_show_year_from_search(self) -> Optional[str]:
# Determine the query based on metadata
if hasattr(self, "movie_metadata") and self.movie_metadata:
query = self.movie_metadata["container"]["titles"]["en"]
else:
query = self.series_metadata["response"][0]["container"]["titles"]["en"]
# Make the search request
response = self.session.get(
self.config["endpoints"]["search_endpoint_url"],
params={
"term": query,
"app": "100000a",
"per_page": 50,
"blocked": "true",
},
)
response.raise_for_status()
search_data = response.json()
for result in search_data.get("response", []):
if result.get("id") == self.title:
distributors = result.get("distributors")
if distributors:
from_date = distributors[0].get("from")
if from_date:
return from_date[:4]
match = re.match(self.TITLE_RE, self.title)
if match:
extracted_id = match.group("id")
if extracted_id == result.get("id"):
distributors = result.get("distributors")
if distributors:
from_date = distributors[0].get("from")
if from_date:
return from_date[:4]
# Default to "2024" if no year is found
return "2024"
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
chinese_language_map = {
"zh": "zh-Hans",
"zt": "zh-Hant",
"zh-tw": "zh-Hant",
"zh-hk": "zh-Hant",
"zh-hans": "zh-Hans",
"zh-hant": "zh-Hant",
}
# Handle original_id logic
original_id = title.id[:-4] if title.id.endswith("5349") else title.id
mpd_info = self.session.get(self.config["endpoints"]["mpd_api"].format(id=original_id))
mpd_data = mpd_info.json()
try:
mpd_url = mpd_data["queue"][1]["url"]
except (KeyError, IndexError):
self.log.info("Episode not yet available\n")
sys.exit(1)
mpd_lang = mpd_data["video"]["origin"]["language"].lower()
if mpd_lang in chinese_language_map:
mpd_lang_mapped = chinese_language_map[mpd_lang]
else:
mpd_lang_mapped = mpd_lang
license_url = json.loads(base64.b64decode(mpd_data["drm"]).decode("utf-8", "ignore"))["dt3"]
tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)
for track in tracks:
track.data["license_url"] = license_url
for track in tracks.audio:
track.data["original_language"] = track.language
track.language = Language.make(language=mpd_lang_mapped)
tracks.subtitles.clear()
def clean_language_label(label: str) -> str:
label = re.sub(r"<[^>]+>", "", label)
label = re.sub(r"\s*\(\d+%\)", "", label)
return label.strip()
if hasattr(title, "thumbnail_url") and title.thumbnail_url:
thumbnail_url = title.thumbnail_url
thumbnail_response = self.session.get(thumbnail_url)
if thumbnail_response.status_code == 200:
thumbnail_filename = f"{title.id}_thumbnail.jpg"
thumbnail_path = config.directories.temp / thumbnail_filename
os.makedirs(config.directories.temp, exist_ok=True)
with open(thumbnail_path, "wb") as f:
f.write(thumbnail_response.content)
thumbnail_attachment = Attachment(
path=thumbnail_path,
name=thumbnail_filename,
mime_type="image/jpeg",
description="Thumbnail",
)
if not hasattr(tracks, "attachments"):
tracks.attachments = []
tracks.attachments.append(thumbnail_attachment)
else:
self.log.warning("Thumbnail download failed.")
else:
self.log.warning("Thumbnail URL not available for this title.")
stream_subtitles = mpd_data.get("streamSubtitles", {}).get("dash", [])
if not stream_subtitles:
self.log.warning("No subtitles available in 'streamSubtitles.dash'.")
else:
for sub in stream_subtitles:
if sub.get("percentage", 0) > 95 and sub.get("kind") == "subtitles":
language_code_raw = sub.get("srclang", "").lower()
language_label = sub.get("label", language_code_raw)
language_name = clean_language_label(language_label)
if language_code_raw.startswith("z"):
language_code_mapped = chinese_language_map.get(language_code_raw, language_code_raw)
script = "Simplified" if language_code_mapped == "zh-Hans" else "Traditional"
language_name = f"Chinese ({script})"
else:
language_code_mapped = language_code_raw
is_original = language_code_mapped == mpd_lang_mapped
subtitle_id = f"{title.id}_{sub.get('id', '')}_{language_code_mapped}"
subtitle_track = Subtitle(
id_=subtitle_id,
url=sub["src"],
codec=Subtitle.Codec.WebVTT,
language=language_code_mapped,
is_original_lang=is_original,
forced=False,
sdh=False,
name=language_name,
)
if sub.get("default"):
subtitle_track.default = True
tracks.add(subtitle_track, warn_only=True)
return tracks
def get_chapters(self, *_, **__) -> Chapters:
return Chapters()
def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes | str:
return self.get_widevine_license(challenge, track)
def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes:
return self.session.post(url=track.data["license_url"], data=challenge).content