♻️ (TFC): remove redundant comments and improve code readability

 (TFC): add thumbnail attachment support for tracks
📝 (VIU): update last update date in docstring
This commit is contained in:
Sp4rk.y 2024-09-18 22:32:54 -06:00
parent 10767a6f66
commit 3db0ada766
2 changed files with 43 additions and 38 deletions

View File

@ -2,6 +2,7 @@ import json
import re import re
import time import time
import sys import sys
import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Union, Generator, Optional from typing import Union, Generator, Optional
from urllib.parse import urljoin from urllib.parse import urljoin
@ -14,6 +15,7 @@ from devine.core.constants import AnyTrack
from devine.core.service import Service from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Tracks, Chapters, Subtitle, Chapter from devine.core.tracks import Tracks, Chapters, Subtitle, Chapter
from devine.core.tracks.attachment import Attachment
from devine.core.credential import Credential from devine.core.credential import Credential
from devine.core.search_result import SearchResult from devine.core.search_result import SearchResult
from devine.core.downloaders import curl_impersonate from devine.core.downloaders import curl_impersonate
@ -22,7 +24,6 @@ from devine.core.config import config
from devine.core.manifests.dash import DASH from devine.core.manifests.dash import DASH
import warnings import warnings
# Weird chunk error from search, we're using this to ignore the warning popup
warnings.filterwarnings("ignore", message="chunk_size is ignored") warnings.filterwarnings("ignore", message="chunk_size is ignored")
@ -82,23 +83,18 @@ class TFC(Service):
headers=headers, headers=headers,
) )
# Parse the authentication response
response_json = auth_response.json() response_json = auth_response.json()
# Check if authentication was successful
if response_json.get("status") == "OK" and "UserAuthentication" in response_json: if response_json.get("status") == "OK" and "UserAuthentication" in response_json:
# Extract token from UserAuthentication
self.token = response_json["UserAuthentication"] self.token = response_json["UserAuthentication"]
self.refresh_token = response_json["refreshToken"] self.refresh_token = response_json["refreshToken"]
self.token_expiry = (datetime.now() + timedelta(minutes=4)).timestamp() self.token_expiry = (datetime.now() + timedelta(minutes=4)).timestamp()
# Update session headers with the Authorization token
self.session.headers.update({"Authorization": f"Bearer {self.token}"}) self.session.headers.update({"Authorization": f"Bearer {self.token}"})
else: else:
# Retry login if the first attempt fails if auth_response.status_code == 401:
if auth_response.status_code == 401: # Assuming 401 for unauthorized
print("First login attempt failed, retrying...") print("First login attempt failed, retrying...")
return self.authenticate(cookies, credential) # Recursive retry return self.authenticate(cookies, credential)
else: else:
raise ValueError("Failed to authenticate. Response was not as expected.") raise ValueError("Failed to authenticate. Response was not as expected.")
@ -124,16 +120,13 @@ class TFC(Service):
if not title: if not title:
continue continue
# Get detailed metadata
detail_url = self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=result["objectID"]) detail_url = self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=result["objectID"])
detail_response = self.session.get(detail_url) detail_response = self.session.get(detail_url)
detail_data = detail_response.json() detail_data = detail_response.json()
# Extract description and media type
description = detail_data.get("description", {}).get("en", "")[:200] + "..." description = detail_data.get("description", {}).get("en", "")[:200] + "..."
media_type = "TV" if "children" in detail_data else "Movie" media_type = "TV" if "children" in detail_data else "Movie"
# Extract year and episode count for TV shows
year = detail_data.get("release_year") year = detail_data.get("release_year")
episode_count = 0 episode_count = 0
@ -142,14 +135,12 @@ class TFC(Service):
[episode for episode in detail_data.get("children", []) if "-tlr" not in episode["id"]] [episode for episode in detail_data.get("children", []) if "-tlr" not in episode["id"]]
) )
# Construct label with episode count for TV shows
label = media_type label = media_type
if year: if year:
label += f" ({year})" label += f" ({year})"
if media_type == "TV": if media_type == "TV":
label += f" {episode_count} Episode{'' if episode_count == 1 else 's'}" label += f" {episode_count} Episode{'' if episode_count == 1 else 's'}"
# Create SearchResult with additional details
yield SearchResult( yield SearchResult(
id_=result["objectID"], id_=result["objectID"],
title=title, title=title,
@ -158,7 +149,6 @@ class TFC(Service):
) )
def get_js_value(self) -> Optional[str]: def get_js_value(self) -> Optional[str]:
# Simulate browsing to the page and download the HTML file
for _ in curl_impersonate( for _ in curl_impersonate(
urls="https://www.iwanttfc.com/#!/browse", urls="https://www.iwanttfc.com/#!/browse",
output_dir=config.directories.temp, output_dir=config.directories.temp,
@ -166,12 +156,10 @@ class TFC(Service):
): ):
pass pass
# Read the downloaded HTML file
html_path = config.directories.temp / "browse_page.html" html_path = config.directories.temp / "browse_page.html"
with html_path.open("r", encoding="utf8") as f: with html_path.open("r", encoding="utf8") as f:
html_content = f.read() html_content = f.read()
# Find the script tag with the catalog URL and extract the 'js' value
match = re.search(r'src="https://absprod-static.iwanttfc.com/c/6/catalog/(.*?)/script.js', html_content) match = re.search(r'src="https://absprod-static.iwanttfc.com/c/6/catalog/(.*?)/script.js', html_content)
if match: if match:
return match.group(1) return match.group(1)
@ -179,7 +167,6 @@ class TFC(Service):
return None return None
def get_titles(self) -> Union[Movies, Series]: def get_titles(self) -> Union[Movies, Series]:
# Get title metadata
try: try:
title_metadata = requests.get( title_metadata = requests.get(
self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=self.title) self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=self.title)
@ -188,10 +175,9 @@ class TFC(Service):
self.log.warning("Show title does not exist.") self.log.warning("Show title does not exist.")
sys.exit(1) sys.exit(1)
# Check for GEOFENCE rules (this part remains the same)
rules = title_metadata.get("rules", {}).get("rules", []) rules = title_metadata.get("rules", {}).get("rules", [])
for rule in rules: for rule in rules:
if rule.get("start") <= time.time() * 1000 <= rule.get("end"): # Check if rule is active if rule.get("start") <= time.time() * 1000 <= rule.get("end"):
required_countries = rule.get("countries", []) required_countries = rule.get("countries", [])
if required_countries: if required_countries:
current_region = get_ip_info(self.session)["country"].lower() current_region = get_ip_info(self.session)["country"].lower()
@ -203,19 +189,16 @@ class TFC(Service):
sys.exit(0) sys.exit(0)
if "children" in title_metadata: if "children" in title_metadata:
# TV Show - Extract episodes with correct season info
episodes = [] episodes = []
for episode in title_metadata.get("children", []): for episode in title_metadata.get("children", []):
episode_id = episode["id"] episode_id = episode["id"]
# Extract season and episode number from ID
match = re.match(r".*-s(\d+)e(\d+)$", episode_id, re.IGNORECASE) match = re.match(r".*-s(\d+)e(\d+)$", episode_id, re.IGNORECASE)
if not match: if not match:
continue # Skip if unable to parse season and episode continue
season, number = map(int, match.groups()) season, number = map(int, match.groups())
# Create Episode object with season and episode number
episode_obj = Episode( episode_obj = Episode(
id_=episode_id, id_=episode_id,
title=title_metadata.get("title", {}).get("en"), title=title_metadata.get("title", {}).get("en"),
@ -229,11 +212,9 @@ class TFC(Service):
return Series(episodes) return Series(episodes)
else: else:
# Movie - Extract movie details
movie_name = title_metadata.get("title", {}).get("en") movie_name = title_metadata.get("title", {}).get("en")
movie_year = title_metadata.get("release_year") movie_year = title_metadata.get("release_year")
# Create Movie object
movie_class = Movie( movie_class = Movie(
id_=self.title, id_=self.title,
name=movie_name, name=movie_name,
@ -244,17 +225,16 @@ class TFC(Service):
return Movies([movie_class]) return Movies([movie_class])
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
if isinstance(title, Episode) and not title.data: if not title.data:
# Fetch detailed episode data if needed
episode_data = requests.get( episode_data = requests.get(
self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=title.id) self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=title.id)
).json() ).json()
title.data = episode_data title.data = episode_data
else:
episode_data = title.data
# Extract MPD URLs
mpd_urls = episode_data.get("media", {}).get("mpds", []) mpd_urls = episode_data.get("media", {}).get("mpds", [])
# Extract subtitle URLs and languages
subtitle_data = [ subtitle_data = [
( (
urljoin(self.config["endpoints"]["api_subtitle"], caption.get("id")) + ".vtt", urljoin(self.config["endpoints"]["api_subtitle"], caption.get("id")) + ".vtt",
@ -265,12 +245,11 @@ class TFC(Service):
tracks = Tracks() tracks = Tracks()
# Create Video and Audio Tracks from MPDs, avoiding duplicates and storing episode_id
for mpd_url in mpd_urls: for mpd_url in mpd_urls:
mpd_tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language or "fil") mpd_tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language or "fil")
for track in mpd_tracks: for track in mpd_tracks:
if not tracks.exists(by_id=track.id): if not tracks.exists(by_id=track.id):
track.data["episode_id"] = episode_data.get("id") # Store episode_id in track.data track.data["episode_id"] = episode_data.get("id")
tracks.add(track) tracks.add(track)
for track in tracks.audio: for track in tracks.audio:
@ -280,7 +259,6 @@ class TFC(Service):
track.language._dict = {"language": mpd_lang} track.language._dict = {"language": mpd_lang}
track.language._str_tag = mpd_lang track.language._str_tag = mpd_lang
# Create Subtitle Tracks for all languages, avoiding duplicates
for subtitle_url, language in subtitle_data: for subtitle_url, language in subtitle_data:
subtitle_track = Subtitle( subtitle_track = Subtitle(
id_=subtitle_url.split("/")[-1].split(".")[0], id_=subtitle_url.split("/")[-1].split(".")[0],
@ -296,6 +274,37 @@ class TFC(Service):
chapters = self.get_chapters(title) chapters = self.get_chapters(title)
tracks.chapters = Chapters(chapters) tracks.chapters = Chapters(chapters)
thumbnail_id = episode_data.get("thumbnail") or episode_data.get("poster") or episode_data.get("thumb")
if not thumbnail_id:
images = episode_data.get("images", [])
if images:
thumbnail_data = images[0]
thumbnail_id = thumbnail_data.get("id") or thumbnail_data.get("url").split("/")[-1].split(".")[0]
if thumbnail_id:
thumbnail_base_url = self.config["endpoints"]["api_thumbnail"]
thumbnail_url = f"{thumbnail_base_url}{thumbnail_id}.jpg"
thumbnail_response = self.session.get(thumbnail_url)
if thumbnail_response.status_code == 200:
thumbnail_filename = f"{title.id}_thumbnail.jpg"
thumbnail_path = config.directories.temp / thumbnail_filename
os.makedirs(config.directories.temp, exist_ok=True)
with open(thumbnail_path, "wb") as f:
f.write(thumbnail_response.content)
thumbnail_attachment = Attachment(
path=thumbnail_path,
name=thumbnail_filename,
mime_type="image/jpeg",
description="Thumbnail",
)
tracks.attachments.append(thumbnail_attachment)
else:
self.log.warning("Thumbnail not found for title.")
return tracks return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]: def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:

View File

@ -31,7 +31,7 @@ class VIU(Service):
1 & 2 has different api 1 & 2 has different api
Author: unnamed improved by @sp4rk.y Author: unnamed improved by @sp4rk.y
last update: 17/09/2024 last update: 18/09/2024
""" """
# GEOFENCE = ("sg",) # GEOFENCE = ("sg",)
@ -67,11 +67,7 @@ class VIU(Service):
credential: Optional[Credential] = None, credential: Optional[Credential] = None,
) -> None: ) -> None:
self.credentials = credential self.credentials = credential
self.session.headers.update( self.session.headers.update({"Referer": "https://viu.com/"})
{
"Referer": "https://viu.com/" # headers Origin make 403 error
}
)
self.log.info(" + Downloading without an account...") self.log.info(" + Downloading without an account...")
self.log.info(f" + Detected using: {self.jenis}") self.log.info(f" + Detected using: {self.jenis}")