Fixed collection not supported and bug fix for audio (KNPY)

This commit is contained in:
FairTrade 2026-04-08 21:03:28 +02:00
parent cf3bab282c
commit 27e2eaa481

View File

@ -274,14 +274,17 @@ class KNPY(Service):
content_type = content_data.get("type")
def parse_lang(data):
def parse_lang(taxonomies_data: dict) -> Language:
"""Parses language from the taxonomies dictionary."""
try:
langs = data.get("languages", [])
if langs and isinstance(langs, list) and len(langs) > 0:
return Language.find(langs[0])
except:
langs = taxonomies_data.get("languages", [])
if langs:
lang_name = langs[0].get("name")
if lang_name:
return Language.find(lang_name)
except (IndexError, AttributeError, TypeError):
pass
return Language.get("en")
return Language.get("en") # Default to English
if content_type == "video":
video_data = content_data["video"]
@ -291,22 +294,25 @@ class KNPY(Service):
name=video_data["title"],
year=video_data.get("productionYear"),
description=video_data.get("descriptionHtml", ""),
language=parse_lang(video_data),
language=parse_lang(video_data.get("taxonomies", {})),
data=video_data,
)
return Movies([movie])
elif content_type == "playlist":
playlist_data = content_data["playlist"]
playlist_data = content_data.get("playlist")
if not playlist_data:
raise ValueError("Could not find 'playlist' data dictionary.")
series_title = playlist_data["title"]
series_year = playlist_data.get("productionYear")
season_match = re.search(r'(?:Season|S)\s*(\d+)', series_title, re.IGNORECASE)
season_num = int(season_match.group(1)) if season_match else 1
r = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id))
r.raise_for_status()
items_data = r.json()
r_items = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id))
r_items.raise_for_status()
items_data = r_items.json()
episodes = []
for i, item in enumerate(items_data.get("list", [])):
@ -316,8 +322,8 @@ class KNPY(Service):
video_data = item["video"]
ep_num = i + 1
ep_title = video_data.get("title", "")
ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title, re.IGNORECASE)
ep_title_str = video_data.get("title", "")
ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title_str, re.IGNORECASE)
if ep_match:
ep_num = int(ep_match.group(1))
@ -331,7 +337,7 @@ class KNPY(Service):
name=video_data["title"],
description=video_data.get("descriptionHtml", ""),
year=video_data.get("productionYear", series_year),
language=parse_lang(video_data),
language=parse_lang(video_data.get("taxonomies", {})),
data=video_data,
)
)
@ -341,6 +347,83 @@ class KNPY(Service):
series.description = playlist_data.get("descriptionHtml", "")
series.year = series_year
return series
elif content_type == "collection":
collection_data = content_data.get("collection")
if not collection_data:
raise ValueError("Could not find 'collection' data dictionary.")
series_title_main = collection_data["title"]
series_description_main = collection_data.get("descriptionHtml", "")
series_year_main = collection_data.get("productionYear")
r_seasons = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id))
r_seasons.raise_for_status()
seasons_data = r_seasons.json()
all_episodes = []
self.log.info(f"Processing collection '{series_title_main}', found {len(seasons_data.get('list', []))} seasons.")
season_counter = 1
for season_item in seasons_data.get("list", []):
if season_item.get("type") != "playlist":
self.log.warning(f"Skipping unexpected item of type '{season_item.get('type')}' in collection.")
continue
season_playlist_data = season_item["playlist"]
season_id = season_playlist_data["videoId"]
season_title = season_playlist_data["title"]
self.log.info(f"Fetching episodes for season: {season_title}")
season_match = re.search(r'(?:Season|S)\s*(\d+)', season_title, re.IGNORECASE)
if season_match:
season_num = int(season_match.group(1))
else:
self.log.warning(f"Could not parse season number from '{season_title}'. Using sequential number {season_counter}.")
season_num = season_counter
season_counter += 1
r_episodes = self.session.get(self.config["endpoints"]["video_items"].format(video_id=season_id, domain_id=self._domain_id))
r_episodes.raise_for_status()
episodes_data = r_episodes.json()
for i, episode_item in enumerate(episodes_data.get("list", [])):
if episode_item.get("type") != "video":
continue
video_data = episode_item["video"]
ep_num = i + 1
ep_title_str = video_data.get("title", "")
ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title_str, re.IGNORECASE)
if ep_match:
ep_num = int(ep_match.group(1))
all_episodes.append(
Episode(
id_=str(video_data["videoId"]),
service=self.__class__,
title=series_title_main,
season=season_num,
number=ep_num,
name=video_data["title"],
description=video_data.get("descriptionHtml", ""),
year=video_data.get("productionYear", series_year_main),
language=parse_lang(video_data.get("taxonomies", {})),
data=video_data,
)
)
if not all_episodes:
self.log.error(f"Collection '{series_title_main}' did not yield any episodes. The structure may have changed.")
return Series([])
series = Series(all_episodes)
series.name = series_title_main
series.description = series_description_main
series.year = series_year_main
return series
else:
raise ValueError(f"Unsupported content type: {content_type}")
@ -364,7 +447,6 @@ class KNPY(Service):
except Exception:
pass
# Handle known errors gracefully
if r.status_code == 403:
if response_json and response_json.get("errorSubcode") == "playRegionRestricted":
self.log.error("Kanopy reports: This video is not available in your country.")
@ -374,29 +456,27 @@ class KNPY(Service):
else:
self.log.error(f"Access forbidden (HTTP 403). Response: {response_json}")
raise PermissionError("Kanopy denied access to this video. It may require a different library membership or authentication.")
r.raise_for_status()
play_data = response_json or r.json()
manifest_url = None
manifest_type = None
drm_info = {}
# Iterate through manifests: prefer DASH, fallback to HLS
for manifest in play_data.get("manifests", []):
manifest_type_raw = manifest["manifestType"]
url = manifest["url"].strip() # Strip whitespace from URLs
# Construct full URL if relative
url = manifest["url"].strip()
if url.startswith("/"):
url = f"https://kanopy.com{url}"
drm_type = manifest.get("drmType")
if manifest_type_raw == "dash":
manifest_url = url
manifest_type = "dash"
if drm_type == "kanopyDrm":
play_id = play_data.get("playId")
self.widevine_license_url = self.config["endpoints"]["widevine_license"].format(
@ -410,19 +490,17 @@ class KNPY(Service):
else:
self.log.warning(f"Unknown DASH drmType: {drm_type}")
self.widevine_license_url = None
break # Prefer DASH, exit loop
break
elif manifest_type_raw == "hls" and not manifest_url:
# Store HLS as fallback if DASH not found
manifest_url = url
manifest_type = "hls"
if drm_type == "fairplay":
self.log.warning("HLS with FairPlay DRM detected - not currently supported by this service")
self.widevine_license_url = None
drm_info["fairplay"] = True
else:
# HLS with no DRM or unsupported DRM type
self.widevine_license_url = None
drm_info["clear"] = True
@ -435,20 +513,29 @@ class KNPY(Service):
r = self.session.get(manifest_url)
r.raise_for_status()
# Refresh headers for manifest parsing
self.session.headers.clear()
self.session.headers.update({
"User-Agent": self.WIDEVINE_UA,
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
})
# Parse manifest based on type
if manifest_type == "dash":
tracks = DASH.from_text(r.text, url=manifest_url).to_tracks(language=title.language)
import xml.etree.ElementTree as ET
# Parse and clean the MPD to remove PlayReady ContentProtection
ET.register_namespace('', 'urn:mpeg:dash:schema:mpd:2011')
ET.register_namespace('cenc', 'urn:mpeg:cenc:2013')
ET.register_namespace('mspr', 'urn:microsoft:playready')
root = ET.fromstring(r.text)
# Remove PlayReady ContentProtection elements
for adaptation_set in root.findall('.//{urn:mpeg:dash:schema:mpd:2011}AdaptationSet'):
for cp in list(adaptation_set.findall('{urn:mpeg:dash:schema:mpd:2011}ContentProtection')):
scheme_id = cp.get('schemeIdUri', '')
# Remove PlayReady but keep Widevine and CENC
if '9a04f079-9840-4286-ab92-e65be0885f95' in scheme_id:
adaptation_set.remove(cp)
self.log.debug("Removed PlayReady ContentProtection element")
cleaned_mpd = ET.tostring(root, encoding='unicode')
tracks = DASH.from_text(cleaned_mpd, url=manifest_url).to_tracks(language=title.language)
elif manifest_type == "hls":
# Try to import HLS parser from unshackle
try:
from unshackle.core.manifests import HLS
tracks = HLS.from_text(r.text, url=manifest_url).to_tracks(language=title.language)
@ -465,21 +552,47 @@ class KNPY(Service):
else:
raise ValueError(f"Unsupported manifest type: {manifest_type}")
# Add subtitles/captions from play_data (works for both DASH and HLS)
# Update session headers for CDN segment downloads
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.9",
"Origin": "https://www.kanopy.com",
"Referer": "https://www.kanopy.com/",
})
# Remove API-specific headers that CDN doesn't need
self.session.headers.pop("x-version", None)
self.session.headers.pop("authorization", None)
# START: SUBTITLE FIX
for caption_data in play_data.get("captions", []):
lang = caption_data.get("language", "en")
# Use the descriptive label for uniqueness, fallback to the language code
label = caption_data.get("label", lang)
# Create a clean, repeatable "slug" from the label for the track ID
slug = label.lower()
slug = re.sub(r'[\s\[\]\(\)]+', '-', slug) # Replace spaces and brackets with hyphens
slug = re.sub(r'[^a-z0-9-]', '', slug) # Remove other non-alphanumeric chars
slug = slug.strip('-')
# Combine with lang code for a robust, unique ID
track_id = f"caption-{lang}-{slug}"
for file_info in caption_data.get("files", []):
if file_info.get("type") == "webvtt":
tracks.add(Subtitle(
id_=f"caption-{lang}",
id_=track_id,
name=label, # Use the original label for display
url=file_info["url"].strip(),
codec=Subtitle.Codec.WebVTT,
language=Language.get(lang)
))
# Found the file for this caption entry, move to the next one
break
return tracks
# END: SUBTITLE FIX
return tracks
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.widevine_license_url: