import base64
import json
import re
import time
from copy import deepcopy
from http.cookiejar import CookieJar
from typing import Optional

import click
from langcodes import Language

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks


class HIDE(Service):
    """
    Service code for HiDive (hidive.com)
    Version: 1.0.0
    Auth: Credential (username + password) + Refresh token supported
    Security: FHD@L3
    Note: Only for series at the moment.
    """

    # Only season URLs are accepted; the numeric id is captured as "season_id".
    TITLE_RE = r"^https?://(?:www\.)?hidive\.com/(?:season/(?P<season_id>\d+))$"
    NO_SUBTITLES = False

    # Prefix HiDive puts in front of a base64-wrapped license response.
    _LICENSE_DATA_URI_PREFIX = b"data:application/octet-stream;base64,"

    @staticmethod
    @click.command(name="HIDE", short_help="https://hidive.com")
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the CLI context and arguments."""
        return HIDE(ctx, **kwargs)

    def __init__(self, ctx, title: str):
        """
        Parse the season URL and set up per-instance auth/DRM state.

        Raises:
            ValueError: if ``title`` does not match ``TITLE_RE``.
            EnvironmentError: if the service config is missing or empty.
        """
        super().__init__(ctx)
        m = re.match(self.TITLE_RE, title)
        if not m:
            raise ValueError(f"Unsupported HiDive URL: {title}\nUse: https://www.hidive.com/season/19079")
        self.season_id = int(m.group("season_id"))
        if not self.config:
            raise EnvironmentError("Missing HIDE service config.")
        self.cdm = ctx.obj.cdm
        self._auth_token = None
        self._refresh_token = None
        # Maps title id -> (jwt token, license URL); filled by get_tracks().
        self._drm_cache = {}

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Log in with email + password and install the bearer token on the session.

        ``cookies`` is accepted for interface compatibility but is not used here.

        Raises:
            ValueError: if no credential (or an incomplete one) is supplied.
            PermissionError: if the login endpoint answers 401.
        """
        base_headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "en-US",
            "Referer": "https://www.hidive.com/",
            "Origin": "https://www.hidive.com",
            "x-api-key": self.config["x_api_key"],
            "app": "dice",
            "Realm": "dce.hidive",
            "x-app-var": self.config["x_app_var"],
        }
        self.session.headers.update(base_headers)

        if not credential or not credential.username or not credential.password:
            raise ValueError("HiDive requires email + password (--credential 'email:password')")

        r_login = self.session.post(
            self.config["endpoints"]["login"],
            json={"id": credential.username, "secret": credential.password},
        )
        if r_login.status_code == 401:
            raise PermissionError("Invalid email or password.")
        r_login.raise_for_status()

        login_data = r_login.json()
        self._auth_token = login_data["authorisationToken"]
        self._refresh_token = login_data["refreshToken"]
        self.session.headers["Authorization"] = f"Bearer {self._auth_token}"
        self.log.info("HiDive login successful.")

    def _refresh_auth(self) -> None:
        """
        Exchange the stored refresh token for a new auth token.

        Raises:
            PermissionError: if there is no refresh token, or the refresh
                endpoint answers 401.
        """
        if not self._refresh_token:
            raise PermissionError("No refresh token available to renew session.")

        self.log.warning("Auth token expired, refreshing...")
        r = self.session.post(
            self.config["endpoints"]["refresh"],
            json={"refreshToken": self._refresh_token},
        )
        if r.status_code == 401:
            raise PermissionError("Refresh token is invalid. Please log in again.")
        r.raise_for_status()

        data = r.json()
        self._auth_token = data["authorisationToken"]
        self.session.headers["Authorization"] = f"Bearer {self._auth_token}"
        self.log.info("Auth token refreshed successfully.")

    def _api_get(self, url, **kwargs):
        """
        GET ``url`` through the session, refreshing the auth token once on a 401.

        Returns the response object from ``self.session.get``; any remaining
        error status is raised via ``raise_for_status()``.
        """
        response = self.session.get(url, **kwargs)
        if response.status_code == 401:
            self._refresh_auth()
            response = self.session.get(url, **kwargs)  # Retry after refresh
        response.raise_for_status()
        return response

    @staticmethod
    def _parse_episode_number(ep_title: str) -> int:
        """Return N from a title shaped like ``"E<N> - ..."``, or 1 when it cannot be parsed."""
        if ep_title.startswith("E") and " - " in ep_title:
            try:
                return int(ep_title.split(" - ")[0][1:])
            except ValueError:
                # Best effort: keep the default for titles like "Extra - ...".
                pass
        return 1

    @staticmethod
    def _season_vod_items(data: dict) -> list:
        """Return the ``SEASON_VOD`` items of the first season bucket in ``data`` (empty if none)."""
        for elem in data.get("elements", []):
            if elem.get("$type") == "bucket" and elem["attributes"].get("type") == "season":
                # Only the first season bucket is considered.
                return [
                    item
                    for item in elem["attributes"].get("items", [])
                    if item.get("type") == "SEASON_VOD"
                ]
        return []

    def get_titles(self) -> Titles_T:
        """
        Fetch the season view and build a ``Series`` sorted by episode number.

        Raises:
            ValueError: if the season data contains no ``SEASON_VOD`` items.
        """
        response = self._api_get(
            self.config["endpoints"]["season_view"],
            params={"type": "season", "id": self.season_id, "timezone": "Europe/Amsterdam"},
        )
        data = response.json()

        items = self._season_vod_items(data)
        if not items:
            raise ValueError("No episodes found in season data.")

        # Looked up once; every episode shares the series title.
        series_title = data["metadata"]["series"]["title"]
        episodes = [
            Episode(
                id_=item["id"],
                service=self.__class__,
                title=series_title,
                season=1,
                number=self._parse_episode_number(item["title"]),
                name=item["title"],
                description=item.get("description", ""),
                language=Language.get("en"),
                data=item,
            )
            for item in items
        ]
        return Series(sorted(episodes, key=lambda x: x.number))

    @staticmethod
    def _parse_subtitles(entry: dict) -> list:
        """Build ``Subtitle`` tracks from ``entry["subtitles"]``, skipping SCC and URL-less entries."""
        subtitles = []
        for sub in entry.get("subtitles", []):
            fmt = (sub.get("format") or "").lower()
            if fmt == "scc":
                continue

            lang_code = (sub.get("language") or "und").replace("-", "_")
            try:
                lang = Language.get(lang_code)
            except Exception:
                # Best effort: an unparseable tag falls back to "undetermined".
                lang = Language.get("und")

            url = (sub.get("url") or "").strip()
            if not url:
                continue

            codec = Subtitle.Codec.WebVTT if fmt == "vtt" else Subtitle.Codec.SubRip
            try:
                name = lang.language_name()
            except Exception:
                # Best effort: fall back to the raw code when no display name is available.
                name = lang_code

            subtitles.append(
                Subtitle(
                    id_=f"{lang_code}:{fmt}",
                    url=url,
                    language=lang,
                    is_original_lang=lang.language == "ja",
                    codec=codec,
                    name=name,
                    forced=False,
                    sdh=False,
                )
            )
        return subtitles

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Resolve the playback info for ``title`` and return its tracks.

        Also stores the (jwt token, license URL) pair in ``self._drm_cache``
        when the stream entry provides both, for later license requests.

        Raises:
            ValueError: if no playback URL or no DASH stream is returned.
        """
        response = self._api_get(
            self.config["endpoints"]["vod"].format(vod_id=title.id),
            params={"includePlaybackDetails": "URL"},
        )
        vod = response.json()

        playback_url = vod.get("playerUrlCallback")
        if not playback_url:
            raise ValueError("No playback URL.")

        stream_data = self._api_get(playback_url).json()
        dash = stream_data.get("dash", [])
        if not dash:
            raise ValueError("No DASH stream.")
        entry = dash[0]

        tracks = DASH.from_url(entry["url"], session=self.session).to_tracks(language=Language.get("en"))

        if tracks.audio:
            # NOTE(review): the second audio track is a copy of the first one,
            # relabelled as Japanese - confirm this is intended.
            english_audio = tracks.audio[0]
            japanese_audio = deepcopy(english_audio)
            japanese_audio.name = "Japanese"
            japanese_audio.language = Language.get("ja")
            tracks.audio = [english_audio, japanese_audio]

        tracks.subtitles = self._parse_subtitles(entry)

        # DRM: Store info for license calls
        drm_data = entry.get("drm", {})
        jwt = drm_data.get("jwtToken")
        lic_url = (drm_data.get("url") or "").strip()
        if jwt and lic_url:
            self._drm_cache[title.id] = (jwt, lic_url)

        return tracks

    def _hidive_get_drm_info(self, title: Title_T) -> tuple[str, str]:
        """
        Return the cached (jwt token, license URL) pair for ``title``.

        Calls ``get_tracks`` to populate the cache when the title is not in it.

        Raises:
            KeyError: if the title still has no DRM info after ``get_tracks``.
        """
        if title.id not in self._drm_cache:
            self.get_tracks(title)  # This will populate the cache
        try:
            return self._drm_cache[title.id]
        except KeyError:
            raise KeyError(f"No DRM license info available for title {title.id}") from None

    def _decode_hidive_license_payload(self, payload: bytes) -> bytes:
        """Return the raw license bytes, base64-decoding ``payload`` when it is a data URI."""
        prefix = self._LICENSE_DATA_URI_PREFIX
        if payload.startswith(prefix):
            return base64.b64decode(payload[len(prefix):])
        return payload

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes | str | None:
        """
        POST the Widevine ``challenge`` to the title's license URL and return the license.

        The request is authorised with the per-title jwt token rather than the
        session's account token.
        """
        jwt_token, license_url = self._hidive_get_drm_info(title)
        headers = {
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/octet-stream",
            "Accept": "*/*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
            "Origin": "https://www.hidive.com",
            "Referer": "https://www.hidive.com/",
            "X-DRM-INFO": "eyJzeXN0ZW0iOiJjb20ud2lkZXZpbmUuYWxwaGEifQ==",
        }
        r = self.session.post(license_url, data=challenge, headers=headers, timeout=30)
        r.raise_for_status()
        return self._decode_hidive_license_payload(r.content)

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """Return no chapters; this service does not provide any."""
        return []