diff --git a/README.md b/README.md index 52ef3d1..76d1b2f 100644 --- a/README.md +++ b/README.md @@ -33,10 +33,12 @@ 10. SKST (the hardest service I ever dealt upon now): - Subtitle has been fixed, hopefully no issue 11. VLD: - - Token isn't cached so that's a major problem with series + - All seems fine working for now 12. HPLA: - No support for Television yet - Music needs to be fixed since the output is a mp4 instead of m4a + 13. SHUD: + - PlayReady needed - Acknowledgment diff --git a/SHUD/__init__.py b/SHUD/__init__.py new file mode 100644 index 0000000..8f37080 --- /dev/null +++ b/SHUD/__init__.py @@ -0,0 +1,741 @@ +import base64 +import hashlib +import json +import re +from collections.abc import Generator +from datetime import datetime, timedelta +from http.cookiejar import CookieJar +from typing import Optional, Union + +import click +from langcodes import Language +from urllib.parse import parse_qs, urlparse +from unshackle.core.constants import AnyTrack +from unshackle.core.credential import Credential +from unshackle.core.manifests import DASH, HLS +from unshackle.core.search_result import SearchResult +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T +from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video + + +class SHUD(Service): + """ + Service code for watch.shudder.com + Version: 1.0.0 + + Authorization: Bearer JWT Token + + Security: FHD@L3 + + Use full URL (for example - https://watch.shudder.com/watch/927436) or title ID (for example - 927436). + """ + + TITLE_RE = r"^(?:https?://watch\.shudder\.com/[^/]+/)?(?P\d+)" + GEOFENCE = ("US", "CA", "GB", "AU", "IE", "NZ") + NO_SUBTITLES = False + + VIDEO_RANGE_MAP = { + "SDR": "sdr", + "HDR10": "hdr10", + "DV": "dolby_vision", + } + + @staticmethod + @click.command(name="SHUD", short_help="https://watch.shudder.com") + @click.argument("title", type=str) + @click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie") + @click.option("-d", "--device", type=str, default="web", help="Select device from the config file") + @click.pass_context + def cli(ctx, **kwargs): + return SHUD(ctx, **kwargs) + + def __init__(self, ctx, title, movie, device): + super().__init__(ctx) + + self.title = title + self.movie = movie + self.device = device + self.cdm = ctx.obj.cdm + + # Track request overrides based on device/CDM capabilities + if any(r != Video.Range.SDR for r in self.track_request.ranges): + self.track_request.codecs = [Video.Codec.HEVC] + + if self.cdm and self.cdm.security_level == 3: + self.track_request.codecs = [Video.Codec.AVC] + self.track_request.ranges = [Video.Range.SDR] + + if self.config is None: + raise Exception("Config is missing!") + + profile_name = ctx.parent.params.get("profile") + self.profile = profile_name or "default" + self.license_data = {} + self.realm = "dce.shudder" + self.api_key = self.config["api_key"] + + def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + + # Set required headers for all requests + self.session.headers.update({ + "User-Agent": self.config["client"][self.device]["user_agent"], + "Accept": "application/json, text/plain, */*", + "Accept-Language": "en-US", + "Accept-Encoding": "gzip, deflate, br, zstd", + "Referer": "https://watch.shudder.com/", + "Content-Type": "application/json", + "x-api-key": self.api_key, + "app": "dice", + "x-app-var": self.config["client"][self.device]["app_version"], + "Origin": "https://watch.shudder.com", + "Connection": "keep-alive", + }) + + # Handle credential-based auth (email/password) + if credential: + self.log.info("Authenticating with credentials") + + # First get init tokens + init_params = { + "lk": "language", + "pk": "subTitleLanguage,subtitlePreferenceMode,subtitlePreferenceMap,audioLanguage,autoAdvance,pluginAccessTokens,videoBackgroundAutoPlay", + "readLicences": "true", + "countEvents": "LIVE", + "menuTargetPlatform": "WEB", + "readIconStore": "ENABLED", + "readUserProfiles": "true" + } + + init_resp = self.session.get( + url=self.config["endpoints"]["init"], + params=init_params + ) + init_resp.raise_for_status() + init_data = init_resp.json() + + # Login with credentials + login_resp = self.session.post( + url=self.config["endpoints"]["login"], + headers={ + "Authorization": f"Bearer {init_data.get('authentication', {}).get('authorisationToken', '')}", + "Realm": self.realm, + }, + json={ + "id": credential.username, + "secret": credential.password, + } + ) + login_resp.raise_for_status() + login_data = login_resp.json() + + self.auth_token = login_data.get("authorisationToken") + self.refresh_token = login_data.get("refreshToken") + + if not self.auth_token: + raise ValueError("Authentication failed - no token received") + + self.session.headers.update({ + "Authorization": f"Bearer {self.auth_token}", + "Realm": self.realm, + }) + + self.log.info("Authentication successful") + return + + if cookies: + self.log.info("Authenticating with cookies") + for cookie in cookies: + if cookie.name == "auth_token": + self.auth_token = cookie.value + self.session.headers.update({ + "Authorization": f"Bearer {self.auth_token}", + "Realm": self.realm, + }) + return + raise ValueError("No valid auth_token cookie found") + + raise EnvironmentError("Service requires Credentials or Cookies for Authentication.") + + def search(self) -> Generator[SearchResult, None, None]: + """Search for titles on Shudder""" + search_resp = self.session.get( + url=self.config["endpoints"]["search"], + params={ + "query": self.title, + "timezone": self.config.get("timezone", "UTC"), + } + ) + search_resp.raise_for_status() + search_data = search_resp.json() + + cards = [] + for element in search_data.get("elements", []): + if element.get("$type") != "cardList": + continue + cards.extend(element.get("attributes", {}).get("cards", [])) + + for card in cards: + attrs = card.get("attributes", {}) + action = attrs.get("action", {}) + route = action.get("data", {}) if action.get("type") == "route" else {} + + if not route: + continue + + content_type = str(route.get("type", "")).upper() + if content_type not in ("VOD", "SERIES"): + continue + + raw_id = str(route.get("id", "")) + if not raw_id: + continue + + # "VOD#877410" -> "877410" + # "SERIES#3311" -> "3311" + title_id = raw_id.split("#", 1)[-1].strip() + if not title_id: + continue + + is_series = content_type == "SERIES" + + yield SearchResult( + id_=title_id, + title=route.get("title", ""), + label="SERIES" if is_series else "MOVIE", + url=f"https://watch.shudder.com/{'series' if is_series else 'watch'}/{title_id}", + ) + + def _parse_title_input(self) -> tuple[str, Optional[str], Optional[str]]: + """ + Returns: + (title_id, kind, season_id) + + kind: + - "watch" for movie/episode URLs like /watch/927436 + - "series" for series URLs like /series/3713?seasonId=33510 + - None for raw numeric ids + """ + raw = str(self.title).strip() + + if raw.isdigit(): + return raw, None, None + + parsed = urlparse(raw) + if parsed.scheme and parsed.netloc: + parts = [p for p in parsed.path.split("/") if p] + kind = parts[0].lower() if parts else None + title_id = parts[1] if len(parts) > 1 else None + season_id = parse_qs(parsed.query).get("seasonId", [None])[0] + + if title_id and title_id.isdigit(): + return title_id, kind, season_id + + match = re.match(self.TITLE_RE, raw) + if not match: + raise ValueError(f"Invalid Shudder title: {raw}") + + return match.group("title_id"), None, None + + + def _build_manifest_payload(self, video_id: Union[str, int]) -> dict: + return { + "mediaCapabilities": [ + { + "protocols": ["HLS", "DASH"], + "audioCodecs": ["aac"], + "videoCodecs": ["h264", "hevc"], + }, + { + "keySystem": "WIDEVINE", + "robustness": "software", + "protocols": ["HLS", "DASH"], + "audioCodecs": ["aac"], + "encryptionMode": ["CBC", "CTR"], + "videoCodecs": ["h264"], + }, + ], + "macros": { + "CM-APP-NAME": "Website", + "CM-APP-VERSION": self.config["client"][self.device]["app_version"], + "CM-DVC-DNT": "0", + "CM-DVC-H": "1200", + "CM-DVC-W": "1920", + "CM-DVC-LANG": "en-US", + "CM-DVC-OS": "14", + "CM-DVC-TYPE": "2", + "CM-WEB-MBL": "0", + "CM-WEB-PAGE": f"/video/{video_id}", + "CM-CST-TCF": "", + "CM-CST-USP": "", + "CM-DVC-ATS": "", + }, + } + + + def _get_video_metadata(self, video_id: str) -> Optional[dict]: + resp = self.session.post( + url=self.config["endpoints"]["video"].format(video_id=video_id), + params={"includePlaybackDetails": "URL", "displayGeoblocked": "HIDE"}, + json=self._build_manifest_payload(video_id), + ) + + if resp.status_code in (404, 405): + return None + + resp.raise_for_status() + return resp.json() + + + def _get_series_view(self, series_id: str, season_id: Optional[str] = None) -> dict: + """ + Supports both config styles: + 1. full templated URL: + https://.../api/v1/view?type=series&id={series_id}&timezone=UTC + 2. base URL: + https://.../api/v1/view + """ + endpoint = self.config["endpoints"]["series_view"] + params = {} + + if "{series_id}" in endpoint: + url = endpoint.format(series_id=series_id) + else: + url = endpoint + params.update({ + "type": "series", + "id": series_id, + "timezone": self.config.get("timezone", "UTC"), + }) + + if season_id: + params["seasonId"] = season_id + + resp = self.session.get(url=url, params=params or None) + resp.raise_for_status() + return resp.json() + + + def _get_series_element(self, data: dict) -> dict: + for element in data.get("elements", []): + if element.get("$type") == "series": + return element.get("attributes", {}) + return {} + + + def _get_season_bucket(self, data: dict) -> dict: + for element in data.get("elements", []): + if element.get("$type") != "bucket": + continue + attrs = element.get("attributes", {}) + if attrs.get("tab") == "season" or attrs.get("type") == "season": + return attrs + return {} + + + def _extract_series_description(self, data: dict) -> str: + for element in data.get("elements", []): + if element.get("$type") != "hero": + continue + for item in element.get("attributes", {}).get("content", []): + if item.get("$type") == "textblock": + text = item.get("attributes", {}).get("text") + if text: + return text + return "" + + + def _extract_series_year(self, data: dict) -> Optional[int]: + for element in data.get("elements", []): + if element.get("$type") != "hero": + continue + for item in element.get("attributes", {}).get("content", []): + if item.get("$type") != "tagList": + continue + for tag in item.get("attributes", {}).get("tags", []): + text = str(tag.get("attributes", {}).get("text", "")).strip() + if re.fullmatch(r"\d{4}", text): + return int(text) + return None + + + @staticmethod + def _parse_episode_label(label: str, fallback_number: int) -> tuple[int, str]: + label = (label or "").strip() + if not label: + return fallback_number, f"Episode {fallback_number}" + + m = re.match(r"^E(?P\d+)\s*[-:]\s*(?P.+)$", label, re.I) + if m: + return int(m.group("number")), m.group("name").strip() + + m = re.match(r"^Episode\s+(?P\d+)\s*[-:]\s*(?P.+)$", label, re.I) + if m: + return int(m.group("number")), m.group("name").strip() + + return fallback_number, label + + + def _get_series_titles(self, series_id: str, preferred_season_id: Optional[str] = None) -> Series: + """ + Important: + The /view response usually contains episode items only for the selected season. + So we fetch the initial page, then request each season explicitly with seasonId=... + """ + page = self._get_series_view(series_id, preferred_season_id) + + series_element = self._get_series_element(page) + season_bucket = self._get_season_bucket(page) + metadata = page.get("metadata", {}) + + series_title = ( + metadata.get("pageTitle") + or series_element.get("series", {}).get("title") + or "" + ) + series_description = self._extract_series_description(page) + series_year = self._extract_series_year(page) + + seasons = series_element.get("seasons", {}).get("items", []) + if not seasons: + raise ValueError(f"No seasons found for series {series_id}") + + initial_season_id = str( + season_bucket.get("seasonId") + or season_bucket.get("id") + or series_element.get("seasonId") + or metadata.get("currentSeason", {}).get("seasonId") + or "" + ) + + cached_items = {} + if initial_season_id: + cached_items[initial_season_id] = season_bucket.get("items", []) + + built_episodes = [] + seen_episode_ids = set() + + for season_index, season in enumerate(seasons, start=1): + season_id = str(season.get("id")) + + season_number = season.get("seasonNumber") + if season_number is None: + m = re.search(r"(\d+)", str(season.get("title", ""))) + season_number = int(m.group(1)) if m else season_index + else: + season_number = int(season_number) + + items = cached_items.get(season_id) + if items is None: + season_page = self._get_series_view(series_id, season_id) + season_bucket = self._get_season_bucket(season_page) + items = season_bucket.get("items", []) + + if not items: + self.log.warning(f"No episode items returned for series {series_id}, season {season_number}") + continue + + for fallback_ep_num, item in enumerate(items, start=1): + episode_id = str(item["id"]) + if episode_id in seen_episode_ids: + continue + seen_episode_ids.add(episode_id) + + episode_number, episode_name = self._parse_episode_label( + item.get("title", ""), + fallback_ep_num, + ) + + built_episodes.append(( + season_number, + episode_number, + Episode( + id_=episode_id, + service=self.__class__, + title=series_title, + season=season_number, + number=episode_number, + name=episode_name, + year=series_year, + language=Language.get("en"), + data={ + **item, + "series_id": int(series_id), + "series_title": series_title, + "series_description": series_description, + "season_id": season.get("id"), + "season_title": season.get("title"), + "season_number": season_number, + "episode_number": episode_number, + }, + ), + )) + + if not built_episodes: + raise ValueError(f"No episodes found for series {series_id}") + + return Series([ + episode + for _, _, episode in sorted(built_episodes, key=lambda x: (x[0], x[1])) + ]) + + def get_titles(self) -> Titles_T: + """Get movie or series metadata""" + title_id, kind, season_id = self._parse_title_input() + self.title = title_id + + # Explicit /series/... URL -> go straight to series handling + if not self.movie and kind == "series": + return self._get_series_titles(title_id, season_id) + + # Try movie/video manifest first + metadata = self._get_video_metadata(title_id) + + # If manifest lookup fails, try series view + if metadata is None: + if not self.movie: + self.log.info(f"Manifest lookup failed for {title_id}, trying series view") + return self._get_series_titles(title_id, season_id) + raise ValueError(f"Title {title_id} not found") + + if metadata.get("contentDownload", {}).get("permission") == "DISALLOWED": + self.log.warning(f"Download not permitted for title {title_id}") + + content_type = str(metadata.get("type", "")).upper() + + # Movie path + if self.movie or content_type in ("VOD", "MOVIE"): + return Movies([ + Movie( + id_=metadata["id"], + service=self.__class__, + name=metadata.get("title", ""), + description=metadata.get("description", metadata.get("longDescription", "")), + year=int(metadata.get("productionYear", 0)) if metadata.get("productionYear") else None, + language=Language.get("en"), + data=metadata, + ) + ]) + + # Direct episode ids are not ideal without the parent series context + if "SEASON" in content_type or "EPISODE" in content_type: + raise ValueError( + "Direct episode IDs are not supported yet. " + "Use the series URL or series id instead." + ) + + # Fallback to series handling + return self._get_series_titles(title_id, season_id) + + def get_tracks(self, title: Title_T) -> Tracks: + """Fetch and parse manifest tracks""" + def _fetch_variant( + title: Title_T, + codec: Optional[Video.Codec], + range_: Video.Range, + ) -> Tracks: + vcodec_str = "hevc" if codec == Video.Codec.HEVC else "h264" + range_str = range_.name + video_format = self.VIDEO_RANGE_MAP.get(range_str, "sdr") + + self.log.info(f" + Fetching {vcodec_str.upper()} {range_str} manifest") + + # Build media capabilities payload + media_capabilities = [ + { + "protocols": ["HLS", "DASH"], + "audioCodecs": ["aac"], + "videoCodecs": [vcodec_str], + } + ] + + # Add DRM capabilities for encrypted streams + if codec: + media_capabilities.append({ + "keySystem": "WIDEVINE", + "robustness": "software", + "protocols": ["HLS", "DASH"], + "audioCodecs": ["aac"], + "encryptionMode": ["CBC", "CTR"], + "videoCodecs": [vcodec_str], + }) + + # Build macros for request + macros = { + "CM-APP-NAME": "Website", + "CM-APP-VERSION": self.config["client"][self.device]["app_version"], + "CM-DVC-DNT": "0", + "CM-DVC-H": "1080", + "CM-DVC-W": "1920", + "CM-DVC-LANG": "en-US", + "CM-DVC-OS": "14", + "CM-DVC-TYPE": "2", + "CM-WEB-MBL": "0", + f"CM-WEB-PAGE": f"/video/{title.id}", + } + + # Inside _fetch_variant() in get_tracks(): + manifest_resp = self.session.post( + url=self.config["endpoints"]["manifest"].format(video_id=title.id), + params={"includePlaybackDetails": "URL", "displayGeoblocked": "HIDE"}, + json={ + "mediaCapabilities": media_capabilities, # Same as above + "macros": macros, # Same as above, update CM-WEB-PAGE with title.id + } + ) + manifest_resp.raise_for_status() + manifest_data = manifest_resp.json() + + # Extract stream URL and DRM info + streams = manifest_data.get("streams", []) + if not streams: + raise ValueError("No streams available for this title") + + stream = streams[0] # Take first available stream + stream_url = stream.get("url") + + if not stream_url: + raise ValueError("No stream URL found in manifest") + + # Store DRM/license data for later use + drm = stream.get("drm", {}) + if drm: + self.license_data = { + "url": drm.get("url", self.config["endpoints"]["widevine_license"]), + "jwtToken": drm.get("jwtToken", ""), + "encryptionMode": drm.get("encryptionMode", "CBC"), + "keySystems": drm.get("keySystems", []), + } + + # Parse manifest based on protocol + if "m3u8" in stream_url.lower(): + tracks = HLS.from_url(url=stream_url, session=self.session).to_tracks(language=title.language) + else: + tracks = DASH.from_url(url=stream_url, session=self.session).to_tracks(language=title.language) + + # Apply video range to tracks + range_enum = { + "hdr10": Video.Range.HDR10, + "dolby_vision": Video.Range.DV, + }.get(video_format, Video.Range.SDR) + + for video in tracks.videos: + video.range = range_enum + + # Filter audio tracks (remove clear/unencrypted if DRM present) + if drm: + tracks.audio = [ + track for track in tracks.audio + if "clear" not in str(track.data).lower() + ] + + # Fix channel counts + for track in tracks.audio: + if track.channels == 6.0: + track.channels = 5.1 + # Check for descriptive audio + label = track.data.get("label", "").lower() if isinstance(track.data, dict) else "" + if "audio description" in label or "descriptive" in label: + track.descriptive = True + + + return tracks + + return self._get_tracks_for_variants(title, _fetch_variant) + + def get_chapters(self, title: Title_T) -> list[Chapter]: + """Extract chapter markers if available""" + chapters = [] + + # Check for skip markers in title data + skip_markers = title.data.get("skipMarkers", []) + for marker in skip_markers: + marker_type = marker.get("type", "").lower() + start = marker.get("start", marker.get("offset")) + end = marker.get("end") + + if marker_type == "intro" and start is not None: + chapters.append(Chapter(timestamp=int(start), name="Opening")) + if end: + chapters.append(Chapter(timestamp=int(end))) + elif marker_type == "credits" and start is not None: + chapters.append(Chapter(timestamp=int(start), name="Credits")) + + return chapters + + def get_widevine_service_certificate(self, **_: any) -> str: + """Return Widevine service certificate if configured""" + return self.config.get("certificate", "") + + def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]: + """Request Widevine license from Shudder's DRM server""" + license_url = self.license_data.get("url") or self.config["endpoints"]["widevine_license"] + + if not license_url: + raise ValueError("Widevine license endpoint not configured") + + # Build license request headers + headers = { + "User-Agent": self.config["client"][self.device]["user_agent"], + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.9", + "Accept-Encoding": "gzip, deflate, br, zstd", + "Origin": "https://watch.shudder.com", + "Referer": "https://watch.shudder.com/", + "Connection": "keep-alive", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "cross-site", + } + + # Add DRM info header for Widevine + drm_info = {"system": "com.widevine.alpha"} + headers["X-DRM-INFO"] = base64.b64encode(json.dumps(drm_info).encode()).decode() + + # Add authorization if we have JWT token + jwt_token = self.license_data.get("jwtToken") + if jwt_token: + headers["Authorization"] = f"Bearer {jwt_token}" + elif hasattr(self, "auth_token"): + headers["Authorization"] = f"Bearer {self.auth_token}" + + # Send license request + response = self.session.post( + url=license_url, + data=challenge, + headers=headers, + ) + response.raise_for_status() + + # Handle JSON or binary license response + try: + license_data = response.json() + # Shudder may return license in different fields + return license_data.get("license") or license_data.get("data") or response.content + except ValueError: + return response.content + + def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]: +# """Request PlayReady license (if supported)""" +# license_url = self.config["endpoints"].get("playready_license") +# +# if not license_url: +# self.log.warning("PlayReady license endpoint not configured") +# return None +# +# headers = { +# "User-Agent": self.config["client"][self.device]["user_agent"], +# "Content-Type": "application/octet-stream", +# } +# +# if hasattr(self, "auth_token"): +# headers["Authorization"] = f"Bearer {self.auth_token}" +# +# response = self.session.post( +# url=license_url, +# data=challenge, +# headers=headers, +# ) +# response.raise_for_status() +# return response.content diff --git a/SHUD/config.yaml b/SHUD/config.yaml new file mode 100644 index 0000000..a65f9fe --- /dev/null +++ b/SHUD/config.yaml @@ -0,0 +1,49 @@ +# Shudder (SHUD) Configuration + +api_key: "857a1e5d-e35e-4fdf-805b-a87b6f8364bf" + +endpoints: + # Initialization + init: "https://dce-frontoffice.imggaming.com/api/v1/init/" + + # Authentication (with caching support) + login: "https://dce-frontoffice.imggaming.com/api/v2/login" + refresh: "https://dce-frontoffice.imggaming.com/api/v2/token/refresh" + + # Content Discovery + search: "https://search.dce-prod.dicelaboratory.com/search" + + # Video/Episode Manifest (POST) + video: "https://dce-frontoffice.imggaming.com/api/v5/manifest/video/{video_id}" + manifest: "https://dce-frontoffice.imggaming.com/api/v5/manifest/video/{video_id}" + + # Series Metadata + series_view: "https://dce-frontoffice.imggaming.com/api/v1/view?type=series&id={series_id}&timezone=UTC" + + # DRM License Servers + widevine_license: "https://shield-drm.imggaming.com/api/v2/license" + playready_license: "https://shield-drm.imggaming.com/api/v2/license" + +client: + web: + user_agent: "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0" + license_user_agent: "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0" + app_version: "6.60.0.7cf91e1" + type: "BROWSER" + + android_tv: + user_agent: "okhttp/4.12.0" + license_user_agent: "okhttp/4.12.0" + app_version: "6.60.0" + type: "ANDROID_TV" + +# Auth token cache duration (seconds) +auth_cache_duration: 3600 + +# Optional: Widevine certificate +# certificate: "CAUSxwE..." + +realm: "dce.shudder" +language: "en_US" +rate_limit: 2 +session_timeout: 300 diff --git a/VLD/__init__.py b/VLD/__init__.py index f934122..9eca72a 100644 --- a/VLD/__init__.py +++ b/VLD/__init__.py @@ -19,7 +19,7 @@ from unshackle.core.tracks import Chapter, Subtitle, Tracks class VLD(Service): """ Service code for RTL's Dutch streaming service Videoland (https://v2.videoland.com) - Version: 1.0.0 + Version: 1.1.0 Authorization: Credentials @@ -62,12 +62,17 @@ class VLD(Service): self.platform = self.config["platform"]["android_tv"] self.platform_token = "token-androidtv-3" + # Auth state - initialized to None, populated by authenticate() + self.access_token = None + self.gigya_uid = None + self.profile_id = None + def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: super().authenticate(cookies, credential) if not credential or not credential.username or not credential.password: raise EnvironmentError("Service requires Credentials for Authentication.") - self.credential = credential # Store for potential re-auth + self.credential = credential self.session.headers.update({ "origin": "https://v2.videoland.com", @@ -75,25 +80,56 @@ class VLD(Service): "x-customer-name": "rtlnl", }) - # Build cache key - cache_key = f"tokens_{self.profile}" - - # Check cache first + cache_key = f"tokens_{credential.username}" cache = self.cache.get(cache_key) if cache and not cache.expired: cached_data = cache.data - if isinstance(cached_data, dict) and cached_data.get("username") == credential.username: - self.log.info("Using cached tokens") + if ( + isinstance(cached_data, dict) + and cached_data.get("username") == credential.username + and cached_data.get("access_token") + and cached_data.get("gigya_uid") + and cached_data.get("profile_id") + ): + self.log.info("Using cached Videoland tokens") self._restore_from_cache(cached_data) return + else: + self.log.warning("Cached token data is incomplete or mismatched, re-authenticating") - # Perform fresh login - self.log.info("Retrieving new tokens") + self.log.info("Retrieving new Videoland tokens") self._do_login(credential) + self._cache_tokens(credential.username, cache) - # Cache the tokens - self._cache_tokens(credential.username, cache_key) + def _invalidate_cache(self) -> None: + """Wipe the cached tokens for the current credential so the next + call to authenticate() is forced to perform a fresh login.""" + if not self.credential: + return + cache_key = f"tokens_{self.credential.username}" + cache = self.cache.get(cache_key) + # Writing an empty dict with a TTL of 0 effectively expires it + # immediately so the next cache.expired check returns True. + try: + cache.set(data={}, expiration=0) + self.log.debug("Token cache invalidated") + except Exception: + pass # If the cache backend refuses, just continue + + def _reauthenticate(self) -> None: + """Invalidate the cache and perform a completely fresh login. + + Call this whenever the API returns a token-expired error so that + the rest of the current run continues with valid credentials. + """ + self.log.warning("Access token has expired — invalidating cache and re-authenticating") + self._invalidate_cache() + self._do_login(self.credential) + # Re-persist the brand-new tokens + cache_key = f"tokens_{self.credential.username}" + cache = self.cache.get(cache_key) + self._cache_tokens(self.credential.username, cache) def _restore_from_cache(self, cached_data: dict) -> None: """Restore authentication state from cached data.""" @@ -102,9 +138,13 @@ class VLD(Service): self.profile_id = cached_data["profile_id"] self.session.headers.update({"Authorization": f"Bearer {self.access_token}"}) - def _cache_tokens(self, username: str, cache_key: str) -> None: - """Cache the current authentication tokens.""" - cache = self.cache.get(cache_key) + def _cache_tokens(self, username: str, cache: object) -> None: + """Persist the current tokens into the cache object. + + Accepts the cache object directly instead of re-fetching it by key, + so we always write to the exact same object we checked during the + cache-hit test in authenticate(). + """ cache.set( data={ "username": username, @@ -112,12 +152,16 @@ class VLD(Service): "gigya_uid": self.gigya_uid, "profile_id": self.profile_id, }, - expiration=3600 # 1 hour expiration, adjust as needed + # 3500 seconds gives a 100-second safety margin below the + # typical 1-hour JWT lifetime so we never use a nearly-expired token. + expiration=3500, ) + self.log.info("Videoland tokens cached successfully") def _do_login(self, credential: Credential) -> None: - """Perform full login flow.""" - # Step 1: Authorize with Gigya + """Perform the full four-step Videoland / Gigya login flow.""" + + # ── Step 1: Gigya account login ────────────────────────────── auth_response = self.session.post( url=self.config["endpoints"]["authorization"], data={ @@ -137,13 +181,15 @@ class VLD(Service): ).json() if auth_response.get("errorMessage"): - raise EnvironmentError(f"Could not authorize Videoland account: {auth_response['errorMessage']!r}") + raise EnvironmentError( + f"Could not authorize Videoland account: {auth_response['errorMessage']!r}" + ) self.gigya_uid = auth_response["UID"] uid_signature = auth_response["UIDSignature"] signature_timestamp = auth_response["signatureTimestamp"] - # Step 2: Get initial JWT token + # ── Step 2: Exchange Gigya credentials for an initial JWT ───── jwt_headers = { "x-auth-device-id": self.device_id, "x-auth-device-player-size-height": "3840", @@ -161,11 +207,13 @@ class VLD(Service): ).json() if jwt_response.get("error"): - raise EnvironmentError(f"Could not get Access Token: {jwt_response['error']['message']!r}") + raise EnvironmentError( + f"Could not get Access Token: {jwt_response['error']['message']!r}" + ) initial_token = jwt_response["token"] - # Step 3: Get profiles + # ── Step 3: Fetch profiles and pick the first one ───────────── profiles_response = self.session.get( url=self.config["endpoints"]["profiles"].format( platform=self.platform, @@ -175,11 +223,13 @@ class VLD(Service): ).json() if isinstance(profiles_response, dict) and profiles_response.get("error"): - raise EnvironmentError(f"Could not get profiles: {profiles_response['error']['message']!r}") + raise EnvironmentError( + f"Could not get profiles: {profiles_response['error']['message']!r}" + ) self.profile_id = profiles_response[0]["uid"] - # Step 4: Get final JWT token with profile + # ── Step 4: Obtain a profile-scoped JWT (the final token) ───── jwt_headers["X-Auth-profile-id"] = self.profile_id final_jwt_response = self.session.get( @@ -188,15 +238,102 @@ class VLD(Service): ).json() if final_jwt_response.get("error"): - raise EnvironmentError(f"Could not get final Access Token: {final_jwt_response['error']['message']!r}") + raise EnvironmentError( + f"Could not get final Access Token: {final_jwt_response['error']['message']!r}" + ) self.access_token = final_jwt_response["token"] self.session.headers.update({"Authorization": f"Bearer {self.access_token}"}) + # ------------------------------------------------------------------ + # Title discovery + # ------------------------------------------------------------------ + def search(self) -> Generator[SearchResult, None, None]: - # Videoland doesn't have a documented search endpoint in the original code - # This is a placeholder - you may need to implement based on actual API - raise NotImplementedError("Search is not implemented for Videoland") + query = self.title.strip() + if not query: + return + + response = self.session.post( + url=self.config["endpoints"]["search"], + params={ + "x-algolia-agent": self.config["algolia"]["agent"], + "x-algolia-api-key": self.config["algolia"]["api_key"], + "x-algolia-application-id": self.config["algolia"]["app_id"], + }, + headers={ + "Accept": "application/json", + "Content-Type": "text/plain", + "Referer": "https://v2.videoland.com/", + "Origin": "https://v2.videoland.com", + }, + json={ + "requests": [ + { + "indexName": self.config["algolia"]["index"], + "query": query, + "clickAnalytics": True, + "hitsPerPage": 50, + "facetFilters": [ + ["metadata.item_type:program"], + [f"metadata.platforms_assets:{self.config['platform']['web']}"], + ], + } + ] + }, + ) + response.raise_for_status() + + data = response.json() + results = data.get("results", []) + if not results: + return + + seen = set() + + for hit in results[0].get("hits", []): + metadata = hit.get("metadata", {}) or {} + item = hit.get("item", {}) or {} + item_content = item.get("itemContent", {}) or {} + + target = ( + item_content.get("action", {}) + .get("target", {}) + .get("value_layout", {}) + ) + + content = hit.get("content", {}) or {} + content_id = str(target.get("id") or content.get("id") or "").strip() + seo = target.get("seo") + title = item_content.get("title") or metadata.get("title") + + if not content_id or not title: + continue + + if content_id in seen: + continue + seen.add(content_id) + + edito_tags = metadata.get("tags", {}).get("edito", []) or [] + program_nature = metadata.get("tags", {}).get("program_nature", []) or [] + + if "CONTENTTYPE:Film" in edito_tags: + label = "MOVIE" + elif "CONTENTTYPE:Series" in edito_tags: + label = "SERIES" + elif "Unitary" in program_nature: + label = "MOVIE" + else: + label = "SERIES" + + url = f"https://v2.videoland.com/{seo}-p_{content_id}" if seo else None + + yield SearchResult( + id_=content_id, + title=title, + label=label, + url=url, + ) def get_titles(self) -> Titles_T: title_match = re.match(self.TITLE_RE, self.title) @@ -205,11 +342,9 @@ class VLD(Service): title_slug = title_match.group("title_id") - # Handle folder URLs (e.g., title-f_12345) if re.match(r".+?-f_[0-9]+", title_slug): title_slug = self._get_program_title(title_slug) - # Extract title ID from slug (e.g., "show-name-p_12345" -> "12345") title_id = title_slug.split("-p_")[-1] if "-p_" in title_slug else title_slug metadata = self.session.get( @@ -221,11 +356,33 @@ class VLD(Service): params={"nbPages": "10"}, ).json() - # Check for API errors + # ── Token expiry detection and automatic recovery ───────────────── if isinstance(metadata, dict) and metadata.get("error"): - raise ValueError(f"API Error: {metadata.get('message', 'Unknown error')}") + message = metadata.get("message", "Unknown error") + # The API returns "Token expired/invalid" when the JWT has lapsed. + # Re-authenticate once and retry the same request rather than + # crashing with a ValueError. + if "token" in message.lower() and ( + "expired" in message.lower() or "invalid" in message.lower() + ): + self._reauthenticate() + # Retry the metadata request with the fresh token + metadata = self.session.get( + url=self.config["endpoints"]["layout"].format( + platform=self.platform, + token=self.platform_token, + endpoint=f"program/{title_id}", + ), + params={"nbPages": "10"}, + ).json() + # If it still fails after re-auth, raise normally + if isinstance(metadata, dict) and metadata.get("error"): + raise ValueError( + f"API Error after re-authentication: {metadata.get('message', 'Unknown error')}" + ) + else: + raise ValueError(f"API Error: {message}") - # Determine if it's a movie based on metadata is_movie = "Seizoen" not in str(metadata) if is_movie: @@ -245,81 +402,81 @@ class VLD(Service): }, ) ]) - else: - seasons = [ - block - for block in metadata["blocks"] - if block["featureId"] == "videos_by_season_by_program" - ] - # Fetch all episodes from all seasons with pagination - for season in seasons: - while len(season["content"]["items"]) != season["content"]["pagination"]["totalItems"]: - season_data = self.session.get( - url=self.config["endpoints"]["seasoning"].format( - platform=self.platform, - token=self.platform_token, - program=title_id, - season_id=season["id"], - ), - params={ - "nbPages": "10", - "page": season["content"]["pagination"]["nextPage"], + seasons = [ + block + for block in metadata["blocks"] + if block["featureId"] == "videos_by_season_by_program" + ] + + for season in seasons: + while ( + len(season["content"]["items"]) + != season["content"]["pagination"]["totalItems"] + ): + season_data = self.session.get( + url=self.config["endpoints"]["seasoning"].format( + platform=self.platform, + token=self.platform_token, + program=title_id, + season_id=season["id"], + ), + params={ + "nbPages": "10", + "page": season["content"]["pagination"]["nextPage"], + }, + ).json() + + for episode in season_data["content"]["items"]: + if episode not in season["content"]["items"]: + season["content"]["items"].append(episode) + + season["content"]["pagination"]["nextPage"] = ( + season_data["content"]["pagination"]["nextPage"] + ) + + episodes = [] + for season in seasons: + season_title = season.get("title", {}).get("long", "") + season_match = re.search(r"(\d+)", season_title) + season_number = int(season_match.group(1)) if season_match else 1 + + for idx, episode_data in enumerate(season["content"]["items"]): + extra_title = episode_data["itemContent"].get("extraTitle", "") + + episode_number = None + episode_name = extra_title + + ep_match = re.match(r"^(\d+)\.\s*(.*)$", extra_title) + if ep_match: + episode_number = int(ep_match.group(1)) + episode_name = ep_match.group(2) + else: + episode_number = idx + 1 + + viewable_id = ( + episode_data["itemContent"]["action"]["target"]["value_layout"]["id"] + ) + + episodes.append( + Episode( + id_=episode_data["ucid"], + service=self.__class__, + title=metadata["entity"]["metadata"]["title"], + season=season_number, + number=episode_number, + name=episode_name, + year=None, + language=Language.get("nl"), + data={ + "viewable": viewable_id, + "episode_data": episode_data, }, - ).json() - - for episode in season_data["content"]["items"]: - if episode not in season["content"]["items"]: - season["content"]["items"].append(episode) - - season["content"]["pagination"]["nextPage"] = season_data["content"]["pagination"]["nextPage"] - - episodes = [] - for season in seasons: - # Extract season number from title like "Seizoen 1" or "Season 1" - season_title = season.get("title", {}).get("long", "") - season_match = re.search(r"(\d+)", season_title) - season_number = int(season_match.group(1)) if season_match else 1 - - for idx, episode_data in enumerate(season["content"]["items"]): - # Get the extra title which contains episode info - extra_title = episode_data["itemContent"].get("extraTitle", "") - - # Extract episode number from extraTitle like "1. Hondenadoptiedag" or "14. Een Draak Op School (Deel 1)" - episode_number = None - episode_name = extra_title - - ep_match = re.match(r"^(\d+)\.\s*(.*)$", extra_title) - if ep_match: - episode_number = int(ep_match.group(1)) - episode_name = ep_match.group(2) - else: - # Fallback to index + 1 - episode_number = idx + 1 - - viewable_id = episode_data["itemContent"]["action"]["target"]["value_layout"]["id"] - - episodes.append( - Episode( - id_=episode_data["ucid"], - service=self.__class__, - title=metadata["entity"]["metadata"]["title"], - season=season_number, - number=episode_number, - name=episode_name, - year=None, - language=Language.get("nl"), - data={ - "viewable": viewable_id, - "episode_data": episode_data, - }, - ) ) + ) - # Sort episodes by season and episode number - episodes = sorted(episodes, key=lambda ep: (ep.season, ep.number)) - - return Series(episodes) + episodes = sorted(episodes, key=lambda ep: (ep.season, ep.number)) + return Series(episodes) def get_tracks(self, title: Title_T) -> Tracks: viewable_id = title.data["viewable"] @@ -333,8 +490,34 @@ class VLD(Service): params={"nbPages": "2"}, ).json() + # ── Token expiry detection in get_tracks ────────────────────────── + if isinstance(manifest_response, dict) and manifest_response.get("error"): + message = manifest_response.get("message", "Unknown error") + if "token" in message.lower() and ( + "expired" in message.lower() or "invalid" in message.lower() + ): + self._reauthenticate() + manifest_response = self.session.get( + url=self.config["endpoints"]["layout"].format( + platform=self.platform, + token=self.platform_token, + endpoint=f"video/{viewable_id}", + ), + params={"nbPages": "2"}, + ).json() + if isinstance(manifest_response, dict) and manifest_response.get("error"): + raise ValueError( + f"API Error after re-authentication: {manifest_response.get('message', 'Unknown error')}" + ) + else: + raise ValueError(f"API Error: {message}") + player_block = next( - (block for block in manifest_response["blocks"] if block["templateId"] == "Player"), + ( + block + for block in manifest_response["blocks"] + if block["templateId"] == "Player" + ), None, ) @@ -342,61 +525,54 @@ class VLD(Service): raise ValueError("Could not find player block in manifest") assets = player_block["content"]["items"][0]["itemContent"]["video"]["assets"] - if not assets: raise ValueError("Failed to load content manifest - no assets found") - # Prefer HD quality - mpd_asset = next((asset for asset in assets if asset["quality"] == "hd"), None) - if not mpd_asset: - mpd_asset = next((asset for asset in assets if asset["quality"] == "sd"), None) + mpd_asset = next((a for a in assets if a["quality"] == "hd"), None) or \ + next((a for a in assets if a["quality"] == "sd"), None) if not mpd_asset: raise ValueError("No suitable quality stream found") mpd_url = mpd_asset["path"] - # Extract PlayReady PSSH from manifest - manifest_content = self.session.get(mpd_url).text - pssh_matches = re.findall(r'(.+?)', manifest_content) + tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks( + language=title.language + ) - self.pssh_playready = None - for pssh in pssh_matches: - if len(pssh) > 200: - self.pssh_playready = pssh - break - - # Store viewable ID for license request - self.current_viewable = viewable_id - - tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language) - - # Fix track URLs - replace CDN hostname for track in tracks: - if hasattr(track, 'url') and track.url: - if isinstance(track.url, list): - track.url = [ - re.sub( - r"https://.+?\.videoland\.bedrock\.tech", - "https://origin.vod.videoland.bedrock.tech", - uri.split("?")[0], - ) - for uri in track.url - ] - elif isinstance(track.url, str): - track.url = re.sub( + if not hasattr(track, "url") or not track.url: + continue + if isinstance(track.url, list): + track.url = [ + re.sub( r"https://.+?\.videoland\.bedrock\.tech", "https://origin.vod.videoland.bedrock.tech", - track.url.split("?")[0], + uri.split("?")[0], ) + for uri in track.url + ] + elif isinstance(track.url, str): + track.url = re.sub( + r"https://.+?\.videoland\.bedrock\.tech", + "https://origin.vod.videoland.bedrock.tech", + track.url.split("?")[0], + ) - # Handle subtitles for subtitle in tracks.subtitles: - if isinstance(subtitle.url, list) or (isinstance(subtitle.url, str) and "dash" in subtitle.url): - subtitle.codec = Subtitle.Codec.SubRip - else: - self.log.warning("Unknown subtitle codec detected") + url_str = str(subtitle.url) if subtitle.url else "" + if "sdh" in url_str.lower(): + subtitle.sdh = True + if "forced" in url_str.lower() or "opencaption" in url_str.lower(): + subtitle.forced = True + self.log.info( + f"Tracks: {len(tracks.videos)} video, " + f"{len(tracks.audio)} audio, " + f"{len(tracks.subtitles)} subtitle" + ) + + self.current_viewable = viewable_id return tracks def get_chapters(self, title: Title_T) -> list[Chapter]: @@ -405,7 +581,9 @@ class VLD(Service): def get_widevine_service_certificate(self, **_) -> Optional[str]: return self.config.get("certificate") - def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]: + def get_widevine_license( + self, *, challenge: bytes, title: Title_T, track: AnyTrack + ) -> Optional[Union[bytes, str]]: license_token = self._get_license_token(title) response = self.session.post( @@ -419,7 +597,9 @@ class VLD(Service): return response.json().get("license") - def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]: + def get_playready_license( + self, *, challenge: bytes, title: Title_T, track: AnyTrack + ) -> Optional[bytes]: license_token = self._get_license_token(title) response = self.session.post( @@ -433,7 +613,12 @@ class VLD(Service): return response.content + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + def _get_license_token(self, title: Title_T) -> str: + """Fetch a per-clip DRM upfront token from the Videoland token endpoint.""" viewable_id = title.data["viewable"] response = self.session.get( @@ -447,6 +632,7 @@ class VLD(Service): return response["token"] def _get_program_title(self, folder_title: str) -> str: + """Resolve a folder slug (title-f_12345) to its programme slug (title-p_12345).""" folder_id = folder_title.split("-f_")[1] response = self.session.get( @@ -458,7 +644,9 @@ class VLD(Service): params={"nbPages": "2"}, ).json() - target = response["blocks"][0]["content"]["items"][0]["itemContent"]["action"]["target"]["value_layout"] + target = response["blocks"][0]["content"]["items"][0]["itemContent"]["action"][ + "target" + ]["value_layout"] parent_seo = target["parent"]["seo"] parent_id = target["parent"]["id"] diff --git a/VLD/config.yaml b/VLD/config.yaml index 4970602..8e15119 100644 --- a/VLD/config.yaml +++ b/VLD/config.yaml @@ -17,12 +17,19 @@ endpoints: authorization: https://accounts.eu1.gigya.com/accounts.login jwt_tokens: https://front-auth.videoland.bedrock.tech/v2/platforms/{platform}/getJwt profiles: https://users.videoland.bedrock.tech/v2/platforms/{platform}/users/{gigya}/profiles + search: https://nhacvivxxk-dsn.algolia.net/1/indexes/*/queries platform: web: m6group_web android_mob: m6group_android_mob android_tv: m6group_android_tv +algolia: + app_id: NHACVIVXXK + api_key: 6ef59fc6d78ac129339ab9c35edd41fa + agent: Algolia for JavaScript (5.49.1); Search (5.49.1); Browser + index: videoland_prod_bedrock_layout_items_v2_rtlnl_main + sdk: apikey: 3_W6BPwMz2FGQEfH4_nVRaj4Ak1F1XDp33an_8y8nXULn8nk43FHvPIpb0TLOYIaUI build: "13414"