import re
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Optional, Union

import click
from langcodes import Language

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks


class VLD(Service):
    """
    Service code for RTL's Dutch streaming service Videoland (https://v2.videoland.com)
    Version: 1.0.0

    Authorization: Credentials

    Security:
        - L1: >= 720p
        - L3: <= 576p

    They are using the license server of DRMToday with encoded streams from CastLabs.
    It accepts Non-Whitelisted CDMs so every unrevoked L1 CDM should work.

    Use full URL (for example - https://v2.videoland.com/title-p_12345) or title slug.
    """

    ALIASES = ("VLD", "videoland")
    # The named group "title_id" is read back in get_titles(); the URL prefix is optional
    # so a bare slug is accepted as well.
    TITLE_RE = r"^(?:https?://(?:www\.)?v2\.videoland\.com/)?(?P<title_id>[a-zA-Z0-9_-]+)"
    GEOFENCE = ("NL",)

    @staticmethod
    @click.command(name="Videoland", short_help="https://v2.videoland.com")
    @click.argument("title", type=str)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed command-line arguments."""
        return VLD(ctx, **kwargs)

    def __init__(self, ctx, title, movie):
        """
        Store the CLI arguments and resolve the platform settings from the service config.

        Parameters:
            ctx: Click context; ``ctx.obj.cdm`` and ``ctx.parent.params`` are read from it.
            title: Full title URL or title slug as given on the command line.
            movie: Value of the ``--movie`` flag.

        Raises:
            Exception: If the service config is missing.
        """
        super().__init__(ctx)
        self.title = title
        self.movie = movie
        self.cdm = ctx.obj.cdm
        self.device_id = str(uuid.uuid1().int)

        if self.config is None:
            raise Exception("Config is missing!")

        # The profile name only namespaces the token cache key (see authenticate()).
        self.profile = ctx.parent.params.get("profile") or "default"

        self.platform = self.config["platform"]["android_tv"]
        self.platform_token = "token-androidtv-3"

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Authenticate the session, reusing cached tokens when they belong to the same username.

        Raises:
            EnvironmentError: If no credential (or an incomplete one) is supplied, or if any
                step of the login flow reports an error.
        """
        super().authenticate(cookies, credential)
        if not credential or not credential.username or not credential.password:
            raise EnvironmentError("Service requires Credentials for Authentication.")

        self.credential = credential  # Store for potential re-auth

        self.session.headers.update({
            "origin": "https://v2.videoland.com",
            "x-client-release": self.config["sdk"]["version"],
            "x-customer-name": "rtlnl",
        })

        cache_key = f"tokens_{self.profile}"

        # Check cache first; only trust it when it was written for this username.
        cache = self.cache.get(cache_key)
        if cache and not cache.expired:
            cached_data = cache.data
            if isinstance(cached_data, dict) and cached_data.get("username") == credential.username:
                self.log.info("Using cached tokens")
                self._restore_from_cache(cached_data)
                return

        # Perform fresh login
        self.log.info("Retrieving new tokens")
        self._do_login(credential)

        # Cache the tokens
        self._cache_tokens(credential.username, cache_key)

    def _restore_from_cache(self, cached_data: dict) -> None:
        """Restore authentication state from cached data."""
        self.access_token = cached_data["access_token"]
        self.gigya_uid = cached_data["gigya_uid"]
        self.profile_id = cached_data["profile_id"]
        self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

    def _cache_tokens(self, username: str, cache_key: str) -> None:
        """Cache the current authentication tokens."""
        cache = self.cache.get(cache_key)
        cache.set(
            data={
                "username": username,
                "access_token": self.access_token,
                "gigya_uid": self.gigya_uid,
                "profile_id": self.profile_id,
            },
            expiration=3600,  # 1 hour expiration, adjust as needed
        )

    def _do_login(self, credential: Credential) -> None:
        """
        Perform full login flow.

        Sets ``self.gigya_uid``, ``self.profile_id`` and ``self.access_token`` and installs the
        final token as the session's ``Authorization`` header.

        Raises:
            EnvironmentError: If any of the four steps reports an error, or if the account
                has no profiles.
        """
        # Step 1: Authorize with Gigya
        auth_response = self.session.post(
            url=self.config["endpoints"]["authorization"],
            data={
                "loginID": credential.username,
                "password": credential.password,
                "sessionExpiration": "0",
                "targetEnv": "jssdk",
                "include": "profile,data",
                "includeUserInfo": "true",
                "lang": "nl",
                "ApiKey": self.config["sdk"]["apikey"],
                "authMode": "cookie",
                "pageURL": "https://v2.videoland.com/",
                "sdkBuild": self.config["sdk"]["build"],
                "format": "json",
            },
        ).json()

        if auth_response.get("errorMessage"):
            raise EnvironmentError(f"Could not authorize Videoland account: {auth_response['errorMessage']!r}")

        self.gigya_uid = auth_response["UID"]
        uid_signature = auth_response["UIDSignature"]
        signature_timestamp = auth_response["signatureTimestamp"]

        # Step 2: Get initial JWT token
        jwt_url = self.config["endpoints"]["jwt_tokens"].format(platform=self.platform)
        jwt_headers = {
            "x-auth-device-id": self.device_id,
            # NOTE(review): height/width look swapped (3840 is normally the width of a
            # 3840x2160 display). Left untouched because the server's expectation is not
            # visible from here - confirm before changing.
            "x-auth-device-player-size-height": "3840",
            "x-auth-device-player-size-width": "2160",
            "X-Auth-gigya-signature": uid_signature,
            "X-Auth-gigya-signature-timestamp": signature_timestamp,
            "X-Auth-gigya-uid": self.gigya_uid,
            "X-Client-Release": self.config["sdk"]["version"],
            "X-Customer-Name": "rtlnl",
        }

        jwt_response = self.session.get(url=jwt_url, headers=jwt_headers).json()

        if jwt_response.get("error"):
            raise EnvironmentError(f"Could not get Access Token: {jwt_response['error']['message']!r}")

        initial_token = jwt_response["token"]

        # Step 3: Get profiles
        profiles_response = self.session.get(
            url=self.config["endpoints"]["profiles"].format(
                platform=self.platform,
                gigya=self.gigya_uid,
            ),
            headers={"Authorization": f"Bearer {initial_token}"},
        ).json()

        if isinstance(profiles_response, dict) and profiles_response.get("error"):
            raise EnvironmentError(f"Could not get profiles: {profiles_response['error']['message']!r}")

        # A successful response is indexed as a list below; fail with a clear message
        # instead of a bare IndexError/KeyError when no profile comes back.
        if not isinstance(profiles_response, list) or not profiles_response:
            raise EnvironmentError("Could not get profiles: no profiles returned for this account")

        self.profile_id = profiles_response[0]["uid"]

        # Step 4: Get final JWT token with profile
        jwt_headers["X-Auth-profile-id"] = self.profile_id

        final_jwt_response = self.session.get(url=jwt_url, headers=jwt_headers).json()

        if final_jwt_response.get("error"):
            raise EnvironmentError(
                f"Could not get final Access Token: {final_jwt_response['error']['message']!r}"
            )

        self.access_token = final_jwt_response["token"]
        self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

    def search(self) -> Generator[SearchResult, None, None]:
        """Search is not supported; always raises ``NotImplementedError``."""
        # Videoland doesn't have a documented search endpoint in the original code
        # This is a placeholder - you may need to implement based on actual API
        raise NotImplementedError("Search is not implemented for Videoland")

    def get_titles(self) -> Titles_T:
        """
        Resolve ``self.title`` into a ``Movies`` or ``Series`` collection.

        The title is treated as a movie when the ``--movie`` flag was given or when the
        layout metadata does not mention "Seizoen"; otherwise every season block is fully
        paginated and turned into episodes.

        Raises:
            ValueError: If the title cannot be parsed or the layout API reports an error.
        """
        title_match = re.match(self.TITLE_RE, self.title)
        if not title_match:
            raise ValueError(f"Invalid title format: {self.title}")

        title_slug = title_match.group("title_id")

        # Handle folder URLs (e.g., title-f_12345)
        if re.match(r".+?-f_[0-9]+", title_slug):
            title_slug = self._get_program_title(title_slug)

        # Extract title ID from slug (e.g., "show-name-p_12345" -> "12345")
        title_id = title_slug.split("-p_")[-1] if "-p_" in title_slug else title_slug

        metadata = self.session.get(
            url=self.config["endpoints"]["layout"].format(
                platform=self.platform,
                token=self.platform_token,
                endpoint=f"program/{title_id}",
            ),
            params={"nbPages": "10"},
        ).json()

        # Check for API errors
        if isinstance(metadata, dict) and metadata.get("error"):
            raise ValueError(f"API Error: {metadata.get('message', 'Unknown error')}")

        program_title = metadata["entity"]["metadata"]["title"]

        # The explicit --movie flag wins; otherwise fall back to the metadata heuristic.
        is_movie = self.movie or "Seizoen" not in str(metadata)

        if is_movie:
            movie_info = metadata["blocks"][0]["content"]["items"][0]
            viewable_id = movie_info["itemContent"]["action"]["target"]["value_layout"]["id"]

            return Movies([
                Movie(
                    id_=movie_info["ucid"],
                    service=self.__class__,
                    name=program_title,
                    year=None,
                    language=Language.get("nl"),
                    data={
                        "viewable": viewable_id,
                        "metadata": metadata,
                    },
                )
            ])

        seasons = [
            block
            for block in metadata["blocks"]
            if block["featureId"] == "videos_by_season_by_program"
        ]

        # Fetch all episodes from all seasons with pagination
        for season in seasons:
            self._fetch_remaining_season_items(season, title_id)

        episodes = []
        for season in seasons:
            # Extract season number from title like "Seizoen 1" or "Season 1"
            season_title = season.get("title", {}).get("long", "")
            season_match = re.search(r"(\d+)", season_title)
            season_number = int(season_match.group(1)) if season_match else 1

            for idx, episode_data in enumerate(season["content"]["items"]):
                # Get the extra title which contains episode info
                extra_title = episode_data["itemContent"].get("extraTitle", "")

                # Extract episode number from extraTitle like "1. Hondenadoptiedag"
                # or "14. Een Draak Op School (Deel 1)"
                ep_match = re.match(r"^(\d+)\.\s*(.*)$", extra_title)
                if ep_match:
                    episode_number = int(ep_match.group(1))
                    episode_name = ep_match.group(2)
                else:
                    # Fallback to index + 1
                    episode_number = idx + 1
                    episode_name = extra_title

                viewable_id = episode_data["itemContent"]["action"]["target"]["value_layout"]["id"]

                episodes.append(
                    Episode(
                        id_=episode_data["ucid"],
                        service=self.__class__,
                        title=program_title,
                        season=season_number,
                        number=episode_number,
                        name=episode_name,
                        year=None,
                        language=Language.get("nl"),
                        data={
                            "viewable": viewable_id,
                            "episode_data": episode_data,
                        },
                    )
                )

        # Sort episodes by season and episode number
        episodes.sort(key=lambda ep: (ep.season, ep.number))

        return Series(episodes)

    def _fetch_remaining_season_items(self, season: dict, title_id: str) -> None:
        """Append the not-yet-loaded pages of a season block to ``season["content"]["items"]`` in place."""
        content = season["content"]
        items = content["items"]
        pagination = content["pagination"]

        while len(items) != pagination["totalItems"]:
            season_data = self.session.get(
                url=self.config["endpoints"]["seasoning"].format(
                    platform=self.platform,
                    token=self.platform_token,
                    program=title_id,
                    season_id=season["id"],
                ),
                params={
                    "nbPages": "10",
                    "page": pagination["nextPage"],
                },
            ).json()

            count_before = len(items)
            for episode in season_data["content"]["items"]:
                if episode not in items:
                    items.append(episode)

            pagination["nextPage"] = season_data["content"]["pagination"]["nextPage"]

            # Without this guard a page that adds nothing new (or a missing next page)
            # would make the loop spin forever, since the item count can never reach
            # totalItems.
            if len(items) == count_before or pagination["nextPage"] is None:
                if len(items) != pagination["totalItems"]:
                    self.log.warning(
                        f"Season {season['id']}: got {len(items)} of "
                        f"{pagination['totalItems']} items, stopping pagination"
                    )
                break

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Build the track list for a title from its DASH manifest.

        Picks the "hd" asset, falling back to "sd", records the PlayReady PSSH and viewable
        ID on the instance, rewrites track URLs to the origin host and marks DASH subtitles
        as SubRip.

        Raises:
            ValueError: If the player block, the assets or a usable quality is missing.
        """
        viewable_id = title.data["viewable"]

        manifest_response = self.session.get(
            url=self.config["endpoints"]["layout"].format(
                platform=self.platform,
                token=self.platform_token,
                endpoint=f"video/{viewable_id}",
            ),
            params={"nbPages": "2"},
        ).json()

        player_block = next(
            (block for block in manifest_response["blocks"] if block["templateId"] == "Player"),
            None,
        )

        if not player_block:
            raise ValueError("Could not find player block in manifest")

        assets = player_block["content"]["items"][0]["itemContent"]["video"]["assets"]

        if not assets:
            raise ValueError("Failed to load content manifest - no assets found")

        # Prefer HD quality
        mpd_asset = next((asset for asset in assets if asset["quality"] == "hd"), None)
        if not mpd_asset:
            mpd_asset = next((asset for asset in assets if asset["quality"] == "sd"), None)

        if not mpd_asset:
            raise ValueError("No suitable quality stream found")

        mpd_url = mpd_asset["path"]

        # Extract PlayReady PSSH from manifest.
        # NOTE(review): the element name was missing from the pattern (it matched single
        # characters, so nothing could ever pass the length filter below). It is restored
        # here as the standard DASH <cenc:pssh> element - confirm against a real manifest.
        manifest_content = self.session.get(mpd_url).text
        pssh_matches = re.findall(r"<cenc:pssh>(.+?)</cenc:pssh>", manifest_content)

        # Only a PSSH longer than 200 characters is accepted as the PlayReady one.
        self.pssh_playready = next((pssh for pssh in pssh_matches if len(pssh) > 200), None)

        # Store viewable ID for license request
        self.current_viewable = viewable_id

        tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language)

        # Fix track URLs - replace CDN hostname and drop the query string
        cdn_host_re = re.compile(r"https://.+?\.videoland\.bedrock\.tech")
        origin_host = "https://origin.vod.videoland.bedrock.tech"

        for track in tracks:
            if hasattr(track, "url") and track.url:
                if isinstance(track.url, list):
                    track.url = [cdn_host_re.sub(origin_host, uri.split("?")[0]) for uri in track.url]
                elif isinstance(track.url, str):
                    track.url = cdn_host_re.sub(origin_host, track.url.split("?")[0])

        # Handle subtitles
        for subtitle in tracks.subtitles:
            if isinstance(subtitle.url, list) or (isinstance(subtitle.url, str) and "dash" in subtitle.url):
                subtitle.codec = Subtitle.Codec.SubRip
            else:
                self.log.warning("Unknown subtitle codec detected")

        return tracks

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """Return no chapters; this service does not provide any."""
        return []

    def get_widevine_service_certificate(self, **_) -> Optional[str]:
        """Return the ``certificate`` entry of the service config, if present."""
        return self.config.get("certificate")

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """
        Request a Widevine license and return the ``license`` field of the JSON response.

        Raises:
            ValueError: If the license server does not answer with HTTP 200.
        """
        license_token = self._get_license_token(title)

        response = self.session.post(
            url=self.config["endpoints"]["license_wv"],
            data=challenge,
            headers={"x-dt-auth-token": license_token},
        )

        if response.status_code != 200:
            raise ValueError(f"Failed to get Widevine license: {response.status_code}")

        return response.json().get("license")

    def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
        """
        Request a PlayReady license and return the raw response body.

        Raises:
            ValueError: If the license server does not answer with HTTP 200.
        """
        license_token = self._get_license_token(title)

        response = self.session.post(
            url=self.config["endpoints"]["license_pr"],
            data=challenge,
            headers={"x-dt-auth-token": license_token},
        )

        if response.status_code != 200:
            raise ValueError(f"Failed to get PlayReady license: {response.status_code}")

        return response.content

    def _get_license_token(self, title: Title_T) -> str:
        """Fetch the token sent as ``x-dt-auth-token`` for the title's viewable ID."""
        viewable_id = title.data["viewable"]

        response = self.session.get(
            url=self.config["endpoints"]["license_token"].format(
                platform=self.platform,
                gigya=self.gigya_uid,
                clip=viewable_id,
            ),
        ).json()

        return response["token"]

    def _get_program_title(self, folder_title: str) -> str:
        """Map a folder slug (``...-f_<id>``) to its parent program slug (``<seo>-p_<id>``)."""
        folder_id = folder_title.split("-f_")[1]

        response = self.session.get(
            url=self.config["endpoints"]["layout"].format(
                platform=self.platform,
                token=self.platform_token,
                endpoint=f"folder/{folder_id}",
            ),
            params={"nbPages": "2"},
        ).json()

        target = response["blocks"][0]["content"]["items"][0]["itemContent"]["action"]["target"]["value_layout"]
        parent_seo = target["parent"]["seo"]
        parent_id = target["parent"]["id"]

        return f"{parent_seo}-p_{parent_id}"