Improved by @sp4rk.y & Fixed the bug causing series to be limited to first 50 episodes

master
ToonsHub 2024-03-25 22:06:16 +00:00
parent 79bc77962c
commit 5229627e6e
1 changed files with 70 additions and 105 deletions

View File

@ -1,125 +1,104 @@
import base64
import re
import math
import json
import time
import datetime
import logging
from abc import ABCMeta, abstractmethod
from http.cookiejar import CookieJar
from typing import Optional, Union
from urllib.parse import urlparse
import re
from typing import Union
import click
import requests
from requests.adapters import HTTPAdapter, Retry
from rich.padding import Padding
from rich.rule import Rule
from devine.core.service import Service
from devine.core.titles import Series, Movies, Movie, Episode
from devine.core.cacher import Cacher
from devine.core.config import config
from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.titles import Title_T, Titles_T
from devine.core.tracks import Video, Audio, Subtitle, Chapter, Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapters, Tracks
class VIKI(Service):
"""
Service code for Viki
Written by ToonsHub
Written by ToonsHub, improved by @sp4rk.y [Discord]
Authorization: None (Free SD) | Cookies (Free and Paid Titles)
Security: FHD@L3
"""
# Static method, this method belongs to the class
@staticmethod
# The command name, must much the service tag (and by extension the service folder)
@click.command(name="VIKI", short_help="https://www.viki.com", help=__doc__)
# Using series ID
@click.argument("title", type=str)
# Movie Tag
@click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
# Pass the context back to the CLI with arguments
@click.pass_context
def cli(ctx, **kwargs):
return VIKI(ctx, **kwargs)
# Accept the CLI arguments by overriding the constructor (The __init__() method)
def __init__(self, ctx, title, movie):
# Pass the series_id argument to self so it's accessable across all methods
def __init__(self, ctx, title: str, movie: bool):
self.title = title
self.is_movie = movie
# Overriding the constructor
super().__init__(ctx)
self.session.headers.update(
{
"user-agent": self.config["browser"]["user-agent"],
"x-client-user-agent": self.config["browser"]["user-agent"],
"x-viki-app-ver": self.config["browser"]["x-viki-app-ver"],
"x-viki-as-id": self.config["browser"]["x-viki-as-id"],
}
)
# Defining an authinticate function
def authenticate(self, cookies: Optional[CookieJar], credential: Optional[Credential] = None):
# Save Cookies
if cookies:
self.session.cookies.get(cookies)
self.session.cookies.update(cookies)
# Defining a function to return titles
def get_titles(self):
def get_titles(self) -> Union[Movies, Series]:
if not self.is_movie:
# Get the metadata needed for the series
series_metadata = self.session.get(self.config['endpoints']['episode_metadata'].format(id=self.title))
# Set an empty list for episodes
episodes = []
pagenumber = 1
while True:
series_metadata = self.session.get(f"https://api.viki.io/v4/containers/{self.title}/episodes.json?direction=asc&with_upcoming=false&sort=number&page={pagenumber}&per_page=100&app=100000a").json()
if not series_metadata["response"] and not series_metadata["more"]:
break
# Get the episode metadata by iterating through each season id
for episode in series_metadata.json()['response']:
# Get the episode metadata by iterating through each episode
for episode in series_metadata["response"]:
# Get the id
episode_id = episode["id"]
# Get the id
episode_id = episode["id"]
# Get the show title
show_title = episode["container"]["titles"]["en"]
# Get the show title
show_title = episode["container"]["titles"]["en"]
# Get the season
episode_season = 1
# Get the season
episode_season = 1
# Get the episode number
episode_number = episode["number"]
# Get the episode number
episode_number = episode["number"]
# Get the episode name
episode_name = None
# Get the episode name
episode_name = None
# Get the episode year
episode_year = episode["created_at"][:4]
# Get the episode year
episode_year = episode["created_at"][:4]
# Set a class for each episode
episode_class = Episode(
id_=episode_id,
title=show_title,
season=episode_season,
number=episode_number,
name=episode_name,
year=episode_year,
service=self.__class__,
)
# Set a class for each episode
episode_class = Episode(id_=episode_id, title=show_title, season=episode_season, number=episode_number, name=episode_name, year=episode_year, service=self.__class__)
# Append it to the list
episodes.append(episode_class)
# Append it to the list
episodes.append(episode_class)
pagenumber += 1
# Return the episodes as a Series object
return Series(episodes)
else:
else:
# Get Video API URL
page_html = requests.get(f"https://www.viki.com/movies/{self.title}").text
video_id = re.search(r'https://api.viki.io/v4/videos/(.*?).json', page_html).group(1)
video_id = re.search(r"https://api.viki.io/v4/videos/(.*?).json", page_html).group(1)
# Get Movie Data
movie_metadata = self.session.get(self.config['endpoints']['video_metadata'].format(id=video_id)).json()
movie_metadata = self.session.get(self.config["endpoints"]["video_metadata"].format(id=video_id)).json()
movie_id = movie_metadata["id"]
movie_name = movie_metadata["titles"]["en"]
movie_year = movie_metadata["created_at"][:4]
@ -127,41 +106,27 @@ class VIKI(Service):
return Movies([movie_class])
# Defining a function to get tracks
def get_tracks(self, title: Title_T) -> Tracks:
# Set the headers for the request
headers = {
'user-agent': self.config['browser']['user-agent'],
'x-client-user-agent': self.config['browser']['user-agent'],
'x-viki-app-ver': self.config['browser']['x-viki-app-ver'],
'x-viki-as-id': self.config['browser']['x-viki-as-id'],
}
# Update the headers
self.session.headers.update(headers)
mpd_info = self.session.get(self.config['endpoints']['mpd_api'].format(id=title.id))
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
mpd_info = self.session.get(self.config["endpoints"]["mpd_api"].format(id=title.id))
mpd_url = mpd_info.json()["queue"][1]["url"]
mpd_lang = mpd_info.json()["video"]["origin"]["language"]
self.license_url = json.loads(base64.b64decode(mpd_info.json()["drm"]).decode("utf-8", "ignore"))["dt3"]
# Grab the tracks from the MPD
# this thing here looks wrong/overcomplicated buyt alas might not be
license_url = json.loads(base64.b64decode(mpd_info.json()["drm"]).decode("utf-8", "ignore"))["dt3"]
tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)
# Return the tracks
for track in tracks:
track.data["license_url"] = license_url
return tracks
# Defining a function to get chapters
def get_chapters(self, title):
return []
def get_chapters(self, *_, **__) -> Chapters:
return Chapters()
# Defining a function to get widevine license keys
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes | str:
# TODO: Cache the returned service cert
return self.get_widevine_license(challenge, track)
# Send the post request to the license server
license_raw = self.session.post(self.license_url, data=challenge).content
# Return the license
return base64.b64encode(license_raw).decode()
def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes:
return self.session.post(url=track.data["license_url"], data=challenge).content