diff --git a/KNPY/__init__.py b/KNPY/__init__.py index e7fa648..67ef477 100644 --- a/KNPY/__init__.py +++ b/KNPY/__init__.py @@ -274,14 +274,17 @@ class KNPY(Service): content_type = content_data.get("type") - def parse_lang(data): + def parse_lang(taxonomies_data: dict) -> Language: + """Parses language from the taxonomies dictionary.""" try: - langs = data.get("languages", []) - if langs and isinstance(langs, list) and len(langs) > 0: - return Language.find(langs[0]) - except: + langs = taxonomies_data.get("languages", []) + if langs: + lang_name = langs[0].get("name") + if lang_name: + return Language.find(lang_name) + except (IndexError, AttributeError, TypeError): pass - return Language.get("en") + return Language.get("en") # Default to English if content_type == "video": video_data = content_data["video"] @@ -291,22 +294,25 @@ class KNPY(Service): name=video_data["title"], year=video_data.get("productionYear"), description=video_data.get("descriptionHtml", ""), - language=parse_lang(video_data), + language=parse_lang(video_data.get("taxonomies", {})), data=video_data, ) return Movies([movie]) elif content_type == "playlist": - playlist_data = content_data["playlist"] + playlist_data = content_data.get("playlist") + if not playlist_data: + raise ValueError("Could not find 'playlist' data dictionary.") + series_title = playlist_data["title"] series_year = playlist_data.get("productionYear") season_match = re.search(r'(?:Season|S)\s*(\d+)', series_title, re.IGNORECASE) season_num = int(season_match.group(1)) if season_match else 1 - r = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id)) - r.raise_for_status() - items_data = r.json() + r_items = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id)) + r_items.raise_for_status() + items_data = r_items.json() episodes = [] for i, item in enumerate(items_data.get("list", [])): @@ -316,8 +322,8 @@ class KNPY(Service): video_data = item["video"] ep_num = i + 1 - ep_title = video_data.get("title", "") - ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title, re.IGNORECASE) + ep_title_str = video_data.get("title", "") + ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title_str, re.IGNORECASE) if ep_match: ep_num = int(ep_match.group(1)) @@ -331,7 +337,7 @@ class KNPY(Service): name=video_data["title"], description=video_data.get("descriptionHtml", ""), year=video_data.get("productionYear", series_year), - language=parse_lang(video_data), + language=parse_lang(video_data.get("taxonomies", {})), data=video_data, ) ) @@ -341,6 +347,83 @@ class KNPY(Service): series.description = playlist_data.get("descriptionHtml", "") series.year = series_year return series + + elif content_type == "collection": + collection_data = content_data.get("collection") + if not collection_data: + raise ValueError("Could not find 'collection' data dictionary.") + + series_title_main = collection_data["title"] + series_description_main = collection_data.get("descriptionHtml", "") + series_year_main = collection_data.get("productionYear") + + r_seasons = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id)) + r_seasons.raise_for_status() + seasons_data = r_seasons.json() + + all_episodes = [] + self.log.info(f"Processing collection '{series_title_main}', found {len(seasons_data.get('list', []))} seasons.") + + season_counter = 1 + for season_item in seasons_data.get("list", []): + if season_item.get("type") != "playlist": + self.log.warning(f"Skipping unexpected item of type '{season_item.get('type')}' in collection.") + continue + + season_playlist_data = season_item["playlist"] + season_id = season_playlist_data["videoId"] + season_title = season_playlist_data["title"] + + self.log.info(f"Fetching episodes for season: {season_title}") + + season_match = re.search(r'(?:Season|S)\s*(\d+)', season_title, re.IGNORECASE) + if season_match: + season_num = int(season_match.group(1)) + else: + self.log.warning(f"Could not parse season number from '{season_title}'. Using sequential number {season_counter}.") + season_num = season_counter + season_counter += 1 + + r_episodes = self.session.get(self.config["endpoints"]["video_items"].format(video_id=season_id, domain_id=self._domain_id)) + r_episodes.raise_for_status() + episodes_data = r_episodes.json() + + for i, episode_item in enumerate(episodes_data.get("list", [])): + if episode_item.get("type") != "video": + continue + + video_data = episode_item["video"] + ep_num = i + 1 + + ep_title_str = video_data.get("title", "") + ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title_str, re.IGNORECASE) + if ep_match: + ep_num = int(ep_match.group(1)) + + all_episodes.append( + Episode( + id_=str(video_data["videoId"]), + service=self.__class__, + title=series_title_main, + season=season_num, + number=ep_num, + name=video_data["title"], + description=video_data.get("descriptionHtml", ""), + year=video_data.get("productionYear", series_year_main), + language=parse_lang(video_data.get("taxonomies", {})), + data=video_data, + ) + ) + + if not all_episodes: + self.log.error(f"Collection '{series_title_main}' did not yield any episodes. The structure may have changed.") + return Series([]) + + series = Series(all_episodes) + series.name = series_title_main + series.description = series_description_main + series.year = series_year_main + return series else: raise ValueError(f"Unsupported content type: {content_type}") @@ -364,7 +447,6 @@ class KNPY(Service): except Exception: pass - # Handle known errors gracefully if r.status_code == 403: if response_json and response_json.get("errorSubcode") == "playRegionRestricted": self.log.error("Kanopy reports: This video is not available in your country.") @@ -374,29 +456,27 @@ class KNPY(Service): else: self.log.error(f"Access forbidden (HTTP 403). Response: {response_json}") raise PermissionError("Kanopy denied access to this video. It may require a different library membership or authentication.") - + r.raise_for_status() play_data = response_json or r.json() manifest_url = None manifest_type = None drm_info = {} - - # Iterate through manifests: prefer DASH, fallback to HLS + for manifest in play_data.get("manifests", []): manifest_type_raw = manifest["manifestType"] - url = manifest["url"].strip() # Strip whitespace from URLs - - # Construct full URL if relative + url = manifest["url"].strip() + if url.startswith("/"): url = f"https://kanopy.com{url}" - + drm_type = manifest.get("drmType") - + if manifest_type_raw == "dash": manifest_url = url manifest_type = "dash" - + if drm_type == "kanopyDrm": play_id = play_data.get("playId") self.widevine_license_url = self.config["endpoints"]["widevine_license"].format( @@ -410,19 +490,17 @@ class KNPY(Service): else: self.log.warning(f"Unknown DASH drmType: {drm_type}") self.widevine_license_url = None - break # Prefer DASH, exit loop - + break + elif manifest_type_raw == "hls" and not manifest_url: - # Store HLS as fallback if DASH not found manifest_url = url manifest_type = "hls" - + if drm_type == "fairplay": self.log.warning("HLS with FairPlay DRM detected - not currently supported by this service") self.widevine_license_url = None drm_info["fairplay"] = True else: - # HLS with no DRM or unsupported DRM type self.widevine_license_url = None drm_info["clear"] = True @@ -435,20 +513,29 @@ class KNPY(Service): r = self.session.get(manifest_url) r.raise_for_status() - # Refresh headers for manifest parsing - self.session.headers.clear() - self.session.headers.update({ - "User-Agent": self.WIDEVINE_UA, - "Accept": "*/*", - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - }) - - # Parse manifest based on type if manifest_type == "dash": - tracks = DASH.from_text(r.text, url=manifest_url).to_tracks(language=title.language) + import xml.etree.ElementTree as ET + + # Parse and clean the MPD to remove PlayReady ContentProtection + ET.register_namespace('', 'urn:mpeg:dash:schema:mpd:2011') + ET.register_namespace('cenc', 'urn:mpeg:cenc:2013') + ET.register_namespace('mspr', 'urn:microsoft:playready') + + root = ET.fromstring(r.text) + + # Remove PlayReady ContentProtection elements + for adaptation_set in root.findall('.//{urn:mpeg:dash:schema:mpd:2011}AdaptationSet'): + for cp in list(adaptation_set.findall('{urn:mpeg:dash:schema:mpd:2011}ContentProtection')): + scheme_id = cp.get('schemeIdUri', '') + # Remove PlayReady but keep Widevine and CENC + if '9a04f079-9840-4286-ab92-e65be0885f95' in scheme_id: + adaptation_set.remove(cp) + self.log.debug("Removed PlayReady ContentProtection element") + + cleaned_mpd = ET.tostring(root, encoding='unicode') + tracks = DASH.from_text(cleaned_mpd, url=manifest_url).to_tracks(language=title.language) + elif manifest_type == "hls": - # Try to import HLS parser from unshackle try: from unshackle.core.manifests import HLS tracks = HLS.from_text(r.text, url=manifest_url).to_tracks(language=title.language) @@ -465,21 +552,47 @@ class KNPY(Service): else: raise ValueError(f"Unsupported manifest type: {manifest_type}") - # Add subtitles/captions from play_data (works for both DASH and HLS) + # Update session headers for CDN segment downloads + self.session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.9", + "Origin": "https://www.kanopy.com", + "Referer": "https://www.kanopy.com/", + }) + # Remove API-specific headers that CDN doesn't need + self.session.headers.pop("x-version", None) + self.session.headers.pop("authorization", None) + + # START: SUBTITLE FIX for caption_data in play_data.get("captions", []): lang = caption_data.get("language", "en") + # Use the descriptive label for uniqueness, fallback to the language code + label = caption_data.get("label", lang) + + # Create a clean, repeatable "slug" from the label for the track ID + slug = label.lower() + slug = re.sub(r'[\s\[\]\(\)]+', '-', slug) # Replace spaces and brackets with hyphens + slug = re.sub(r'[^a-z0-9-]', '', slug) # Remove other non-alphanumeric chars + slug = slug.strip('-') + + # Combine with lang code for a robust, unique ID + track_id = f"caption-{lang}-{slug}" + for file_info in caption_data.get("files", []): if file_info.get("type") == "webvtt": tracks.add(Subtitle( - id_=f"caption-{lang}", + id_=track_id, + name=label, # Use the original label for display url=file_info["url"].strip(), codec=Subtitle.Codec.WebVTT, language=Language.get(lang) )) + # Found the file for this caption entry, move to the next one break - - return tracks + # END: SUBTITLE FIX + return tracks def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: if not self.widevine_license_url: