Sp4rk.y
dab35ef7d5
Refactor services TFC, VIKI, and VIU to enhance code readability and maintainability. - Update import statements for clarity. - Adjust function and variable formatting for consistency. - Simplify conditional logic and loops. - Improve error handling and logging. - Add support for additional configurations and language mappings. - Ensure consistent use of language and region settings across services.
385 lines
15 KiB
Python
385 lines
15 KiB
Python
import base64
|
|
import os
|
|
import datetime
|
|
import time
|
|
import json
|
|
import re
|
|
import click
|
|
import sys
|
|
from langcodes import Language
|
|
|
|
from typing import Optional, Union, Generator
|
|
from devine.core.config import config
|
|
from devine.core.constants import AnyTrack
|
|
from devine.core.manifests import DASH
|
|
from devine.core.service import Service
|
|
from devine.core.titles import Episode, Movie, Movies, Series
|
|
from devine.core.tracks import Chapters, Tracks, Subtitle
|
|
from devine.core.tracks.attachment import Attachment
|
|
from devine.core.search_result import SearchResult
|
|
|
|
|
|
class VIKI(Service):
    """
    Service code for Viki
    Written by ToonsHub, improved by @sp4rk.y

    Authorization: None (Free SD) | Cookies (Free and Paid Titles)
    Security: FHD@L3
    """

    # NOTE: the class docstring above doubles as the CLI help text (``help=__doc__``
    # in ``cli`` below), so it is runtime text and is intentionally left unchanged.

    TITLE_RE = r"^(?:https?://(?:www\.)?viki\.com/(?:tv|movies)/)(?P<id>[a-z0-9]+)(?:-.+)?$"
    # GEOFENCE = ("ca",)

    # Year reported when the search endpoint yields no usable distributor date.
    FALLBACK_YEAR = "2024"

    # Episode ids shorter than MIN_ID_LENGTH get this suffix appended in
    # get_titles(); get_tracks() strips it again before calling the playback API.
    SHORT_ID_SUFFIX = "5349"
    MIN_ID_LENGTH = 4

    # Viki language codes for Chinese -> BCP 47 tags carrying the script.
    CHINESE_LANGUAGE_MAP = {
        "zh": "zh-Hans",
        "zt": "zh-Hant",
        "zh-cn": "zh-Hans",
        "zh-tw": "zh-Hant",
        "zh-hk": "zh-Hant",
        "zh-hans": "zh-Hans",
        "zh-hant": "zh-Hant",
    }

    # A comment (not a docstring) is used on ``cli`` so click's help output,
    # which is explicitly set to the class docstring, is not affected.
    @staticmethod
    @click.command(name="VIKI", short_help="https://www.viki.com", help=__doc__)
    @click.argument("title", type=str)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
    @click.pass_context
    def cli(ctx, **kwargs):
        # Click entry point: builds the service from the parsed CLI arguments.
        return VIKI(ctx, **kwargs)

    def __init__(self, ctx, title: str, movie: bool):
        """
        Store the requested title and prepare the HTTP session.

        Parameters:
            ctx: Click context, forwarded to ``Service.__init__``.
            title: Viki URL, bare title id, or (for ``search``) a search term.
            movie: Value of the ``--movie`` flag.
        """
        self.title = title
        # A "/movies/" URL is unambiguous, so it wins over the --movie flag.
        self.is_movie = "/movies/" in title or movie

        # Filled in by get_titles(); read by get_show_year_from_search().
        self.movie_metadata = None
        self.series_metadata = None

        super().__init__(ctx)

        browser = self.config["browser"]
        self.session.headers.update(
            {
                "user-agent": browser["user-agent"],
                "x-client-user-agent": browser["user-agent"],
                "x-viki-app-ver": browser["x-viki-app-ver"],
                "x-viki-as-id": browser["x-viki-as-id"],
            }
        )

    # ------------------------------------------------------------------ helpers

    def _title_id(self) -> str:
        """Return the id captured by TITLE_RE, or the raw title if it is not a URL."""
        match = re.match(self.TITLE_RE, self.title)
        return match.group("id") if match else self.title

    @staticmethod
    def _distributor_year(result: dict) -> Optional[str]:
        """Return the leading four-digit year of the first distributor's ``from`` date, if any."""
        distributors = result.get("distributors")
        if not distributors:
            return None
        from_date = distributors[0].get("from")
        if not from_date:
            return None
        year_match = re.match(r"\d{4}", from_date)
        return year_match.group() if year_match else None

    @staticmethod
    def _countdown_label(time_diff: datetime.timedelta) -> str:
        """Format a positive time delta as an "In ..." label for upcoming titles."""
        hours = time_diff.days * 24 + time_diff.seconds // 3600
        minutes = (time_diff.seconds % 3600) // 60
        if hours > 0:
            return f"In {hours} hours"
        if minutes > 0:
            return f"In {minutes} minutes"
        return "In less than a minute"

    @staticmethod
    def _poster_url(metadata: dict) -> Optional[str]:
        """Return ``metadata["images"]["poster"]["url"]`` if every level is present, else None."""
        poster = (metadata.get("images") or {}).get("poster") or {}
        return poster.get("url") or None

    @staticmethod
    def _clean_language_label(label: str) -> str:
        """Strip HTML tags and a trailing "(NN%)" completion marker from a subtitle label."""
        label = re.sub(r"<[^>]+>", "", label)
        label = re.sub(r"\s*\(\d+%\)", "", label)
        return label.strip()

    # ------------------------------------------------------------------- search

    def search(self) -> Generator[SearchResult, None, None]:
        """
        Query the search endpoint with ``self.title`` and yield one result per hit.

        The label is the media type plus the distributor year when known; for
        titles whose ``viki_air_time`` lies in the future it is replaced by a
        countdown ("In N hours").

        Raises:
            requests HTTP errors via ``raise_for_status()``.
        """
        response = self.session.get(
            self.config["endpoints"]["search_endpoint_url"],
            params={
                "term": self.title,
                "app": "100000a",
                "per_page": 10,
                "blocked": "true",
            },
        )
        response.raise_for_status()

        now = datetime.datetime.now(datetime.timezone.utc)

        for result in response.json()["response"]:
            is_series = result["type"] == "series"

            label = "TV" if is_series else "Movie"
            year = self._distributor_year(result)
            if year:
                label += f" ({year})"

            if "viki_air_time" in result:
                release_time = datetime.datetime.fromtimestamp(result["viki_air_time"], datetime.timezone.utc)
                if release_time > now:
                    label = self._countdown_label(release_time - now)

            # Prefer the English title; fall back to any other title, then the id.
            titles = result.get("titles") or {}
            title = titles.get("en") or next(iter(titles.values()), result["id"])

            # Only add an ellipsis when the description was actually truncated.
            description = (result.get("descriptions") or {}).get("en") or ""
            if len(description) > 200:
                description = description[:200] + "..."

            # Movies must use the /movies/ path: __init__ relies on it to detect
            # a movie when the URL is fed back in as the title.
            url_kind = "tv" if is_series else "movies"

            yield SearchResult(
                id_=result["id"],
                title=title,
                description=description,
                label=label,
                url=f"https://www.viki.com/{url_kind}/{result['id']}",
            )

    # ------------------------------------------------------------------- titles

    def get_titles(self) -> Union[Movies, Series]:
        """Return the movie or the full episode list for the requested title."""
        title_id = self._title_id()
        if self.is_movie:
            return self._get_movie(title_id)
        return self._get_series(title_id)

    def _get_series(self, title_id: str) -> Series:
        """
        Page through the episode metadata endpoint and build a ``Series``.

        Episodes flagged as geo-blocked whose air time is still in the future
        are skipped; a geo-blocked episode that has already aired aborts the
        run with exit code 1.
        """
        episodes = []
        special_episode_number = 1
        show_year = None
        current_time = int(time.time())  # Unix time in seconds
        page_number = 1

        while True:
            series_metadata_url = self.config["endpoints"]["episode_metadata"].format(
                id=title_id, pagenumber=page_number
            )
            series_metadata = self.session.get(series_metadata_url).json()

            page = series_metadata["response"]
            if not page:
                # An empty page ends pagination.
                break

            self.series_metadata = series_metadata
            if show_year is None:
                # The container is the same on every page, so one lookup is enough.
                show_year = self.get_show_year_from_search()

            for episode in page:
                episode_id = episode["id"]
                if len(episode_id) < self.MIN_ID_LENGTH:
                    episode_id += self.SHORT_ID_SUFFIX

                episode_number = episode["number"]

                if episode.get("blocking", {}).get("geo", False):
                    if current_time < episode.get("viki_air_time", 0):
                        # Not aired yet: skip it but keep the episodes that are available.
                        self.log.info(f"Episode {episode_number} has not aired yet, skipping.")
                        continue
                    # Episode is geo-blocked and its air time has passed.
                    self.log.info(f"Episode {episode.get('number')} is blocked due to Geo-Location.\n")
                    sys.exit(1)

                show_title = episode["container"]["titles"]["en"]
                episode_season = 1

                # A trailing " NNNN" is treated as a year, any other trailing
                # " N..." as a season number; either way it is stripped.
                title_match = re.match(r"^(.*?)(?: (\d{4})| (\d+))?$", show_title)
                if title_match:
                    show_title = title_match.group(1)
                    season = title_match.group(3)
                    if season:
                        episode_season = int(season)

                # Specials/extras go to season 0 with their own running counter.
                episode_label = (episode.get("titles") or {}).get("en") or ""
                if "Special" in episode_label or "Extra" in episode_label:
                    episode_season = 0
                    episode_number = special_episode_number
                    special_episode_number += 1

                episode_class = Episode(
                    id_=episode_id,
                    title=f"{show_title}.{show_year}",
                    season=episode_season,
                    number=episode_number,
                    name=None,
                    year=show_year,
                    service=self.__class__,
                )

                # Store the thumbnail URL only if it exists.
                thumbnail_url = self._poster_url(episode)
                if thumbnail_url:
                    episode_class.thumbnail_url = thumbnail_url

                episodes.append(episode_class)

            page_number += 1

        return Series(episodes)

    def _get_movie(self, title_id: str) -> Movies:
        """
        Resolve a movie container id to its video metadata and build ``Movies``.

        The movie page is scraped for the first ``api.viki.io/v4/videos/<id>.json``
        link; exits with code 1 if the page contains none.
        """
        movie_page = self.session.get(f"https://www.viki.com/movies/{title_id}").text
        video_match = re.search(r"https://api\.viki\.io/v4/videos/(.*?)\.json", movie_page)
        if not video_match:
            self.log.error(f"Could not find a video id on the movie page for '{title_id}'.")
            sys.exit(1)
        video_id = video_match.group(1)

        movie_metadata = self.session.get(self.config["endpoints"]["video_metadata"].format(id=video_id)).json()
        self.movie_metadata = movie_metadata

        # Strip a trailing " NNNN" year from the movie name.
        movie_name = movie_metadata["titles"]["en"]
        title_match = re.match(r"^(.*?)(?: (\d{4}))?$", movie_name)
        if title_match and title_match.group(2):
            movie_name = title_match.group(1)

        movie_class = Movie(
            id_=movie_metadata["id"],
            name=movie_name,
            year=self.get_show_year_from_search(),
            service=self.__class__,
        )

        # Same optional-thumbnail handling as episodes.
        thumbnail_url = self._poster_url(movie_metadata)
        if thumbnail_url:
            movie_class.thumbnail_url = thumbnail_url

        return Movies([movie_class])

    def get_show_year_from_search(self) -> Optional[str]:
        """
        Look the current title up via the search endpoint and return its year.

        The search term is the English container title taken from the metadata
        stored by ``get_titles``. The year is the first four digits of the first
        distributor's ``from`` date of the hit whose id equals the requested
        title id. Returns ``FALLBACK_YEAR`` when nothing usable is found.

        Raises:
            requests HTTP errors via ``raise_for_status()``.
        """
        if self.movie_metadata:
            query = self.movie_metadata["container"]["titles"]["en"]
        elif self.series_metadata and self.series_metadata.get("response"):
            query = self.series_metadata["response"][0]["container"]["titles"]["en"]
        else:
            # No metadata fetched yet, so there is nothing to search for.
            return self.FALLBACK_YEAR

        response = self.session.get(
            self.config["endpoints"]["search_endpoint_url"],
            params={
                "term": query,
                "app": "100000a",
                "per_page": 50,
                "blocked": "true",
            },
        )
        response.raise_for_status()

        title_id = self._title_id()
        for result in response.json().get("response", []):
            if result.get("id") != title_id:
                continue
            year = self._distributor_year(result)
            if year:
                return year

        return self.FALLBACK_YEAR

    # ------------------------------------------------------------------- tracks

    def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
        """
        Build the track list for a title from the playback endpoint.

        Video/audio come from the DASH manifest at ``queue[1].url``; every track
        gets the license URL stored in ``track.data["license_url"]``. Manifest
        subtitles are discarded and replaced by the ``streamSubtitles.dash``
        entries, and the title thumbnail is attached when it can be downloaded.

        Exits with code 1 when the response has no ``queue[1].url``.
        """
        # Undo the padding applied to short ids in get_titles(). The length check
        # keeps genuine (unpadded) ids that merely end in the suffix intact.
        playback_id = title.id
        suffix = self.SHORT_ID_SUFFIX
        if playback_id.endswith(suffix) and len(playback_id) < self.MIN_ID_LENGTH + len(suffix):
            playback_id = playback_id[: -len(suffix)]

        mpd_data = self.session.get(self.config["endpoints"]["mpd_api"].format(id=playback_id)).json()

        try:
            mpd_url = mpd_data["queue"][1]["url"]
        except (KeyError, IndexError):
            self.log.info("Episode not yet available\n")
            sys.exit(1)

        origin_code = mpd_data["video"]["origin"]["language"].lower()
        origin_tag = self.CHINESE_LANGUAGE_MAP.get(origin_code, origin_code)

        # "drm" is base64-encoded JSON; "dt3" holds the license URL.
        drm_info = json.loads(base64.b64decode(mpd_data["drm"]).decode("utf-8", "ignore"))
        license_url = drm_info["dt3"]

        tracks = DASH.from_url(url=mpd_url).to_tracks(language=origin_tag)

        for track in tracks:
            track.data["license_url"] = license_url

        for track in tracks.audio:
            track.data["original_language"] = track.language
            # Language.get parses the tag (e.g. "zh-Hans" -> language + script);
            # Language.make would store the whole tag as the language subtag.
            track.language = Language.get(origin_tag)

        tracks.subtitles.clear()

        self._attach_thumbnail(title, tracks)
        self._add_stream_subtitles(title, tracks, mpd_data, origin_tag)

        return tracks

    def _attach_thumbnail(self, title: Union[Movie, Episode], tracks: Tracks) -> None:
        """Download the title thumbnail into the temp directory and attach it to ``tracks``."""
        thumbnail_url = getattr(title, "thumbnail_url", None)
        if not thumbnail_url:
            self.log.warning("Thumbnail URL not available for this title.")
            return

        thumbnail_response = self.session.get(thumbnail_url)
        if thumbnail_response.status_code != 200:
            self.log.warning("Thumbnail download failed.")
            return

        thumbnail_filename = f"{title.id}_thumbnail.jpg"
        thumbnail_path = config.directories.temp / thumbnail_filename

        os.makedirs(config.directories.temp, exist_ok=True)
        with open(thumbnail_path, "wb") as f:
            f.write(thumbnail_response.content)

        thumbnail_attachment = Attachment(
            path=thumbnail_path,
            name=thumbnail_filename,
            mime_type="image/jpeg",
            description="Thumbnail",
        )

        if not hasattr(tracks, "attachments"):
            tracks.attachments = []
        tracks.attachments.append(thumbnail_attachment)

    def _add_stream_subtitles(
        self, title: Union[Movie, Episode], tracks: Tracks, mpd_data: dict, origin_tag: str
    ) -> None:
        """Add every ``streamSubtitles.dash`` entry of kind "subtitles" that is more than 95% complete."""
        stream_subtitles = mpd_data.get("streamSubtitles", {}).get("dash", [])
        if not stream_subtitles:
            self.log.warning("No subtitles available in 'streamSubtitles.dash'.")
            return

        for sub in stream_subtitles:
            if sub.get("kind") != "subtitles" or sub.get("percentage", 0) <= 95:
                continue

            language_code_raw = sub.get("srclang", "").lower()
            language_name = self._clean_language_label(sub.get("label", language_code_raw))
            language_tag = self.CHINESE_LANGUAGE_MAP.get(language_code_raw, language_code_raw)

            # Only real Chinese codes get a Chinese display name; other codes that
            # happen to start with "z" (e.g. "zu") keep their own label.
            if language_code_raw.split("-", 1)[0] in ("zh", "zt"):
                if language_tag == "zh-Hans":
                    language_name = "Chinese (Simplified)"
                elif language_tag == "zh-Hant":
                    language_name = "Chinese (Traditional)"

            subtitle_track = Subtitle(
                id_=f"{title.id}_{sub.get('id', '')}_{language_tag}",
                url=sub["src"],
                codec=Subtitle.Codec.WebVTT,
                language=language_tag,
                is_original_lang=language_tag == origin_tag,
                forced=False,
                sdh=False,
                name=language_name,
            )
            if sub.get("default"):
                subtitle_track.default = True
            tracks.add(subtitle_track, warn_only=True)

    def get_chapters(self, *_, **__) -> Chapters:
        """Return an empty ``Chapters``; this service provides no chapter data."""
        return Chapters()

    # ----------------------------------------------------------------- widevine

    def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes | str:
        """Request the service certificate by sending the challenge to the license URL."""
        return self.get_widevine_license(challenge, track)

    def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes:
        """POST the challenge to the track's stored license URL and return the raw response body."""
        return self.session.post(url=track.data["license_url"], data=challenge).content