import re
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Optional, Union

import click
from langcodes import Language

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks


class VLD(Service):
    """
    Service code for RTL's Dutch streaming service Videoland (https://v2.videoland.com)
    Version: 1.1.0

    Authorization: Credentials

    Security:
        - L1: >= 720p
        - L3: <= 576p

        They are using the license server of DRMToday with encoded streams from CastLabs.
        It accepts Non-Whitelisted CDMs so every unrevoked L1 CDM should work.

    Use full URL (for example - https://v2.videoland.com/title-p_12345) or title slug.
    """

    ALIASES = ("VLD", "videoland")
    # The named group ``title_id`` is read by get_titles(); the URL prefix is optional
    # so that a bare slug is accepted as well.
    TITLE_RE = r"^(?:https?://(?:www\.)?v2\.videoland\.com/)?(?P<title_id>[a-zA-Z0-9_-]+)"
    GEOFENCE = ("NL",)

    # Any ``*.videoland.bedrock.tech`` host found in a track URL is rewritten to this origin host.
    _CDN_HOST_RE = re.compile(r"https://.+?\.videoland\.bedrock\.tech")
    _ORIGIN_HOST = "https://origin.vod.videoland.bedrock.tech"

    @staticmethod
    @click.command(name="Videoland", short_help="https://v2.videoland.com")
    @click.argument("title", type=str)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed command-line arguments."""
        return VLD(ctx, **kwargs)

    def __init__(self, ctx, title, movie):
        """
        Store the CLI arguments and resolve the platform settings from the service config.

        Parameters:
            ctx: Click context; ``ctx.obj.cdm`` and ``ctx.parent.params`` are read from it.
            title: Full Videoland URL, title slug, or a search query.
            movie: When True, get_titles() treats the title as a movie.

        Raises:
            Exception: If the service config is missing.
        """
        super().__init__(ctx)

        self.title = title
        self.movie = movie
        self.cdm = ctx.obj.cdm
        self.device_id = str(uuid.uuid1().int)

        if self.config is None:
            raise Exception("Config is missing!")

        profile_name = ctx.parent.params.get("profile")
        self.profile = profile_name if profile_name else "default"

        self.platform = self.config["platform"]["android_tv"]
        self.platform_token = "token-androidtv-3"

        # Auth state - initialized to None, populated by authenticate()
        self.access_token = None
        self.gigya_uid = None
        self.profile_id = None

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Authenticate with Videoland, reusing cached tokens when they are complete and unexpired.

        Raises:
            EnvironmentError: If no username/password credential is supplied, or the login fails.
        """
        super().authenticate(cookies, credential)
        if not credential or not credential.username or not credential.password:
            raise EnvironmentError("Service requires Credentials for Authentication.")

        self.credential = credential

        self.session.headers.update({
            "origin": "https://v2.videoland.com",
            "x-client-release": self.config["sdk"]["version"],
            "x-customer-name": "rtlnl",
        })

        cache_key = f"tokens_{credential.username}"
        cache = self.cache.get(cache_key)

        if cache and not cache.expired:
            cached_data = cache.data
            cache_is_usable = (
                isinstance(cached_data, dict)
                and cached_data.get("username") == credential.username
                and cached_data.get("access_token")
                and cached_data.get("gigya_uid")
                and cached_data.get("profile_id")
            )
            if cache_is_usable:
                self.log.info("Using cached Videoland tokens")
                self._restore_from_cache(cached_data)
                return
            self.log.warning("Cached token data is incomplete or mismatched, re-authenticating")

        self.log.info("Retrieving new Videoland tokens")
        self._do_login(credential)
        self._cache_tokens(credential.username, cache)

    def _invalidate_cache(self) -> None:
        """
        Overwrite the cached tokens for the current credential with an empty payload.

        Best-effort: a failing cache backend is logged and otherwise ignored.
        """
        if not self.credential:
            return

        cache_key = f"tokens_{self.credential.username}"
        cache = self.cache.get(cache_key)

        # An empty dict never passes the completeness check in authenticate(), so even if
        # the backend does not treat a zero expiration as "already expired" the next
        # authenticate() call still falls through to a fresh login.
        try:
            cache.set(data={}, expiration=0)
        except Exception as exc:
            # Deliberately non-fatal: invalidation is only an optimisation for the next run.
            self.log.debug(f"Could not invalidate token cache: {exc}")
        else:
            self.log.debug("Token cache invalidated")

    def _reauthenticate(self) -> None:
        """
        Invalidate the cache, perform a fresh login and persist the new tokens.

        Called when the API reports an expired/invalid token so the current run can continue.
        """
        self.log.warning("Access token has expired — invalidating cache and re-authenticating")
        self._invalidate_cache()
        self._do_login(self.credential)

        # Re-persist the brand-new tokens
        cache_key = f"tokens_{self.credential.username}"
        cache = self.cache.get(cache_key)
        self._cache_tokens(self.credential.username, cache)

    def _restore_from_cache(self, cached_data: dict) -> None:
        """Restore authentication state from cached data and set the Authorization header."""
        self.access_token = cached_data["access_token"]
        self.gigya_uid = cached_data["gigya_uid"]
        self.profile_id = cached_data["profile_id"]
        self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

    def _cache_tokens(self, username: str, cache: object) -> None:
        """
        Persist the current tokens into the given cache object.

        The cache object is passed in rather than re-fetched by key, so the caller decides
        which cache entry is written.
        """
        cache.set(
            data={
                "username": username,
                "access_token": self.access_token,
                "gigya_uid": self.gigya_uid,
                "profile_id": self.profile_id,
            },
            # 3500 seconds leaves a 100-second margin below a one-hour lifetime.
            # NOTE(review): the one-hour JWT lifetime is an assumption — confirm against the API.
            expiration=3500,
        )
        self.log.info("Videoland tokens cached successfully")

    def _do_login(self, credential: Credential) -> None:
        """
        Perform the full four-step Videoland / Gigya login flow.

        On success ``gigya_uid``, ``profile_id`` and ``access_token`` are set and the session
        Authorization header is updated.

        Raises:
            EnvironmentError: If any step reports an error or no profile is returned.
        """
        # ── Step 1: Gigya account login ──────────────────────────────
        auth_response = self.session.post(
            url=self.config["endpoints"]["authorization"],
            data={
                "loginID": credential.username,
                "password": credential.password,
                "sessionExpiration": "0",
                "targetEnv": "jssdk",
                "include": "profile,data",
                "includeUserInfo": "true",
                "lang": "nl",
                "ApiKey": self.config["sdk"]["apikey"],
                "authMode": "cookie",
                "pageURL": "https://v2.videoland.com/",
                "sdkBuild": self.config["sdk"]["build"],
                "format": "json",
            },
        ).json()

        if auth_response.get("errorMessage"):
            raise EnvironmentError(
                f"Could not authorize Videoland account: {auth_response['errorMessage']!r}"
            )

        self.gigya_uid = auth_response["UID"]
        uid_signature = auth_response["UIDSignature"]
        signature_timestamp = auth_response["signatureTimestamp"]

        # ── Step 2: Exchange Gigya credentials for an initial JWT ─────
        jwt_url = self.config["endpoints"]["jwt_tokens"].format(platform=self.platform)
        jwt_headers = {
            "x-auth-device-id": self.device_id,
            # NOTE(review): height/width look swapped (3840x2160 is normally width x height);
            # left untouched because the server-side expectation is not visible here — confirm.
            "x-auth-device-player-size-height": "3840",
            "x-auth-device-player-size-width": "2160",
            "X-Auth-gigya-signature": uid_signature,
            "X-Auth-gigya-signature-timestamp": signature_timestamp,
            "X-Auth-gigya-uid": self.gigya_uid,
            "X-Client-Release": self.config["sdk"]["version"],
            "X-Customer-Name": "rtlnl",
        }

        jwt_response = self.session.get(url=jwt_url, headers=jwt_headers).json()

        if jwt_response.get("error"):
            raise EnvironmentError(
                f"Could not get Access Token: {jwt_response['error']['message']!r}"
            )

        initial_token = jwt_response["token"]

        # ── Step 3: Fetch profiles and pick the first one ─────────────
        profiles_response = self.session.get(
            url=self.config["endpoints"]["profiles"].format(
                platform=self.platform,
                gigya=self.gigya_uid,
            ),
            headers={"Authorization": f"Bearer {initial_token}"},
        ).json()

        if isinstance(profiles_response, dict) and profiles_response.get("error"):
            raise EnvironmentError(
                f"Could not get profiles: {profiles_response['error']['message']!r}"
            )
        # An empty or non-list answer would otherwise surface as a bare IndexError/KeyError.
        if not isinstance(profiles_response, list) or not profiles_response:
            raise EnvironmentError("Could not get profiles: no profiles returned for this account")

        self.profile_id = profiles_response[0]["uid"]

        # ── Step 4: Obtain a profile-scoped JWT (the final token) ─────
        jwt_headers["X-Auth-profile-id"] = self.profile_id

        final_jwt_response = self.session.get(url=jwt_url, headers=jwt_headers).json()

        if final_jwt_response.get("error"):
            raise EnvironmentError(
                f"Could not get final Access Token: {final_jwt_response['error']['message']!r}"
            )

        self.access_token = final_jwt_response["token"]
        self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

    # ------------------------------------------------------------------
    # Title discovery
    # ------------------------------------------------------------------

    def search(self) -> Generator[SearchResult, None, None]:
        """
        Query the configured Algolia index for programmes matching ``self.title``.

        Yields one SearchResult per unique content id; hits without an id or a title are skipped.
        """
        query = self.title.strip()
        if not query:
            return

        response = self.session.post(
            url=self.config["endpoints"]["search"],
            params={
                "x-algolia-agent": self.config["algolia"]["agent"],
                "x-algolia-api-key": self.config["algolia"]["api_key"],
                "x-algolia-application-id": self.config["algolia"]["app_id"],
            },
            headers={
                "Accept": "application/json",
                "Content-Type": "text/plain",
                "Referer": "https://v2.videoland.com/",
                "Origin": "https://v2.videoland.com",
            },
            json={
                "requests": [
                    {
                        "indexName": self.config["algolia"]["index"],
                        "query": query,
                        "clickAnalytics": True,
                        "hitsPerPage": 50,
                        "facetFilters": [
                            ["metadata.item_type:program"],
                            [f"metadata.platforms_assets:{self.config['platform']['web']}"],
                        ],
                    }
                ]
            },
        )
        response.raise_for_status()

        results = response.json().get("results", [])
        if not results:
            return

        seen = set()
        for hit in results[0].get("hits", []):
            # Every level is guarded with ``or {}`` because a JSON ``null`` would otherwise
            # break the chained lookups.
            metadata = hit.get("metadata", {}) or {}
            item = hit.get("item", {}) or {}
            item_content = item.get("itemContent", {}) or {}
            action = item_content.get("action", {}) or {}
            action_target = action.get("target", {}) or {}
            target = action_target.get("value_layout", {}) or {}
            content = hit.get("content", {}) or {}

            content_id = str(target.get("id") or content.get("id") or "").strip()
            seo = target.get("seo")
            title = item_content.get("title") or metadata.get("title")

            if not content_id or not title:
                continue
            if content_id in seen:
                continue
            seen.add(content_id)

            tags = metadata.get("tags", {}) or {}
            edito_tags = tags.get("edito", []) or []
            program_nature = tags.get("program_nature", []) or []

            if "CONTENTTYPE:Film" in edito_tags:
                label = "MOVIE"
            elif "CONTENTTYPE:Series" in edito_tags:
                label = "SERIES"
            elif "Unitary" in program_nature:
                label = "MOVIE"
            else:
                label = "SERIES"

            url = f"https://v2.videoland.com/{seo}-p_{content_id}" if seo else None

            yield SearchResult(
                id_=content_id,
                title=title,
                label=label,
                url=url,
            )

    def get_titles(self) -> Titles_T:
        """
        Resolve ``self.title`` to a Movies or Series collection.

        Folder slugs (``...-f_<id>``) are first resolved to their programme slug. The title is
        handled as a movie when ``--movie`` was passed or when the layout payload contains no
        "Seizoen" text; otherwise every season is paginated and returned as episodes.

        Raises:
            ValueError: If the title does not match TITLE_RE or the API reports an error.
        """
        title_match = re.match(self.TITLE_RE, self.title)
        if not title_match:
            raise ValueError(f"Invalid title format: {self.title}")

        title_slug = title_match.group("title_id")

        if re.match(r".+?-f_[0-9]+", title_slug):
            title_slug = self._get_program_title(title_slug)

        title_id = title_slug.split("-p_")[-1] if "-p_" in title_slug else title_slug

        metadata = self._fetch_layout(f"program/{title_id}", "10")

        # The explicit CLI flag wins; the text probe is only the automatic fallback.
        is_movie = self.movie or "Seizoen" not in str(metadata)

        if is_movie:
            movie_info = metadata["blocks"][0]["content"]["items"][0]
            viewable_id = movie_info["itemContent"]["action"]["target"]["value_layout"]["id"]

            return Movies([
                Movie(
                    id_=movie_info["ucid"],
                    service=self.__class__,
                    name=metadata["entity"]["metadata"]["title"],
                    year=None,
                    language=Language.get("nl"),
                    data={
                        "viewable": viewable_id,
                        "metadata": metadata,
                    },
                )
            ])

        seasons = [
            block
            for block in metadata["blocks"]
            if block.get("featureId") == "videos_by_season_by_program"
        ]

        for season in seasons:
            self._load_remaining_episodes(title_id, season)

        episodes = []
        for season in seasons:
            season_title = season.get("title", {}).get("long", "")
            season_match = re.search(r"(\d+)", season_title)
            season_number = int(season_match.group(1)) if season_match else 1

            for idx, episode_data in enumerate(season["content"]["items"]):
                extra_title = episode_data["itemContent"].get("extraTitle", "")

                # "<number>. <name>" carries the episode number; otherwise fall back to
                # the position within the season listing.
                ep_match = re.match(r"^(\d+)\.\s*(.*)$", extra_title)
                if ep_match:
                    episode_number = int(ep_match.group(1))
                    episode_name = ep_match.group(2)
                else:
                    episode_number = idx + 1
                    episode_name = extra_title

                viewable_id = (
                    episode_data["itemContent"]["action"]["target"]["value_layout"]["id"]
                )

                episodes.append(
                    Episode(
                        id_=episode_data["ucid"],
                        service=self.__class__,
                        title=metadata["entity"]["metadata"]["title"],
                        season=season_number,
                        number=episode_number,
                        name=episode_name,
                        year=None,
                        language=Language.get("nl"),
                        data={
                            "viewable": viewable_id,
                            "episode_data": episode_data,
                        },
                    )
                )

        episodes = sorted(episodes, key=lambda ep: (ep.season, ep.number))
        return Series(episodes)

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Build the track list for a title from its DASH manifest.

        The "hd" asset is preferred over "sd". Track URLs are stripped of their query string
        and pointed at the origin host; subtitle SDH/forced flags are derived from the URL.

        Raises:
            ValueError: On an API error, a missing player block, or no usable asset.
        """
        viewable_id = title.data["viewable"]

        manifest_response = self._fetch_layout(f"video/{viewable_id}", "2")

        player_block = next(
            (
                block
                for block in manifest_response["blocks"]
                if block.get("templateId") == "Player"
            ),
            None,
        )
        if not player_block:
            raise ValueError("Could not find player block in manifest")

        assets = player_block["content"]["items"][0]["itemContent"]["video"]["assets"]
        if not assets:
            raise ValueError("Failed to load content manifest - no assets found")

        mpd_asset = None
        for wanted_quality in ("hd", "sd"):
            mpd_asset = next((a for a in assets if a["quality"] == wanted_quality), None)
            if mpd_asset:
                break
        if not mpd_asset:
            raise ValueError("No suitable quality stream found")

        mpd_url = mpd_asset["path"]

        tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(
            language=title.language
        )

        for track in tracks:
            track_url = getattr(track, "url", None)
            if not track_url:
                continue
            if isinstance(track_url, list):
                track.url = [self._to_origin_url(uri) for uri in track_url]
            elif isinstance(track_url, str):
                track.url = self._to_origin_url(track_url)

        for subtitle in tracks.subtitles:
            url_str = (str(subtitle.url) if subtitle.url else "").lower()
            if "sdh" in url_str:
                subtitle.sdh = True
            if "forced" in url_str or "opencaption" in url_str:
                subtitle.forced = True

        self.log.info(
            f"Tracks: {len(tracks.videos)} video, "
            f"{len(tracks.audio)} audio, "
            f"{len(tracks.subtitles)} subtitle"
        )

        self.current_viewable = viewable_id
        return tracks

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """Return no chapters; this service does not provide any."""
        return []

    def get_widevine_service_certificate(self, **_) -> Optional[str]:
        """Return the ``certificate`` entry of the service config, if present."""
        return self.config.get("certificate")

    def get_widevine_license(
        self, *, challenge: bytes, title: Title_T, track: AnyTrack
    ) -> Optional[Union[bytes, str]]:
        """
        Request a Widevine license and return the ``license`` field of the JSON answer.

        Raises:
            ValueError: If the license server does not answer with HTTP 200.
        """
        license_token = self._get_license_token(title)

        response = self.session.post(
            url=self.config["endpoints"]["license_wv"],
            data=challenge,
            headers={"x-dt-auth-token": license_token},
        )

        if response.status_code != 200:
            raise ValueError(f"Failed to get Widevine license: {response.status_code}")

        return response.json().get("license")

    def get_playready_license(
        self, *, challenge: bytes, title: Title_T, track: AnyTrack
    ) -> Optional[bytes]:
        """
        Request a PlayReady license and return the raw response body.

        Raises:
            ValueError: If the license server does not answer with HTTP 200.
        """
        license_token = self._get_license_token(title)

        response = self.session.post(
            url=self.config["endpoints"]["license_pr"],
            data=challenge,
            headers={"x-dt-auth-token": license_token},
        )

        if response.status_code != 200:
            raise ValueError(f"Failed to get PlayReady license: {response.status_code}")

        return response.content

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_token_error(message: object) -> bool:
        """Return True when an API error message mentions an expired or invalid token."""
        lowered = str(message).lower()
        return "token" in lowered and ("expired" in lowered or "invalid" in lowered)

    def _fetch_layout(self, endpoint: str, nb_pages: str):
        """
        GET a layout endpoint and return the decoded JSON body.

        When the API answers with a token expired/invalid error, re-authenticate once and
        retry the same request instead of failing immediately.

        Parameters:
            endpoint: Path fragment substituted into the layout URL (e.g. ``program/<id>``).
            nb_pages: Value sent as the ``nbPages`` query parameter.

        Raises:
            ValueError: If the API reports a non-token error, or still fails after the retry.
        """

        def request_layout():
            return self.session.get(
                url=self.config["endpoints"]["layout"].format(
                    platform=self.platform,
                    token=self.platform_token,
                    endpoint=endpoint,
                ),
                params={"nbPages": nb_pages},
            ).json()

        def has_error(payload) -> bool:
            return isinstance(payload, dict) and bool(payload.get("error"))

        payload = request_layout()
        if not has_error(payload):
            return payload

        message = payload.get("message", "Unknown error")
        if not self._is_token_error(message):
            raise ValueError(f"API Error: {message}")

        self._reauthenticate()

        # Retry with the fresh token; a second failure is raised normally.
        payload = request_layout()
        if has_error(payload):
            raise ValueError(
                f"API Error after re-authentication: {payload.get('message', 'Unknown error')}"
            )
        return payload

    def _load_remaining_episodes(self, title_id: str, season: dict) -> None:
        """
        Fetch the remaining pages of a season block and append unseen episodes in place.

        Stops once the item count equals ``totalItems``, or as soon as a page adds nothing
        new (which would otherwise repeat the same request forever).
        """
        items = season["content"]["items"]
        pagination = season["content"]["pagination"]

        while len(items) != pagination["totalItems"]:
            season_data = self.session.get(
                url=self.config["endpoints"]["seasoning"].format(
                    platform=self.platform,
                    token=self.platform_token,
                    program=title_id,
                    season_id=season["id"],
                ),
                params={
                    "nbPages": "10",
                    "page": pagination["nextPage"],
                },
            ).json()

            count_before = len(items)
            for episode in season_data["content"]["items"]:
                if episode not in items:
                    items.append(episode)

            pagination["nextPage"] = season_data["content"]["pagination"]["nextPage"]

            if len(items) == count_before:
                self.log.warning(
                    f"Season pagination stalled at {len(items)} of "
                    f"{pagination['totalItems']} items, continuing with what was retrieved"
                )
                break

    def _to_origin_url(self, uri: str) -> str:
        """Drop the query string from ``uri`` and point a bedrock CDN host at the origin host."""
        return self._CDN_HOST_RE.sub(self._ORIGIN_HOST, uri.split("?")[0])

    def _get_license_token(self, title: Title_T) -> str:
        """Fetch a per-clip DRM upfront token from the Videoland token endpoint."""
        viewable_id = title.data["viewable"]

        response = self.session.get(
            url=self.config["endpoints"]["license_token"].format(
                platform=self.platform,
                gigya=self.gigya_uid,
                clip=viewable_id,
            ),
        ).json()

        return response["token"]

    def _get_program_title(self, folder_title: str) -> str:
        """Resolve a folder slug (title-f_12345) to its programme slug (title-p_12345)."""
        folder_id = folder_title.split("-f_")[1]

        response = self._fetch_layout(f"folder/{folder_id}", "2")

        first_item = response["blocks"][0]["content"]["items"][0]
        target = first_item["itemContent"]["action"]["target"]["value_layout"]
        parent_seo = target["parent"]["seo"]
        parent_id = target["parent"]["id"]

        return f"{parent_seo}-p_{parent_id}"