From 8289b3a709090fd3a364a1cbccc1307fe819de6c Mon Sep 17 00:00:00 2001 From: FairTrade Date: Mon, 19 Jan 2026 22:17:10 +0100 Subject: [PATCH] Add all season detection from a series --- HIDI/__init__.py | 282 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 231 insertions(+), 51 deletions(-) diff --git a/HIDI/__init__.py b/HIDI/__init__.py index c9b0552..13a9f4f 100644 --- a/HIDI/__init__.py +++ b/HIDI/__init__.py @@ -1,31 +1,37 @@ import json import re +import base64 +import hashlib +import click + from http.cookiejar import CookieJar from typing import Optional, Iterable from langcodes import Language -import base64 - -import click from unshackle.core.constants import AnyTrack from unshackle.core.credential import Credential from unshackle.core.manifests import DASH from unshackle.core.service import Service from unshackle.core.titles import Episode, Series, Movie, Movies, Title_T, Titles_T -from unshackle.core.tracks import Chapter, Tracks, Subtitle, Audio +from unshackle.core.tracks import Chapter, Tracks, Subtitle, Audio, Video +from unshackle.core.utilities import import_module_by_path class HIDI(Service): """ Service code for HiDive (hidive.com) - Version: 1.2.0 + Version: 1.3.2 Authorization: Email + password login, with automatic token refresh. Security: FHD@L3 + + IMPORTANT: UPDATE YOUR UNSHACKLE TO 2.3.0 TO GET THE NECESSARY FIX FOR THIS SERVICE + Also when downloading a series, use the link from the first season of the series """ TITLE_RE = r"^https?://(?:www\.)?hidive\.com/(?:season/(?P\d+)|playlist/(?P\d+))$" GEOFENCE = () NO_SUBTITLES = False + API_BASE = "https://dce-frontoffice.imggaming.com/api/v4" @staticmethod @click.command(name="HIDI", short_help="https://hidive.com") @@ -110,36 +116,160 @@ class HIDI(Service): resp.raise_for_status() return resp - def get_titles(self) -> Titles_T: - # One endpoint for both season and playlist - resp = self._api_get( + def _fetch_season_data(self, season_id: int) -> dict: + """Fetch season view data.""" + return self._api_get( self.config["endpoints"]["view"], - params={"type": ("playlist" if self.kind == "movie" else "season"), - "id": self.content_id, - "timezone": "Europe/Amsterdam"} - ) - data = resp.json() + params={ + "type": "season", + "id": season_id, + "timezone": "Europe/Amsterdam" + } + ).json() + + def _fetch_adjacent_seasons(self, series_id: int, season_id: int) -> dict: + """Fetch all seasons in a series using adjacentTo endpoint.""" + url = f"{self.API_BASE}/series/{series_id}/adjacentTo/{season_id}" + return self._api_get(url, params={"size": 25}).json() + + def _extract_series_info(self, season_data: dict) -> tuple[Optional[int], Optional[str]]: + """ + Extract series ID and title from season data. + Checks multiple locations in the JSON structure. + """ + series_id = None + series_title = None + + # Method 1: Check metadata.series + metadata = season_data.get("metadata", {}) + if metadata.get("series"): + series_id = metadata["series"].get("seriesId") + series_title = metadata["series"].get("title") + if series_id: + return series_id, series_title + + # Method 2: Check elements for $type: "series" + for elem in season_data.get("elements", []): + if elem.get("$type") == "series": + attrs = elem.get("attributes", {}) + series_id = attrs.get("id") + series_info = attrs.get("series", {}) + series_title = series_info.get("title") or series_title + if series_id: + return series_id, series_title + + # Method 3: Check bucket elements for seriesId + for elem in season_data.get("elements", []): + if elem.get("$type") == "bucket": + attrs = elem.get("attributes", {}) + if attrs.get("seriesId"): + series_id = attrs["seriesId"] + return series_id, series_title + + # Method 4: Check hero actions for seriesId + for elem in season_data.get("elements", []): + if elem.get("$type") == "hero": + for action in elem.get("attributes", {}).get("actions", []): + action_data = action.get("attributes", {}).get("action", {}).get("data", {}) + if action_data.get("seriesId"): + series_id = action_data["seriesId"] + return series_id, series_title + + return series_id, series_title + + def _extract_season_number(self, season_data: dict) -> int: + """Extract season number from season data.""" + # Check metadata.currentSeason + metadata = season_data.get("metadata", {}) + current_season = metadata.get("currentSeason", {}) + if current_season.get("title"): + # Parse "Season 2" -> 2 + title = current_season["title"] + if title.lower().startswith("season "): + try: + return int(title.split(" ")[1]) + except (ValueError, IndexError): + pass + + # Check elements for series type with seasons info + for elem in season_data.get("elements", []): + if elem.get("$type") == "series": + seasons_items = elem.get("attributes", {}).get("seasons", {}).get("items", []) + for item in seasons_items: + if item.get("seasonNumber"): + return item["seasonNumber"] + + # Check bucket title + for elem in season_data.get("elements", []): + if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "season": + bucket_title = elem.get("attributes", {}).get("bucketTitle", "") + if bucket_title.lower().startswith("season "): + try: + return int(bucket_title.split(" ")[1]) + except (ValueError, IndexError): + pass + + return 1 + + def _parse_episodes_from_season(self, season_data: dict, series_title: str, season_number: int) -> list[Episode]: + """Parse episodes from season JSON data.""" + episodes = [] + + for elem in season_data.get("elements", []): + if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "season": + items = elem.get("attributes", {}).get("items", []) + + for idx, item in enumerate(items): + if item.get("type") != "SEASON_VOD": + continue + + ep_title = item.get("title", "") + ep_num = idx + 1 + + # Try to extract episode number from title "E1 - Title" + if ep_title.startswith("E") and " - " in ep_title: + try: + ep_num = int(ep_title.split(" - ")[0][1:]) + except ValueError: + pass + + episodes.append(Episode( + id_=item["id"], + service=self.__class__, + title=series_title, + season=season_number, + number=ep_num, + name=ep_title, + description=item.get("description", ""), + language=Language.get("ja"), + data=item, + )) + break + + return episodes + + def get_titles(self) -> Titles_T: + anchor_data = self._fetch_season_data(self.content_id) if self.kind == "movie": - # Find the playlist bucket, then the single VOD vod_id = None movie_title = None description = "" - for elem in data.get("elements", []): + + for elem in anchor_data.get("elements", []): if elem.get("$type") == "hero": hdr = (elem.get("attributes", {}).get("header", {}) or {}).get("attributes", {}) movie_title = hdr.get("text", movie_title) for c in elem.get("attributes", {}).get("content", []): if c.get("$type") == "textblock": description = c.get("attributes", {}).get("text", description) + if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "playlist": items = elem.get("attributes", {}).get("items", []) if items: vod_id = items[0]["id"] - if not movie_title: - movie_title = items[0].get("title") - if not description: - description = items[0].get("description", "") + movie_title = movie_title or items[0].get("title") + description = description or items[0].get("description", "") break if not vod_id: @@ -157,37 +287,86 @@ class HIDI(Service): ) ]) - # Series - episodes = [] - series_title = None - for elem in data.get("elements", []): - if elem.get("$type") == "bucket" and elem["attributes"].get("type") == "season": - for item in elem["attributes"].get("items", []): - if item.get("type") != "SEASON_VOD": - continue - ep_title = item["title"] - ep_num = 1 - if ep_title.startswith("E") and " - " in ep_title: - try: - ep_num = int(ep_title.split(" - ")[0][1:]) - except: - pass - episodes.append(Episode( - id_=item["id"], - service=self.__class__, - title=data.get("metadata", {}).get("series", {}).get("title", "") or "HiDive", - season=1, - number=ep_num, - name=item["title"], - description=item.get("description", ""), - language=Language.get("en"), - data=item, - )) - break - - if not episodes: - raise ValueError("No episodes found in season data.") - return Series(sorted(episodes, key=lambda x: x.number)) + series_id, series_title = self._extract_series_info(anchor_data) + series_title = series_title or "HiDive Series" + anchor_season_num = self._extract_season_number(anchor_data) + + if not series_id: + self.log.warning("Could not determine Series ID. Fetching single season only.") + episodes = self._parse_episodes_from_season(anchor_data, series_title, anchor_season_num) + return Series(episodes) + + try: + adj_data = self._fetch_adjacent_seasons(series_id, self.content_id) + except Exception as e: + self.log.warning(f"Failed to fetch adjacent seasons: {e}. Falling back to single season.") + episodes = self._parse_episodes_from_season(anchor_data, series_title, anchor_season_num) + return Series(episodes) + + # Build list of all seasons + all_seasons = [] + + # Preceding seasons (these come before current season) + for s in adj_data.get("precedingSeasons", []): + all_seasons.append({ + "id": s["id"], + "seasonNumber": s.get("seasonNumber", 0), + "title": s.get("title", "") + }) + + # Current/Anchor season + all_seasons.append({ + "id": self.content_id, + "seasonNumber": anchor_season_num, + "title": f"Season {anchor_season_num}", + "_data": anchor_data # Cache to avoid re-fetching + }) + + # Following seasons (these come after current season) + for s in adj_data.get("followingSeasons", []): + all_seasons.append({ + "id": s["id"], + "seasonNumber": s.get("seasonNumber", 0), + "title": s.get("title", "") + }) + + # Deduplicate by ID and sort by season number + unique_seasons = {} + for s in all_seasons: + s_id = s["id"] + if s_id not in unique_seasons: + unique_seasons[s_id] = s + elif "_data" in s: + # Prefer the one with cached data + unique_seasons[s_id] = s + + sorted_seasons = sorted(unique_seasons.values(), key=lambda x: x["seasonNumber"]) + + all_episodes = [] + + for season_info in sorted_seasons: + s_id = season_info["id"] + s_num = season_info["seasonNumber"] + + if "_data" in season_info: + self.log.info(f"Processing Season {s_num} (ID: {s_id}) [cached]") + season_data = season_info["_data"] + else: + self.log.info(f"Fetching Season {s_num} (ID: {s_id})") + try: + season_data = self._fetch_season_data(s_id) + except Exception as e: + self.log.error(f"Failed to fetch Season {s_num}: {e}") + continue + + episodes = self._parse_episodes_from_season(season_data, series_title, s_num) + self.log.info(f" Found {len(episodes)} episodes") + all_episodes.extend(episodes) + + if not all_episodes: + raise ValueError("No episodes found across all seasons.") + + return Series(all_episodes) def _get_audio_for_langs(self, mpd_url: str, langs: Iterable[Language]) -> list[Audio]: merged: list[Audio] = [] @@ -300,11 +479,12 @@ class HIDI(Service): return base_tracks - def _hidive_get_drm_info(self, title: Title_T) -> tuple[str, str]: if title.id in self._drm_cache: return self._drm_cache[title.id] self.get_tracks(title) + if title.id not in self._drm_cache: + raise ValueError("DRM information not found for this title.") return self._drm_cache[title.id] def _decode_hidive_license_payload(self, payload: bytes) -> bytes: