import base64
import json
import re
import time
from collections.abc import Generator
from typing import Optional, Union

import click
import requests as req_lib
from langcodes import Language

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks, Video


class CPY(Service):
    """
    Service code for catchplay.com
    Version: 1.0.0

    Authorization: Credentials
    Security: HD@L3

    Use full URL (for example - https://www.catchplay.com/id/video/1b8c1ba3-9015-4f99-8131-25dd45a4b033)
    or title ID (for example - 1b8c1ba3-9015-4f99-8131-25dd45a4b033).

    IMPORTANT: set `parental_pin` in config.yaml to the parental PIN of your own account.
    """

    # Optional catchplay.com URL prefix followed by the 36-character title UUID,
    # captured as the named group "title_id" (read back in get_titles()).
    TITLE_RE = r"^(?:https?://(?:www\.)?catchplay\.com/\w+/(?:movie|series|video)/)?(?P<title_id>[a-f0-9-]{36})"
    GEOFENCE = ("ID", "TW", "SG", "HK", "TH")

    @staticmethod
    @click.command(name="CPY", short_help="https://catchplay.com")
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        return CPY(ctx, **kwargs)

    def __init__(self, ctx, title):
        """Store the requested title and initialise empty auth/playback state."""
        super().__init__(ctx)
        self.title = title
        self.cdm = ctx.obj.cdm

        # Account-level tokens, filled in by authenticate().
        self.access_token: Optional[str] = None
        self.refresh_token: Optional[str] = None
        self.token_expiry: float = 0
        self.account_info: dict = {}

        # Per-title playback state, filled in by get_tracks().
        self.play_token: Optional[str] = None
        self.license_url: Optional[str] = None
        self.license_headers: Optional[dict] = None

        # The profile name is used to namespace the token cache key.
        profile_name = ctx.parent.params.get("profile")
        self.profile = profile_name or "default"

    def authenticate(self, cookies=None, credential: Optional[Credential] = None) -> None:
        """
        Authenticate with username/password, reusing cached tokens when possible.

        Order of preference: a cached, unexpired access token; a refresh using the
        cached refresh token; a fresh login.

        Raises:
            EnvironmentError: if no credential (or an incomplete one) was supplied.
        """
        super().authenticate(cookies, credential)
        if not credential or not credential.username or not credential.password:
            raise EnvironmentError("Service requires Credentials for Authentication.")

        self.credential = credential
        cache_key = f"tokens_{self.profile}"
        cache = self.cache.get(cache_key)

        if cache and not cache.expired:
            cached = cache.data
            # Only trust cached tokens that belong to the account being used now.
            if isinstance(cached, dict) and cached.get("username") == credential.username:
                if cached.get("token_expiry", 0) > time.time():
                    self.log.info("Using cached tokens")
                    self._restore_from_cache(cached)
                    return
                if cached.get("refresh_token"):
                    self.log.info("Access token expired, refreshing...")
                    try:
                        self._refresh_auth(cached["refresh_token"])
                        self._cache_tokens(credential.username, cache_key)
                        return
                    except Exception as e:
                        # Deliberate best-effort: any refresh failure falls back to a full login.
                        self.log.warning(f"Refresh failed ({e}), doing fresh login...")

        self.log.info("Logging in...")
        self._do_login(credential)
        self._cache_tokens(credential.username, cache_key)

    def _do_login(self, credential: Credential) -> None:
        """
        Perform the full guest token + credential login flow.

        Raises:
            Exception: if the guest token or the login response has no access token.
        """
        territory = self.config.get("territory", "ID")
        device = self.config["device"]

        # Step 1: Guest token.
        # NOTE(review): the guest response is only validated, never used afterwards;
        # presumably the request establishes session cookies needed by the login call - confirm.
        self.log.info("Fetching guest token...")
        guest_resp = self.session.get(
            url=self.config["endpoints"]["guest_token"],
            headers={"Referer": f"https://www.catchplay.com/{territory.lower()}/home"},
        ).json()
        if not guest_resp.get("access_token"):
            raise Exception("Failed to get guest token")

        # Step 2: Login
        login_resp = self.session.post(
            url=self.config["endpoints"]["login"],
            headers={
                "Content-Type": "application/json",
                "asiaplay-territory": territory,
                "asiaplay-device-type": device["type"],
                "asiaplay-device-version": device["version"],
                "Referer": f"https://www.catchplay.com/{territory.lower()}/login",
            },
            json={
                "username": credential.username,
                "password": credential.password,
                "rememberMe": False,
            },
        ).json()
        if not login_resp.get("access_token"):
            raise Exception(f"Login failed: {login_resp}")

        self._apply_tokens(login_resp)

        user = login_resp.get("user") or {}
        self.account_info = user
        self.log.info(
            f" + Logged in as: {credential.username} "
            f"[{user.get('accountType', 'unknown')} / {user.get('accountStatus', 'unknown')}]"
        )

    def _refresh_auth(self, refresh_token: str) -> None:
        """
        Refresh the access token via the refresh endpoint.

        NOTE(review): `refresh_token` is accepted but never sent; the request only
        carries the session's own cookies plus `connect.sid` when `self._connect_sid`
        exists, and nothing in this file assigns `_connect_sid`. Confirm how the
        endpoint identifies the session.

        Raises:
            Exception: if the response has no access token.
        """
        extra_cookies = {"connect.sid": self._connect_sid} if hasattr(self, "_connect_sid") else {}
        refresh_resp = self.session.get(
            url=self.config["endpoints"]["refresh"],
            headers={
                "Referer": "https://www.catchplay.com/",
            },
            cookies=extra_cookies,
        ).json()
        if not refresh_resp.get("access_token"):
            raise Exception(f"Refresh failed: {refresh_resp}")

        self._apply_tokens(refresh_resp)

        user = refresh_resp.get("user") or {}
        self.account_info = user
        self.log.info(
            f" + Token refreshed "
            f"[{user.get('accountType', 'unknown')} / {user.get('accountStatus', 'unknown')}]"
        )

    def _set_session_headers(self) -> None:
        """Install the bearer token and the device/territory headers on the shared session."""
        territory = self.config.get("territory", "ID")
        device = self.config["device"]
        self.session.headers.update({
            "authorization": f"Bearer {self.access_token}",
            "asiaplay-territory": territory,
            "asiaplay-device-type": device["type"],
            "asiaplay-device-version": device["version"],
            "asiaplay-os-type": device["os_type"],
            "asiaplay-os-version": device["os_version"],
            "origin": "https://www.catchplay.com",
            "referer": "https://www.catchplay.com/",
        })

    def _apply_tokens(self, token_data: dict) -> None:
        """Apply tokens from a login or refresh response to this instance and the session."""
        self.access_token = token_data["access_token"]
        self.refresh_token = token_data.get("refresh_token")

        # Expiry comes from `expires_in` (default: one day), minus a 5 minute safety
        # buffer so the token is treated as expired slightly early.
        expires_in = token_data.get("expires_in", 86400)
        self.token_expiry = time.time() + expires_in - 300

        self._set_session_headers()

    def _cache_tokens(self, username: str, cache_key: str) -> None:
        """Cache the current tokens and account info under `cache_key` for reuse."""
        cache = self.cache.get(cache_key)
        cache.set(
            data={
                "username": username,
                "access_token": self.access_token,
                "refresh_token": self.refresh_token,
                "token_expiry": self.token_expiry,
                "account_info": self.account_info,
            }
        )

    def _restore_from_cache(self, cached: dict) -> None:
        """Restore tokens, account info and session headers from cached token data."""
        self.access_token = cached["access_token"]
        self.refresh_token = cached.get("refresh_token")
        self.token_expiry = cached.get("token_expiry", 0)
        self.account_info = cached.get("account_info") or {}

        self._set_session_headers()

        self.log.info(
            f" + Restored session "
            f"[{self.account_info.get('accountType', 'unknown')} / "
            f"{self.account_info.get('accountStatus', 'unknown')}]"
        )

    def _graphql(self, key: str, variables: dict) -> dict:
        """
        Execute the GraphQL operation configured under `graphql.<key>`.

        Returns:
            The `data` member of the response.

        Raises:
            Exception: if the response carries a non-empty `errors` member.
        """
        cfg = self.config["graphql"][key]
        url = self.config["endpoints"][cfg["endpoint"]]

        resp = self.session.post(
            url=url,
            headers={
                "asiaplay-api-name": cfg["api_name"],
                "content-type": "application/json",
            },
            json={
                "operationName": cfg["operation"],
                "variables": variables,
                "query": cfg["query"],
            },
        ).json()

        if resp.get("errors"):
            raise Exception(f"GraphQL error ({key}): {resp['errors']}")
        return resp["data"]

    def search(self) -> Generator[SearchResult, None, None]:
        """Yield a SearchResult for every program suggested for the keyword in `self.title`."""
        self.log.info(f"Searching for: {self.title}")
        data = self._graphql("search", {"keyword": self.title})
        # GraphQL may return explicit nulls, so `or` is used instead of .get() defaults.
        suggestions = data.get("searchKeywordSuggestions") or {}
        for program in suggestions.get("programs") or []:
            yield SearchResult(
                id_=program["id"],
                title=program["name"],
                label="TITLE",
                url=f"https://www.catchplay.com/id/video/{program['id']}",
            )

    @staticmethod
    def _title_from(obj: dict) -> str:
        """Return the English title of `obj`, else its local title, else "Unknown"."""
        names = obj.get("title") or {}
        return names.get("eng") or names.get("local") or "Unknown"

    @staticmethod
    def _extract_season_number(title: str) -> int:
        """Return the number following the first "S" in `title`, or 1 when there is none."""
        match = re.search(r"S(\d+)", title)
        return int(match.group(1)) if match else 1

    @staticmethod
    def _extract_episode_number(title: str) -> int:
        """Return the number following "Episode" (case-insensitive), or 0 when there is none."""
        match = re.search(r"Episode\s+(\d+)", title, re.IGNORECASE)
        return int(match.group(1)) if match else 0

    def get_titles(self) -> Titles_T:
        """
        Resolve `self.title` (URL or bare ID) into Movies or Series metadata.

        Raises:
            ValueError: if no title ID can be parsed from `self.title`.
            NotImplementedError: if the program type is not MOVIE, SERIES or SEASON.
            Exception: if a series yields no episodes.
        """
        id_match = re.match(self.TITLE_RE, self.title)
        if not id_match:
            raise ValueError(f"Could not parse title ID from: {self.title}")
        self.title = id_match.group("title_id")

        self.log.info(f"Fetching metadata for: {self.title}")
        main = self._graphql("get_main_program", {"id": self.title})["getMainProgram"]

        program_type = main.get("type", "MOVIE")
        series_title = self._title_from(main)
        selected = main.get("selected") or {}
        release_year = selected.get("releaseYear")
        lang = Language.get(self.config.get("default_language", "en"))

        if program_type == "MOVIE":
            title_name = self._title_from(selected) if selected else series_title
            program_meta = self._graphql("get_program", {"id": self.title})["getProgram"]
            return Movies([
                Movie(
                    id_=self.title,
                    service=self.__class__,
                    name=title_name,
                    year=release_year,
                    language=lang,
                    data={"videoIntros": program_meta.get("videoIntros") or {}},
                )
            ])

        if program_type not in ("SERIES", "SEASON"):
            raise NotImplementedError(f"Unsupported program type: {program_type}")

        episodes = []
        season_numbers = set()

        for child in main.get("children") or []:
            child_type = child.get("type")

            if child_type == "SEASON":
                season_short = (child.get("title") or {}).get("short") or "S1"
                season_num = self._extract_season_number(season_short)

                # Detailed episode entries are only read from `selected` when it is this season.
                detailed = (selected.get("children") or []) if selected.get("id") == child["id"] else []
                detail_by_id = {ep["id"]: ep for ep in detailed}

                for idx, ep_data in enumerate(child.get("children") or [], start=1):
                    ep_id = ep_data["id"]
                    ep_detail = detail_by_id.get(ep_id, {})

                    # Prefer the detailed title, then the listing title, then a positional name.
                    if ep_detail.get("title"):
                        ep_title = self._title_from(ep_detail)
                    elif ep_data.get("title"):
                        ep_title = self._title_from(ep_data)
                    else:
                        ep_title = f"Episode {idx}"

                    ep_num = self._extract_episode_number(ep_title) or idx
                    season_numbers.add(season_num)
                    episodes.append(Episode(
                        id_=ep_id,
                        service=self.__class__,
                        title=series_title,
                        season=season_num,
                        number=ep_num,
                        name=ep_title,
                        year=release_year,
                        language=lang,
                        data={"season_id": child["id"]},
                    ))

            elif child_type == "EPISODE":
                # Episodes listed directly under the program are filed under season 1.
                ep_title = self._title_from(child)
                ep_num = self._extract_episode_number(ep_title) or len(episodes) + 1
                season_numbers.add(1)
                episodes.append(Episode(
                    id_=child["id"],
                    service=self.__class__,
                    title=series_title,
                    season=1,
                    number=ep_num,
                    name=ep_title,
                    year=release_year,
                    language=lang,
                    data={},
                ))

        if not episodes:
            raise Exception(f"No episodes found for: {series_title}")

        # Count distinct season numbers rather than top-level children, which may be flat episodes.
        self.log.info(f" + Found {len(episodes)} episodes across {len(season_numbers)} season(s)")
        return Series(episodes)

    @staticmethod
    def _new_browser_session(fetch_site: str) -> req_lib.Session:
        """Return a fresh requests session with browser-like headers and no account headers."""
        browser_session = req_lib.Session()
        browser_session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36 Edg/147.0.0.0"
            ),
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
            "Origin": "https://www.catchplay.com",
            "Referer": "https://www.catchplay.com/",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": fetch_site,
        })
        return browser_session

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Resolve the DASH manifest for `title` and return its tracks.

        Steps: check the play scenario, validate the parental PIN when required,
        request a play token, fetch media info, then parse the manifest. Also stores
        the license URL/headers for get_widevine_license() and the `videoIntros`
        metadata on `title.data` for get_chapters().

        Raises:
            Exception: if the title is not playable, PIN validation fails, the play
                token request fails, or the media info has no video URL.
        """
        is_episode = isinstance(title, Episode)

        # Play scenario
        self.log.info("Checking play scenario...")
        scenario = self._graphql(
            "get_play_scenario", {"input": {"programId": title.id}}
        )["getPlayScenario"]

        behavior = scenario.get("behaviorType")
        self.log.info(f" + Play scenario: {behavior}")
        if behavior != "PLAYABLE":
            reason = scenario.get("reason") or {}
            raise Exception(
                f"Not playable. Behavior: {behavior}. "
                f"Reason: {reason.get('message', 'Unknown')}"
            )

        # Parental control
        parental = scenario.get("parentalControl") or {}
        if parental.get("behaviorType") == "PIN_CODE_REQUIRED":
            self.log.info("Validating parental PIN...")
            pin = self.config.get("parental_pin", "0000")
            pin_result = self._graphql(
                "validate_pin", {"input": {"pinCode": pin}}
            )["validateParentalControlPinCode"]
            if pin_result.get("status") != "SUCCESSFUL":
                raise Exception(f"PIN validation failed: {pin_result.get('status')}")
            self.log.info(" + PIN validated")

        # Play token
        self.log.info("Getting play token...")
        play_resp = self.session.post(
            url=self.config["endpoints"]["play"],
            headers={"content-type": "application/json"},
            json={
                "force": False,
                "programType": "Video",
                "videoId": title.id,
                "watchType": "episode" if is_episode else "movie",
            },
        ).json()
        if play_resp.get("code") != "0":
            raise Exception(f"Play token failed: {play_resp}")

        play_data = play_resp["data"]
        self.play_token = play_data.get("vcmsAccessToken") or play_data.get("playToken")
        video_id = play_data.get("catchplayVideoId")
        if not self.play_token or not video_id:
            raise Exception("Missing play token or video ID")
        self.log.info(f" + Play token for: {video_id}")

        # Media info is requested with the play token instead of the account token.
        self.log.info("Fetching media info...")
        vcms = self.config["vcms"]
        media_resp = self.session.get(
            url=self.config["endpoints"]["media_info"].format(video_id=video_id),
            headers={
                "authorization": f"Bearer {self.play_token}",
                "asiaplay-device-type": vcms["device_type"],
                "asiaplay-device-model": vcms["device_model"],
                "asiaplay-os-type": vcms["os_type"],
                "asiaplay-os-version": vcms["os_version"],
                "asiaplay-app-version": vcms["app_version"],
                "asiaplay-platform": vcms["platform"],
                "content-type": "application/x-www-form-urlencoded",
            },
        ).json()

        manifest_url = media_resp.get("videoUrl")
        if not manifest_url:
            raise Exception(f"No video URL: {media_resp}")
        self.log.debug(f"Manifest: {manifest_url}")

        # DRM: fall back to the configured license endpoint when the response gives none.
        license_info = media_resp.get("license") or {}
        self.license_url = license_info.get("url") or self.config["endpoints"]["widevine_license"]
        self.license_headers = license_info.get("extraHeaders") or {}

        # DASH manifest (clean CDN session, without the account headers)
        self.log.info("Parsing DASH manifest...")
        cdn_session = self._new_browser_session("cross-site")
        tracks = DASH.from_url(url=manifest_url, session=cdn_session).to_tracks(language=title.language)

        for video in tracks.videos:
            video.range = Video.Range.SDR

        # VideoIntros for chapters: always fetched for episodes; for movies only when
        # get_titles() did not already store a non-empty value.
        if is_episode or not title.data.get("videoIntros"):
            meta = self._graphql("get_program", {"id": title.id})["getProgram"]
            title.data["videoIntros"] = meta.get("videoIntros") or {}

        return tracks

    @staticmethod
    def _duration_to_ms(iso: Optional[str]) -> Optional[int]:
        """
        Convert a "PT#H#M#S" duration string to milliseconds.

        Fractional seconds (e.g. "PT1M30.5S") are accepted. Returns None for an empty
        value or a string that does not start with "PT".
        """
        if not iso:
            return None
        m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?", iso)
        if not m:
            return None
        hours = int(m.group(1) or 0)
        minutes = int(m.group(2) or 0)
        seconds = float(m.group(3) or 0)
        return int(round((hours * 3600 + minutes * 60 + seconds) * 1000))

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """
        Build chapters from the `videoIntros` metadata stored on `title.data`.

        Produces "Intro"/"After Intro", "Recap"/"After Recap" and "Credits" markers
        for whichever timestamps are present, sorted by timestamp.
        """
        intros = title.data.get("videoIntros") or {}
        if not intros:
            return []

        # (metadata key, name at startTime, name at endTime or None to ignore endTime)
        segments = (
            ("intro", "Intro", "After Intro"),
            ("recap", "Recap", "After Recap"),
            ("credits", "Credits", None),
        )

        chapters = []
        for key, start_name, end_name in segments:
            segment = intros.get(key)
            if not segment:
                continue
            start = self._duration_to_ms(segment.get("startTime"))
            if start is not None:
                chapters.append(Chapter(timestamp=start, name=start_name))
            if end_name is not None:
                end = self._duration_to_ms(segment.get("endTime"))
                if end is not None:
                    chapters.append(Chapter(timestamp=end, name=end_name))

        chapters.sort(key=lambda c: c.timestamp)
        return chapters

    def get_widevine_service_certificate(self, **_) -> Optional[str]:
        """Return the `certificate` value from the service config, if any."""
        return self.config.get("certificate")

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """
        POST the Widevine challenge to the license URL stored by get_tracks().

        Returns:
            The `license` member when the response body is a JSON object, otherwise
            the raw response bytes.

        Raises:
            ValueError: if get_tracks() has not stored a license URL yet.
            requests.HTTPError: if the license server answers with an error status.
        """
        if not self.license_url:
            raise ValueError("No license URL. Call get_tracks() first.")

        license_session = self._new_browser_session("same-site")
        # license_headers is None until get_tracks() runs; guard so update() never receives None.
        license_session.headers.update(self.license_headers or {})

        response = license_session.post(url=self.license_url, data=challenge)
        if not response.ok:
            self.log.error(f"License error: {response.text}")
            response.raise_for_status()

        try:
            payload = response.json()
        except ValueError:
            return response.content

        if not isinstance(payload, dict):
            # Valid JSON that is not an object cannot carry a "license" member.
            return response.content
        return payload.get("license")