diff --git a/CPY/__init__.py b/CPY/__init__.py new file mode 100644 index 0000000..c08e680 --- /dev/null +++ b/CPY/__init__.py @@ -0,0 +1,566 @@ +import base64 +import json +import re +import time +from collections.abc import Generator +from typing import Optional, Union + +import click +import requests as req_lib +from langcodes import Language + +from unshackle.core.constants import AnyTrack +from unshackle.core.credential import Credential +from unshackle.core.manifests import DASH +from unshackle.core.search_result import SearchResult +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T +from unshackle.core.tracks import Chapter, Tracks, Video + + +class CPY(Service): + """ + Service code for catchplay.com + Version: 1.0.0 + + Authorization: Credentials + + Security: HD@L3 + + Use full URL (for example - https://www.catchplay.com/id/video/1b8c1ba3-9015-4f99-8131-25dd45a4b033) + or title ID (for example - 1b8c1ba3-9015-4f99-8131-25dd45a4b033). + + IMPORTANT: + CHANGE YOUR PARENTAL PIN IN THE CONFIG.YAML THAT ACCORDING TO YOURS + """ + + TITLE_RE = r"^(?:https?://(?:www\.)?catchplay\.com/\w+/(?:movie|series|video)/)?(?P[a-f0-9-]{36})" + GEOFENCE = ("ID", "TW", "SG", "HK", "TH") + + @staticmethod + @click.command(name="CPY", short_help="https://catchplay.com") + @click.argument("title", type=str) + @click.pass_context + def cli(ctx, **kwargs): + return CPY(ctx, **kwargs) + + def __init__(self, ctx, title): + super().__init__(ctx) + self.title = title + self.cdm = ctx.obj.cdm + + self.access_token: Optional[str] = None + self.refresh_token: Optional[str] = None + self.token_expiry: float = 0 + self.account_info: dict = {} + + self.play_token: Optional[str] = None + self.license_url: Optional[str] = None + self.license_headers: Optional[dict] = None + + profile_name = ctx.parent.params.get("profile") + self.profile = profile_name or "default" + + def authenticate(self, cookies=None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + + if not credential or not credential.username or not credential.password: + raise EnvironmentError("Service requires Credentials for Authentication.") + + self.credential = credential + cache_key = f"tokens_{self.profile}" + cache = self.cache.get(cache_key) + + if cache and not cache.expired: + cached = cache.data + if isinstance(cached, dict) and cached.get("username") == credential.username: + # Check if access token is still valid + if cached.get("token_expiry", 0) > time.time(): + self.log.info("Using cached tokens") + self._restore_from_cache(cached) + return + # Access token expired but we have a refresh token + elif cached.get("refresh_token"): + self.log.info("Access token expired, refreshing...") + try: + self._refresh_auth(cached["refresh_token"]) + self._cache_tokens(credential.username, cache_key) + return + except Exception as e: + self.log.warning(f"Refresh failed ({e}), doing fresh login...") + + # Fresh login + self.log.info("Logging in...") + self._do_login(credential) + self._cache_tokens(credential.username, cache_key) + + def _do_login(self, credential: Credential) -> None: + """Perform full guest token + credential login flow.""" + territory = self.config.get("territory", "ID") + device = self.config["device"] + + # Step 1: Guest token + self.log.info("Fetching guest token...") + guest_resp = self.session.get( + url=self.config["endpoints"]["guest_token"], + headers={"Referer": f"https://www.catchplay.com/{territory.lower()}/home"}, + ).json() + + if not guest_resp.get("access_token"): + raise Exception("Failed to get guest token") + + # Step 2: Login + login_resp = self.session.post( + url=self.config["endpoints"]["login"], + headers={ + "Content-Type": "application/json", + "asiaplay-territory": territory, + "asiaplay-device-type": device["type"], + "asiaplay-device-version": device["version"], + "Referer": f"https://www.catchplay.com/{territory.lower()}/login", + }, + json={ + "username": credential.username, + "password": credential.password, + "rememberMe": False, + }, + ).json() + + if not login_resp.get("access_token"): + raise Exception(f"Login failed: {login_resp}") + + self._apply_tokens(login_resp) + + user = login_resp.get("user", {}) + self.account_info = user + self.log.info( + f" + Logged in as: {credential.username} " + f"[{user.get('accountType', 'unknown')} / {user.get('accountStatus', 'unknown')}]" + ) + + def _refresh_auth(self, refresh_token: str) -> None: + """Refresh the access token using the refresh token.""" + refresh_resp = self.session.get( + url=self.config["endpoints"]["refresh"], + headers={ + "Referer": "https://www.catchplay.com/", + }, + cookies={"connect.sid": self._connect_sid} if hasattr(self, "_connect_sid") else {}, + ).json() + + if not refresh_resp.get("access_token"): + raise Exception(f"Refresh failed: {refresh_resp}") + + self._apply_tokens(refresh_resp) + + user = refresh_resp.get("user", {}) + self.account_info = user + self.log.info( + f" + Token refreshed " + f"[{user.get('accountType', 'unknown')} / {user.get('accountStatus', 'unknown')}]" + ) + + def _apply_tokens(self, token_data: dict) -> None: + """Apply tokens from login or refresh response to session.""" + self.access_token = token_data["access_token"] + self.refresh_token = token_data.get("refresh_token") + + # Calculate expiry from JWT or expires_in + expires_in = token_data.get("expires_in", 86400) + self.token_expiry = time.time() + expires_in - 300 # 5 min buffer + + territory = self.config.get("territory", "ID") + device = self.config["device"] + + self.session.headers.update({ + "authorization": f"Bearer {self.access_token}", + "asiaplay-territory": territory, + "asiaplay-device-type": device["type"], + "asiaplay-device-version": device["version"], + "asiaplay-os-type": device["os_type"], + "asiaplay-os-version": device["os_version"], + "origin": "https://www.catchplay.com", + "referer": "https://www.catchplay.com/", + }) + + def _cache_tokens(self, username: str, cache_key: str) -> None: + """Cache current tokens for reuse.""" + cache = self.cache.get(cache_key) + cache.set( + data={ + "username": username, + "access_token": self.access_token, + "refresh_token": self.refresh_token, + "token_expiry": self.token_expiry, + "account_info": self.account_info, + } + ) + + def _restore_from_cache(self, cached: dict) -> None: + """Restore session state from cached token data.""" + self.access_token = cached["access_token"] + self.refresh_token = cached.get("refresh_token") + self.token_expiry = cached.get("token_expiry", 0) + self.account_info = cached.get("account_info", {}) + + territory = self.config.get("territory", "ID") + device = self.config["device"] + + self.session.headers.update({ + "authorization": f"Bearer {self.access_token}", + "asiaplay-territory": territory, + "asiaplay-device-type": device["type"], + "asiaplay-device-version": device["version"], + "asiaplay-os-type": device["os_type"], + "asiaplay-os-version": device["os_version"], + "origin": "https://www.catchplay.com", + "referer": "https://www.catchplay.com/", + }) + + self.log.info( + f" + Restored session " + f"[{self.account_info.get('accountType', 'unknown')} / " + f"{self.account_info.get('accountStatus', 'unknown')}]" + ) + + def _graphql(self, key: str, variables: dict) -> dict: + """Execute a GraphQL query defined in config.""" + cfg = self.config["graphql"][key] + endpoint_key = cfg["endpoint"] + url = self.config["endpoints"][endpoint_key] + + resp = self.session.post( + url=url, + headers={ + "asiaplay-api-name": cfg["api_name"], + "content-type": "application/json", + }, + json={ + "operationName": cfg["operation"], + "variables": variables, + "query": cfg["query"], + }, + ).json() + + if resp.get("errors"): + raise Exception(f"GraphQL error ({key}): {resp['errors']}") + + return resp["data"] + + def search(self) -> Generator[SearchResult, None, None]: + self.log.info(f"Searching for: {self.title}") + + data = self._graphql("search", {"keyword": self.title}) + programs = data.get("searchKeywordSuggestions", {}).get("programs", []) + + for program in programs: + yield SearchResult( + id_=program["id"], + title=program["name"], + label="TITLE", + url=f"https://www.catchplay.com/id/video/{program['id']}", + ) + + @staticmethod + def _title_from(obj: dict) -> str: + return obj.get("title", {}).get("eng") or obj.get("title", {}).get("local") or "Unknown" + + @staticmethod + def _extract_season_number(title: str) -> int: + match = re.search(r"S(\d+)", title) + return int(match.group(1)) if match else 1 + + @staticmethod + def _extract_episode_number(title: str) -> int: + match = re.search(r"Episode\s+(\d+)", title, re.IGNORECASE) + return int(match.group(1)) if match else 0 + + def get_titles(self) -> Titles_T: + title_id = re.match(self.TITLE_RE, self.title) + if not title_id: + raise ValueError(f"Could not parse title ID from: {self.title}") + self.title = title_id.group("title_id") + + self.log.info(f"Fetching metadata for: {self.title}") + main = self._graphql("get_main_program", {"id": self.title})["getMainProgram"] + + program_type = main.get("type", "MOVIE") + series_title = self._title_from(main) + selected = main.get("selected", {}) + release_year = selected.get("releaseYear") + lang = Language.get(self.config.get("default_language", "en")) + + if program_type == "MOVIE": + title_name = self._title_from(selected) if selected else series_title + program_meta = self._graphql("get_program", {"id": self.title})["getProgram"] + + return Movies([ + Movie( + id_=self.title, + service=self.__class__, + name=title_name, + year=release_year, + language=lang, + data={"videoIntros": program_meta.get("videoIntros", {})}, + ) + ]) + + elif program_type in ("SERIES", "SEASON"): + episodes = [] + children = main.get("children", []) + + for season_data in children: + if season_data.get("type") == "SEASON": + season_short = season_data.get("title", {}).get("short", "S1") + season_num = self._extract_season_number(season_short) + + selected_children = ( + selected.get("children", []) + if selected.get("id") == season_data["id"] + else [] + ) + selected_map = {ep["id"]: ep for ep in selected_children} + + for idx, ep_data in enumerate(season_data.get("children", []), start=1): + ep_id = ep_data["id"] + ep_detail = selected_map.get(ep_id, {}) + + ep_title = ( + self._title_from(ep_detail) if ep_detail.get("title") else + self._title_from(ep_data) if ep_data.get("title") else + f"Episode {idx}" + ) + + ep_num = self._extract_episode_number(ep_title) or idx + + episodes.append(Episode( + id_=ep_id, + service=self.__class__, + title=series_title, + season=season_num, + number=ep_num, + name=ep_title, + year=release_year, + language=lang, + data={"season_id": season_data["id"]}, + )) + + elif season_data.get("type") == "EPISODE": + ep_title = self._title_from(season_data) + ep_num = self._extract_episode_number(ep_title) or len(episodes) + 1 + + episodes.append(Episode( + id_=season_data["id"], + service=self.__class__, + title=series_title, + season=1, + number=ep_num, + name=ep_title, + year=release_year, + language=lang, + data={}, + )) + + if not episodes: + raise Exception(f"No episodes found for: {series_title}") + + self.log.info(f" + Found {len(episodes)} episodes across {len(children)} season(s)") + return Series(episodes) + + else: + raise NotImplementedError(f"Unsupported program type: {program_type}") + + def get_tracks(self, title: Title_T) -> Tracks: + is_episode = isinstance(title, Episode) + + # Play scenario + self.log.info("Checking play scenario...") + scenario = self._graphql( + "get_play_scenario", + {"input": {"programId": title.id}} + )["getPlayScenario"] + + behavior = scenario.get("behaviorType") + self.log.info(f" + Play scenario: {behavior}") + + if behavior != "PLAYABLE": + reason = scenario.get("reason", {}) + raise Exception( + f"Not playable. Behavior: {behavior}. " + f"Reason: {reason.get('message', 'Unknown')}" + ) + + # Parental control + parental = scenario.get("parentalControl", {}) + if parental and parental.get("behaviorType") == "PIN_CODE_REQUIRED": + self.log.info("Validating parental PIN...") + pin = self.config.get("parental_pin", "0000") + pin_result = self._graphql( + "validate_pin", + {"input": {"pinCode": pin}} + )["validateParentalControlPinCode"] + + if pin_result.get("status") != "SUCCESSFUL": + raise Exception(f"PIN validation failed: {pin_result.get('status')}") + self.log.info(" + PIN validated") + + # Play token + self.log.info("Getting play token...") + play_resp = self.session.post( + url=self.config["endpoints"]["play"], + headers={"content-type": "application/json"}, + json={ + "force": False, + "programType": "Video", + "videoId": title.id, + "watchType": "episode" if is_episode else "movie", + }, + ).json() + + if play_resp.get("code") != "0": + raise Exception(f"Play token failed: {play_resp}") + + play_data = play_resp["data"] + self.play_token = play_data.get("vcmsAccessToken") or play_data.get("playToken") + video_id = play_data.get("catchplayVideoId") + + if not self.play_token or not video_id: + raise Exception("Missing play token or video ID") + + self.log.info(f" + Play token for: {video_id}") + + # Media info + self.log.info("Fetching media info...") + vcms = self.config["vcms"] + + media_resp = self.session.get( + url=self.config["endpoints"]["media_info"].format(video_id=video_id), + headers={ + "authorization": f"Bearer {self.play_token}", + "asiaplay-device-type": vcms["device_type"], + "asiaplay-device-model": vcms["device_model"], + "asiaplay-os-type": vcms["os_type"], + "asiaplay-os-version": vcms["os_version"], + "asiaplay-app-version": vcms["app_version"], + "asiaplay-platform": vcms["platform"], + "content-type": "application/x-www-form-urlencoded", + }, + ).json() + + manifest_url = media_resp.get("videoUrl") + if not manifest_url: + raise Exception(f"No video URL: {media_resp}") + + self.log.debug(f"Manifest: {manifest_url}") + + # DRM + license_info = media_resp.get("license", {}) + self.license_url = license_info.get("url", self.config["endpoints"]["widevine_license"]) + self.license_headers = license_info.get("extraHeaders", {}) + + # DASH manifest (clean CDN session) + self.log.info("Parsing DASH manifest...") + + cdn_session = req_lib.Session() + cdn_session.headers.update({ + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36 Edg/147.0.0.0" + ), + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.9", + "Origin": "https://www.catchplay.com", + "Referer": "https://www.catchplay.com/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "cross-site", + }) + + tracks = DASH.from_url(url=manifest_url, session=cdn_session).to_tracks(language=title.language) + + for video in tracks.videos: + video.range = Video.Range.SDR + + # VideoIntros for chapters + if is_episode: + meta = self._graphql("get_program", {"id": title.id})["getProgram"] + title.data["videoIntros"] = meta.get("videoIntros", {}) + elif not title.data.get("videoIntros"): + meta = self._graphql("get_program", {"id": title.id})["getProgram"] + title.data["videoIntros"] = meta.get("videoIntros", {}) + + return tracks + + def get_chapters(self, title: Title_T) -> list[Chapter]: + chapters = [] + intros = title.data.get("videoIntros", {}) + if not intros: + return chapters + + def to_ms(iso: str) -> Optional[int]: + if not iso: + return None + m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso) + if not m: + return None + h, mi, s = int(m.group(1) or 0), int(m.group(2) or 0), int(m.group(3) or 0) + return (h * 3600 + mi * 60 + s) * 1000 + + if intros.get("intro"): + start = to_ms(intros["intro"].get("startTime")) + end = to_ms(intros["intro"].get("endTime")) + if start is not None: + chapters.append(Chapter(timestamp=start, name="Intro")) + if end is not None: + chapters.append(Chapter(timestamp=end, name="After Intro")) + + if intros.get("recap"): + start = to_ms(intros["recap"].get("startTime")) + end = to_ms(intros["recap"].get("endTime")) + if start is not None: + chapters.append(Chapter(timestamp=start, name="Recap")) + if end is not None: + chapters.append(Chapter(timestamp=end, name="After Recap")) + + if intros.get("credits"): + start = to_ms(intros["credits"].get("startTime")) + if start is not None: + chapters.append(Chapter(timestamp=start, name="Credits")) + + chapters.sort(key=lambda c: c.timestamp) + return chapters + + def get_widevine_service_certificate(self, **_) -> Optional[str]: + return self.config.get("certificate") + + def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]: + if not self.license_url: + raise ValueError("No license URL. Call get_tracks() first.") + + license_session = req_lib.Session() + license_session.headers.update({ + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36 Edg/147.0.0.0" + ), + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.9", + "Origin": "https://www.catchplay.com", + "Referer": "https://www.catchplay.com/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "same-site", + }) + license_session.headers.update(self.license_headers) + + response = license_session.post(url=self.license_url, data=challenge) + + if not response.ok: + self.log.error(f"License error: {response.text}") + + response.raise_for_status() + + try: + return response.json().get("license") + except ValueError: + return response.content \ No newline at end of file diff --git a/CPY/config.yaml b/CPY/config.yaml new file mode 100644 index 0000000..79114dc --- /dev/null +++ b/CPY/config.yaml @@ -0,0 +1,136 @@ +territory: ID +default_language: en +parental_pin: "0000" #CHANGE THIS ACCORDING TO YOUR PIN + +device: + type: WEB_PC + version: 3.0.138.4463 + os_type: Windows_Edge + os_version: "10,146.0.0.0" + +vcms: + device_type: web + device_model: windows + os_type: chrome + os_version: 147.0.0 + app_version: "3.0" + platform: desktop + +endpoints: + guest_token: https://www.catchplay.com/api/v2/oauth + login: https://www.catchplay.com/api/v2/oauth/login + refresh: https://www.catchplay.com/api/v2/oauth/refresh + graphql_program: https://sunapi.catchplay.com/program/v3/graphql + graphql_membership: https://sunapi.catchplay.com/membership/v3/graphql + graphql_membership_program: https://sunapi.catchplay.com/membership-program/v3/graphql + play: https://hp2-api.catchplay.com/me/play + media_info: "https://vcmsapi.catchplay.com/video/v3/mediaInfo/{video_id}" + widevine_license: https://vcmsapi.catchplay.com/video-drm/widevine + +graphql: + search: + operation: searchKeywordSuggestions + api_name: searchKeywordSuggestions + endpoint: graphql_membership_program + query: | + query searchKeywordSuggestions($keyword: String!) { + searchKeywordSuggestions(keyword: $keyword) { + programs { + id + name + photoUrl + orientation + } + } + } + + get_main_program: + operation: getMainProgram + api_name: getMainProgram + endpoint: graphql_program + query: | + query getMainProgram($id: ID!) { + getMainProgram(id: $id) { + id + type + title { local eng } + totalChildren + children { + id + type + title { short local eng } + children { + id + type + title { local eng } + publishedDate + playerInfo { duration videoCode } + } + } + selected { + id + type + releaseYear + synopsis + title { local eng } + children { + id + type + title { local eng } + synopsis + publishedDate + playerInfo { duration videoCode } + } + } + } + } + + get_program: + operation: getProgram + api_name: getProgram + endpoint: graphql_program + query: | + query getProgram($id: ID!) { + getProgram(id: $id) { + id + title { local eng } + type + videoIntros { + intro { startTime endTime } + recap { startTime endTime } + credits { startTime endTime } + } + } + } + + get_play_scenario: + operation: getPlayScenario + api_name: getPlayScenario + endpoint: graphql_membership_program + query: | + query getPlayScenario($input: PlayScenarioInput!) { + getPlayScenario(input: $input) { + behaviorType + description + reason { message } + parentalControl { behaviorType title message } + playProgram { + id + type + title { local playing } + playerInfo { videoCode } + } + } + } + + validate_pin: + operation: validateParentalControlPinCode + api_name: validateParentalControlPinCode + endpoint: graphql_membership + query: | + query validateParentalControlPinCode($input: ValidateParentalControlInput!) { + validateParentalControlPinCode(input: $input) { + status + description + } + } \ No newline at end of file diff --git a/README.md b/README.md index 1852f8e..2f5226c 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,8 @@ - PlayReady needed 14. GLA: - Subs sometimes broken (it's on there side) + 15. CPY: + - Currently it supports only 720p because there is no TV parameter, needed that - Acknowledgment