From cf3bab282ce3dd769adf1e705c1dd0c8a7b21bd0 Mon Sep 17 00:00:00 2001 From: FairTrade Date: Wed, 1 Apr 2026 11:14:06 +0200 Subject: [PATCH] Added cookie support for KNPY --- KNPY/__init__.py | 254 +++++++++++++++++++++++++++++++---------------- 1 file changed, 167 insertions(+), 87 deletions(-) diff --git a/KNPY/__init__.py b/KNPY/__init__.py index 8b21663..e7fa648 100644 --- a/KNPY/__init__.py +++ b/KNPY/__init__.py @@ -22,9 +22,9 @@ from unshackle.core.tracks import Subtitle, Tracks class KNPY(Service): """ Service code for Kanopy (kanopy.com). - Version: 1.0.0 + Version: 1.1.0 - Auth: Credential (username + password) + Auth: Cookies (kapi_token) or Credential (username + password) Security: FHD@L3 Handles both Movies and Series (Playlists). @@ -32,7 +32,6 @@ class KNPY(Service): Caching included """ - # Updated regex to match the new URL structure with library subdomain and path TITLE_RE = r"^https?://(?:www\.)?kanopy\.com/.+/(?P\d+)$" GEOFENCE = () NO_SUBTITLES = False @@ -74,110 +73,194 @@ class KNPY(Service): self.widevine_license_url = None def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: - if not credential or not credential.username or not credential.password: - raise ValueError("Kanopy requires email and password for authentication.") - - cache = self.cache.get("auth_token") + """ + Authenticate using either cookies or credentials. - if cache and not cache.expired: - cached_data = cache.data - valid_token = None + Cookie-based auth: Requires 'kapi_token' cookie from browser. + Credential-based auth: Requires email and password. + """ - if isinstance(cached_data, dict) and "token" in cached_data: - if cached_data.get("username") == credential.username: - valid_token = cached_data["token"] - self.log.info("Using cached authentication token") - else: - self.log.info(f"Cached token belongs to '{cached_data.get('username')}', but logging in as '{credential.username}'. Re-authenticating.") + if cookies: + jwt_token = None + cookie_visitor_id = None + cookie_uid = None - elif isinstance(cached_data, str): - self.log.info("Found legacy cached token format. Re-authenticating to ensure correct user.") + # Extract relevant cookies + for cookie in cookies: + if cookie.name == "kapi_token": + jwt_token = cookie.value + elif cookie.name == "visitor_id": + cookie_visitor_id = cookie.value + elif cookie.name == "uid": + cookie_uid = cookie.value - if valid_token: - self._jwt = valid_token + if jwt_token: + self.log.info("Attempting cookie-based authentication...") + self._jwt = jwt_token self.session.headers.update({"authorization": f"Bearer {self._jwt}"}) - - if not self._user_id or not self._domain_id or not self._visitor_id: - try: - decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False}) - self._user_id = decoded_jwt["data"]["uid"] - self._visitor_id = decoded_jwt["data"]["visitor_id"] - self.log.info(f"Extracted user_id and visitor_id from cached token.") - self._fetch_user_details() - return - except (KeyError, jwt.DecodeError) as e: - self.log.error(f"Could not decode cached token: {e}. Re-authenticating.") + + try: + # Decode JWT to extract user information + decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False}) + + # Check if token is expired + exp_timestamp = decoded_jwt.get("exp") + if exp_timestamp and exp_timestamp < datetime.now(timezone.utc).timestamp(): + self.log.warning("Cookie token has expired.") + if credential: + self.log.info("Falling back to credential-based authentication...") + else: + raise ValueError("Cookie token expired and no credentials provided.") + else: + # Extract user data from JWT + jwt_data = decoded_jwt.get("data", {}) + self._user_id = jwt_data.get("uid") or cookie_uid + self._visitor_id = jwt_data.get("visitor_id") or cookie_visitor_id - self.log.info("Performing handshake to get visitor token...") - r = self.session.get(self.config["endpoints"]["handshake"]) - r.raise_for_status() - handshake_data = r.json() - self._visitor_id = handshake_data["visitorId"] - initial_jwt = handshake_data["jwt"] - - self.log.info(f"Logging in as {credential.username}...") - login_payload = { - "credentialType": "email", - "emailUser": { - "email": credential.username, - "password": credential.password - } - } - r = self.session.post( - self.config["endpoints"]["login"], - json=login_payload, - headers={"authorization": f"Bearer {initial_jwt}"} - ) - r.raise_for_status() - login_data = r.json() - self._jwt = login_data["jwt"] - self._user_id = login_data["userId"] - - self.session.headers.update({"authorization": f"Bearer {self._jwt}"}) - self.log.info(f"Successfully authenticated as {credential.username}") - - self._fetch_user_details() - - try: - decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False}) - exp_timestamp = decoded_jwt.get("exp") - - cache_payload = { - "token": self._jwt, - "username": credential.username - } - - if exp_timestamp: - expiration_in_seconds = int(exp_timestamp - datetime.now(timezone.utc).timestamp()) - self.log.info(f"Caching token for {expiration_in_seconds / 60:.2f} minutes.") - cache.set(data=cache_payload, expiration=expiration_in_seconds) + if not self._user_id: + raise ValueError("Could not extract user_id from cookie token") + + self.log.info(f"Successfully authenticated via cookies (user_id: {self._user_id})") + + # Fetch user library memberships to get domain_id + self._fetch_user_details() + return + + except jwt.DecodeError as e: + self.log.error(f"Failed to decode cookie token: {e}") + if credential: + self.log.info("Falling back to credential-based authentication...") + else: + raise ValueError(f"Invalid kapi_token cookie: {e}") + except KeyError as e: + self.log.error(f"Missing expected field in cookie token: {e}") + if credential: + self.log.info("Falling back to credential-based authentication...") + else: + raise ValueError(f"Invalid kapi_token structure: {e}") else: - self.log.warning("JWT has no 'exp' claim, caching for 1 hour as a fallback.") - cache.set(data=cache_payload, expiration=3600) - except Exception as e: - self.log.error(f"Failed to decode JWT for caching: {e}. Caching for 1 hour as a fallback.") - cache.set( - data={"token": self._jwt, "username": credential.username}, - expiration=3600 + self.log.info("No kapi_token found in cookies.") + if not credential: + raise ValueError("No kapi_token cookie found and no credentials provided.") + self.log.info("Falling back to credential-based authentication...") + + if not self._jwt: # Only proceed if not already authenticated via cookies + if not credential or not credential.username or not credential.password: + raise ValueError("Kanopy requires either cookies (with kapi_token) or email/password for authentication.") + + # Check for cached credential-based token + cache = self.cache.get("auth_token") + + if cache and not cache.expired: + cached_data = cache.data + valid_token = None + + if isinstance(cached_data, dict) and "token" in cached_data: + if cached_data.get("username") == credential.username: + valid_token = cached_data["token"] + self.log.info("Using cached authentication token") + else: + self.log.info(f"Cached token belongs to '{cached_data.get('username')}', but logging in as '{credential.username}'. Re-authenticating.") + + elif isinstance(cached_data, str): + self.log.info("Found legacy cached token format. Re-authenticating to ensure correct user.") + + if valid_token: + self._jwt = valid_token + self.session.headers.update({"authorization": f"Bearer {self._jwt}"}) + + if not self._user_id or not self._domain_id or not self._visitor_id: + try: + decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False}) + self._user_id = decoded_jwt["data"]["uid"] + self._visitor_id = decoded_jwt["data"]["visitor_id"] + self.log.info(f"Extracted user_id and visitor_id from cached token.") + self._fetch_user_details() + return + except (KeyError, jwt.DecodeError) as e: + self.log.error(f"Could not decode cached token: {e}. Re-authenticating.") + + # Perform fresh login with credentials + self.log.info("Performing handshake to get visitor token...") + r = self.session.get(self.config["endpoints"]["handshake"]) + r.raise_for_status() + handshake_data = r.json() + self._visitor_id = handshake_data["visitorId"] + initial_jwt = handshake_data["jwt"] + + self.log.info(f"Logging in as {credential.username}...") + login_payload = { + "credentialType": "email", + "emailUser": { + "email": credential.username, + "password": credential.password + } + } + r = self.session.post( + self.config["endpoints"]["login"], + json=login_payload, + headers={"authorization": f"Bearer {initial_jwt}"} ) + r.raise_for_status() + login_data = r.json() + self._jwt = login_data["jwt"] + self._user_id = login_data["userId"] + + self.session.headers.update({"authorization": f"Bearer {self._jwt}"}) + self.log.info(f"Successfully authenticated as {credential.username}") + + self._fetch_user_details() + + # Cache the token + try: + decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False}) + exp_timestamp = decoded_jwt.get("exp") + + cache_payload = { + "token": self._jwt, + "username": credential.username + } + + if exp_timestamp: + expiration_in_seconds = int(exp_timestamp - datetime.now(timezone.utc).timestamp()) + self.log.info(f"Caching token for {expiration_in_seconds / 60:.2f} minutes.") + cache.set(data=cache_payload, expiration=expiration_in_seconds) + else: + self.log.warning("JWT has no 'exp' claim, caching for 1 hour as a fallback.") + cache.set(data=cache_payload, expiration=3600) + except Exception as e: + self.log.error(f"Failed to decode JWT for caching: {e}. Caching for 1 hour as a fallback.") + cache.set( + data={"token": self._jwt, "username": credential.username}, + expiration=3600 + ) def _fetch_user_details(self): + """Fetch user library memberships to determine the active domain_id.""" self.log.info("Fetching user library memberships...") r = self.session.get(self.config["endpoints"]["memberships"].format(user_id=self._user_id)) r.raise_for_status() memberships = r.json() + # Look for the default active membership for membership in memberships.get("list", []): if membership.get("status") == "active" and membership.get("isDefault", False): self._domain_id = str(membership["domainId"]) self.log.info(f"Using default library domain: {membership.get('sitename', 'Unknown')} (ID: {self._domain_id})") return + # Fallback to first active membership + for membership in memberships.get("list", []): + if membership.get("status") == "active": + self._domain_id = str(membership["domainId"]) + self.log.warning(f"No default library found. Using first active domain: {self._domain_id}") + return + if memberships.get("list"): self._domain_id = str(memberships["list"][0]["domainId"]) - self.log.warning(f"No default library found. Using first active domain: {self._domain_id}") + self.log.warning(f"No active library found. Using first available domain: {self._domain_id}") else: - raise ValueError("No active library memberships found for this user.") + raise ValueError("No library memberships found for this user.") def get_titles(self) -> Titles_T: if not self.content_id: @@ -456,9 +539,6 @@ class KNPY(Service): title = item.get("title", "Unknown Title") - # Since the search API doesn't explicitly return "type", - # we provide a generic label or try to guess. - # In your get_titles logic, you handle both, so we point to the watch URL. yield SearchResult( id_=str(video_id), title=title, @@ -467,4 +547,4 @@ class KNPY(Service): ) def get_chapters(self, title: Title_T) -> list: - return [] + return [] \ No newline at end of file