Added cookie support for KNPY

This commit is contained in:
FairTrade 2026-04-01 11:14:06 +02:00
parent fa776a590a
commit cf3bab282c

View File

@ -22,9 +22,9 @@ from unshackle.core.tracks import Subtitle, Tracks
class KNPY(Service):
"""
Service code for Kanopy (kanopy.com).
Version: 1.0.0
Version: 1.1.0
Auth: Credential (username + password)
Auth: Cookies (kapi_token) or Credential (username + password)
Security: FHD@L3
Handles both Movies and Series (Playlists).
@ -32,7 +32,6 @@ class KNPY(Service):
Caching included
"""
# Updated regex to match the new URL structure with library subdomain and path
TITLE_RE = r"^https?://(?:www\.)?kanopy\.com/.+/(?P<id>\d+)$"
GEOFENCE = ()
NO_SUBTITLES = False
@ -74,9 +73,82 @@ class KNPY(Service):
self.widevine_license_url = None
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
if not credential or not credential.username or not credential.password:
raise ValueError("Kanopy requires email and password for authentication.")
"""
Authenticate using either cookies or credentials.
Cookie-based auth: Requires 'kapi_token' cookie from browser.
Credential-based auth: Requires email and password.
"""
if cookies:
jwt_token = None
cookie_visitor_id = None
cookie_uid = None
# Extract relevant cookies
for cookie in cookies:
if cookie.name == "kapi_token":
jwt_token = cookie.value
elif cookie.name == "visitor_id":
cookie_visitor_id = cookie.value
elif cookie.name == "uid":
cookie_uid = cookie.value
if jwt_token:
self.log.info("Attempting cookie-based authentication...")
self._jwt = jwt_token
self.session.headers.update({"authorization": f"Bearer {self._jwt}"})
try:
# Decode JWT to extract user information
decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False})
# Check if token is expired
exp_timestamp = decoded_jwt.get("exp")
if exp_timestamp and exp_timestamp < datetime.now(timezone.utc).timestamp():
self.log.warning("Cookie token has expired.")
if credential:
self.log.info("Falling back to credential-based authentication...")
else:
raise ValueError("Cookie token expired and no credentials provided.")
else:
# Extract user data from JWT
jwt_data = decoded_jwt.get("data", {})
self._user_id = jwt_data.get("uid") or cookie_uid
self._visitor_id = jwt_data.get("visitor_id") or cookie_visitor_id
if not self._user_id:
raise ValueError("Could not extract user_id from cookie token")
self.log.info(f"Successfully authenticated via cookies (user_id: {self._user_id})")
# Fetch user library memberships to get domain_id
self._fetch_user_details()
return
except jwt.DecodeError as e:
self.log.error(f"Failed to decode cookie token: {e}")
if credential:
self.log.info("Falling back to credential-based authentication...")
else:
raise ValueError(f"Invalid kapi_token cookie: {e}")
except KeyError as e:
self.log.error(f"Missing expected field in cookie token: {e}")
if credential:
self.log.info("Falling back to credential-based authentication...")
else:
raise ValueError(f"Invalid kapi_token structure: {e}")
else:
self.log.info("No kapi_token found in cookies.")
if not credential:
raise ValueError("No kapi_token cookie found and no credentials provided.")
self.log.info("Falling back to credential-based authentication...")
if not self._jwt: # Only proceed if not already authenticated via cookies
if not credential or not credential.username or not credential.password:
raise ValueError("Kanopy requires either cookies (with kapi_token) or email/password for authentication.")
# Check for cached credential-based token
cache = self.cache.get("auth_token")
if cache and not cache.expired:
@ -108,6 +180,7 @@ class KNPY(Service):
except (KeyError, jwt.DecodeError) as e:
self.log.error(f"Could not decode cached token: {e}. Re-authenticating.")
# Perform fresh login with credentials
self.log.info("Performing handshake to get visitor token...")
r = self.session.get(self.config["endpoints"]["handshake"])
r.raise_for_status()
@ -138,6 +211,7 @@ class KNPY(Service):
self._fetch_user_details()
# Cache the token
try:
decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False})
exp_timestamp = decoded_jwt.get("exp")
@ -162,22 +236,31 @@ class KNPY(Service):
)
def _fetch_user_details(self):
"""Fetch user library memberships to determine the active domain_id."""
self.log.info("Fetching user library memberships...")
r = self.session.get(self.config["endpoints"]["memberships"].format(user_id=self._user_id))
r.raise_for_status()
memberships = r.json()
# Look for the default active membership
for membership in memberships.get("list", []):
if membership.get("status") == "active" and membership.get("isDefault", False):
self._domain_id = str(membership["domainId"])
self.log.info(f"Using default library domain: {membership.get('sitename', 'Unknown')} (ID: {self._domain_id})")
return
# Fallback to first active membership
for membership in memberships.get("list", []):
if membership.get("status") == "active":
self._domain_id = str(membership["domainId"])
self.log.warning(f"No default library found. Using first active domain: {self._domain_id}")
return
if memberships.get("list"):
self._domain_id = str(memberships["list"][0]["domainId"])
self.log.warning(f"No default library found. Using first active domain: {self._domain_id}")
self.log.warning(f"No active library found. Using first available domain: {self._domain_id}")
else:
raise ValueError("No active library memberships found for this user.")
raise ValueError("No library memberships found for this user.")
def get_titles(self) -> Titles_T:
if not self.content_id:
@ -456,9 +539,6 @@ class KNPY(Service):
title = item.get("title", "Unknown Title")
# Since the search API doesn't explicitly return "type",
# we provide a generic label or try to guess.
# In your get_titles logic, you handle both, so we point to the watch URL.
yield SearchResult(
id_=str(video_id),
title=title,