Compare commits


5 Commits
main ... main

12 changed files with 1540 additions and 222 deletions

View File

@ -1,31 +1,37 @@
import json
import re
import base64
import hashlib
import click
from http.cookiejar import CookieJar
from typing import Optional, Iterable
from langcodes import Language
import base64
import click
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Series, Movie, Movies, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks, Subtitle, Audio
from unshackle.core.tracks import Chapter, Tracks, Subtitle, Audio, Video
from unshackle.core.utilities import import_module_by_path
class HIDI(Service):
"""
Service code for HiDive (hidive.com)
Version: 1.2.0
Version: 1.3.2
Authorization: Email + password login, with automatic token refresh.
Security: FHD@L3
IMPORTANT: UPDATE YOUR UNSHACKLE TO 2.3.0 TO GET THE NECESSARY FIX FOR THIS SERVICE
Also, when downloading a series, use the link from the first season of the series.
"""
TITLE_RE = r"^https?://(?:www\.)?hidive\.com/(?:season/(?P<season_id>\d+)|playlist/(?P<playlist_id>\d+))$"
GEOFENCE = ()
NO_SUBTITLES = False
API_BASE = "https://dce-frontoffice.imggaming.com/api/v4"
@staticmethod
@click.command(name="HIDI", short_help="https://hidive.com")
@ -110,36 +116,160 @@ class HIDI(Service):
resp.raise_for_status()
return resp
def get_titles(self) -> Titles_T:
# One endpoint for both season and playlist
resp = self._api_get(
def _fetch_season_data(self, season_id: int) -> dict:
"""Fetch season view data."""
return self._api_get(
self.config["endpoints"]["view"],
params={"type": ("playlist" if self.kind == "movie" else "season"),
"id": self.content_id,
"timezone": "Europe/Amsterdam"}
)
data = resp.json()
params={
"type": "season",
"id": season_id,
"timezone": "Europe/Amsterdam"
}
).json()
def _fetch_adjacent_seasons(self, series_id: int, season_id: int) -> dict:
"""Fetch all seasons in a series using adjacentTo endpoint."""
url = f"{self.API_BASE}/series/{series_id}/adjacentTo/{season_id}"
return self._api_get(url, params={"size": 25}).json()
def _extract_series_info(self, season_data: dict) -> tuple[Optional[int], Optional[str]]:
"""
Extract series ID and title from season data.
Checks multiple locations in the JSON structure.
"""
series_id = None
series_title = None
# Method 1: Check metadata.series
metadata = season_data.get("metadata", {})
if metadata.get("series"):
series_id = metadata["series"].get("seriesId")
series_title = metadata["series"].get("title")
if series_id:
return series_id, series_title
# Method 2: Check elements for $type: "series"
for elem in season_data.get("elements", []):
if elem.get("$type") == "series":
attrs = elem.get("attributes", {})
series_id = attrs.get("id")
series_info = attrs.get("series", {})
series_title = series_info.get("title") or series_title
if series_id:
return series_id, series_title
# Method 3: Check bucket elements for seriesId
for elem in season_data.get("elements", []):
if elem.get("$type") == "bucket":
attrs = elem.get("attributes", {})
if attrs.get("seriesId"):
series_id = attrs["seriesId"]
return series_id, series_title
# Method 4: Check hero actions for seriesId
for elem in season_data.get("elements", []):
if elem.get("$type") == "hero":
for action in elem.get("attributes", {}).get("actions", []):
action_data = action.get("attributes", {}).get("action", {}).get("data", {})
if action_data.get("seriesId"):
series_id = action_data["seriesId"]
return series_id, series_title
return series_id, series_title
def _extract_season_number(self, season_data: dict) -> int:
"""Extract season number from season data."""
# Check metadata.currentSeason
metadata = season_data.get("metadata", {})
current_season = metadata.get("currentSeason", {})
if current_season.get("title"):
# Parse "Season 2" -> 2
title = current_season["title"]
if title.lower().startswith("season "):
try:
return int(title.split(" ")[1])
except (ValueError, IndexError):
pass
# Check elements for series type with seasons info
for elem in season_data.get("elements", []):
if elem.get("$type") == "series":
seasons_items = elem.get("attributes", {}).get("seasons", {}).get("items", [])
for item in seasons_items:
if item.get("seasonNumber"):
return item["seasonNumber"]
# Check bucket title
for elem in season_data.get("elements", []):
if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "season":
bucket_title = elem.get("attributes", {}).get("bucketTitle", "")
if bucket_title.lower().startswith("season "):
try:
return int(bucket_title.split(" ")[1])
except (ValueError, IndexError):
pass
return 1
def _parse_episodes_from_season(self, season_data: dict, series_title: str, season_number: int) -> list[Episode]:
"""Parse episodes from season JSON data."""
episodes = []
for elem in season_data.get("elements", []):
if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "season":
items = elem.get("attributes", {}).get("items", [])
for idx, item in enumerate(items):
if item.get("type") != "SEASON_VOD":
continue
ep_title = item.get("title", "")
ep_num = idx + 1
# Try to extract episode number from title "E1 - Title"
if ep_title.startswith("E") and " - " in ep_title:
try:
ep_num = int(ep_title.split(" - ")[0][1:])
except ValueError:
pass
episodes.append(Episode(
id_=item["id"],
service=self.__class__,
title=series_title,
season=season_number,
number=ep_num,
name=ep_title,
description=item.get("description", ""),
language=Language.get("ja"),
data=item,
))
break
return episodes
def get_titles(self) -> Titles_T:
anchor_data = self._fetch_season_data(self.content_id)
if self.kind == "movie":
# Find the playlist bucket, then the single VOD
vod_id = None
movie_title = None
description = ""
for elem in data.get("elements", []):
for elem in anchor_data.get("elements", []):
if elem.get("$type") == "hero":
hdr = (elem.get("attributes", {}).get("header", {}) or {}).get("attributes", {})
movie_title = hdr.get("text", movie_title)
for c in elem.get("attributes", {}).get("content", []):
if c.get("$type") == "textblock":
description = c.get("attributes", {}).get("text", description)
if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "playlist":
items = elem.get("attributes", {}).get("items", [])
if items:
vod_id = items[0]["id"]
if not movie_title:
movie_title = items[0].get("title")
if not description:
description = items[0].get("description", "")
movie_title = movie_title or items[0].get("title")
description = description or items[0].get("description", "")
break
if not vod_id:
@ -157,37 +287,86 @@ class HIDI(Service):
)
])
# Series
episodes = []
series_title = None
for elem in data.get("elements", []):
if elem.get("$type") == "bucket" and elem["attributes"].get("type") == "season":
for item in elem["attributes"].get("items", []):
if item.get("type") != "SEASON_VOD":
continue
ep_title = item["title"]
ep_num = 1
if ep_title.startswith("E") and " - " in ep_title:
try:
ep_num = int(ep_title.split(" - ")[0][1:])
except:
pass
episodes.append(Episode(
id_=item["id"],
service=self.__class__,
title=data.get("metadata", {}).get("series", {}).get("title", "") or "HiDive",
season=1,
number=ep_num,
name=item["title"],
description=item.get("description", ""),
language=Language.get("en"),
data=item,
))
break
if not episodes:
raise ValueError("No episodes found in season data.")
return Series(sorted(episodes, key=lambda x: x.number))
series_id, series_title = self._extract_series_info(anchor_data)
series_title = series_title or "HiDive Series"
anchor_season_num = self._extract_season_number(anchor_data)
if not series_id:
self.log.warning("Could not determine Series ID. Fetching single season only.")
episodes = self._parse_episodes_from_season(anchor_data, series_title, anchor_season_num)
return Series(episodes)
try:
adj_data = self._fetch_adjacent_seasons(series_id, self.content_id)
except Exception as e:
self.log.warning(f"Failed to fetch adjacent seasons: {e}. Falling back to single season.")
episodes = self._parse_episodes_from_season(anchor_data, series_title, anchor_season_num)
return Series(episodes)
# Build list of all seasons
all_seasons = []
# Preceding seasons (these come before current season)
for s in adj_data.get("precedingSeasons", []):
all_seasons.append({
"id": s["id"],
"seasonNumber": s.get("seasonNumber", 0),
"title": s.get("title", "")
})
# Current/Anchor season
all_seasons.append({
"id": self.content_id,
"seasonNumber": anchor_season_num,
"title": f"Season {anchor_season_num}",
"_data": anchor_data # Cache to avoid re-fetching
})
# Following seasons (these come after current season)
for s in adj_data.get("followingSeasons", []):
all_seasons.append({
"id": s["id"],
"seasonNumber": s.get("seasonNumber", 0),
"title": s.get("title", "")
})
# Deduplicate by ID and sort by season number
unique_seasons = {}
for s in all_seasons:
s_id = s["id"]
if s_id not in unique_seasons:
unique_seasons[s_id] = s
elif "_data" in s:
# Prefer the one with cached data
unique_seasons[s_id] = s
sorted_seasons = sorted(unique_seasons.values(), key=lambda x: x["seasonNumber"])
all_episodes = []
for season_info in sorted_seasons:
s_id = season_info["id"]
s_num = season_info["seasonNumber"]
if "_data" in season_info:
self.log.info(f"Processing Season {s_num} (ID: {s_id}) [cached]")
season_data = season_info["_data"]
else:
self.log.info(f"Fetching Season {s_num} (ID: {s_id})")
try:
season_data = self._fetch_season_data(s_id)
except Exception as e:
self.log.error(f"Failed to fetch Season {s_num}: {e}")
continue
episodes = self._parse_episodes_from_season(season_data, series_title, s_num)
self.log.info(f" Found {len(episodes)} episodes")
all_episodes.extend(episodes)
if not all_episodes:
raise ValueError("No episodes found across all seasons.")
return Series(all_episodes)
def _get_audio_for_langs(self, mpd_url: str, langs: Iterable[Language]) -> list[Audio]:
merged: list[Audio] = []
@ -300,11 +479,12 @@ class HIDI(Service):
return base_tracks
def _hidive_get_drm_info(self, title: Title_T) -> tuple[str, str]:
if title.id in self._drm_cache:
return self._drm_cache[title.id]
self.get_tracks(title)
if title.id not in self._drm_cache:
raise ValueError("DRM information not found for this title.")
return self._drm_cache[title.id]
def _decode_hidive_license_payload(self, payload: bytes) -> bytes:
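
The rewritten get_titles() above walks every season around the anchor season via the adjacentTo endpoint, deduplicates them, and sorts by season number. Below is a minimal standalone sketch of that merge step, assuming the precedingSeasons/followingSeasons shape visible in the diff; merge_seasons() and the sample payload are illustrative only, not part of the service.

# A minimal sketch of the season merge step in get_titles() above, assuming the
# adjacentTo response exposes "precedingSeasons"/"followingSeasons" lists as in
# the diff. merge_seasons() and the sample payload below are illustrative only.
def merge_seasons(anchor_id: int, anchor_num: int, anchor_data: dict, adj: dict) -> list[dict]:
    """Combine preceding, anchor and following seasons, dedupe by id, sort by number."""
    seasons = [
        {"id": s["id"], "seasonNumber": s.get("seasonNumber", 0)}
        for s in adj.get("precedingSeasons", [])
    ]
    # The anchor season keeps its already-fetched data so it is not requested twice.
    seasons.append({"id": anchor_id, "seasonNumber": anchor_num, "_data": anchor_data})
    seasons += [
        {"id": s["id"], "seasonNumber": s.get("seasonNumber", 0)}
        for s in adj.get("followingSeasons", [])
    ]
    unique: dict[int, dict] = {}
    for s in seasons:
        if s["id"] not in unique or "_data" in s:  # prefer the entry with cached data
            unique[s["id"]] = s
    return sorted(unique.values(), key=lambda s: s["seasonNumber"])

# Example: anchor season 2 with one preceding and one following season.
adj = {"precedingSeasons": [{"id": 10, "seasonNumber": 1}],
       "followingSeasons": [{"id": 30, "seasonNumber": 3}]}
print([s["seasonNumber"] for s in merge_seasons(20, 2, {"elements": []}, adj)])  # [1, 2, 3]

Keeping the cached anchor entry mirrors the "_data" shortcut in the diff, which avoids re-fetching the season that was already requested.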

HPLA/__init__.py (new file, 509 lines)
View File

@ -0,0 +1,509 @@
import base64
import hashlib
import json
import re
from typing import Optional, Union, Generator
import click
from langcodes import Language
from lxml import etree
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Movie, Movies, Title_T, Titles_T, Song, Album
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Audio
class HPLA(Service):
"""
Service code for Hoopla Digital (https://www.hoopladigital.com)
Version: 1.0.7
Authorization: Credentials (Email & Password)
Security:
- SL2K/SL3K/L1/L3: SD/360p
They are using the license server of DRMToday with encoded streams from CastLabs.
Supports movies and music at the moment (music is still somewhat broken).
Television is not supported; episodes have to be borrowed one by one and the series quality is poor anyway.
Use the full URL (for example, https://www.hoopladigital.com/movie/title-name/10979706) or the content ID.
"""
ALIASES = ("HPLA", "hoopla")
TITLE_RE = r"^(?:https?://(?:www\.)?hoopladigital\.com/[^/]*/[^/]*/)?(?P<title_id>\d+)"
GEOFENCE = ("US",)
@staticmethod
@click.command(name="HPLA", short_help="https://www.hoopladigital.com")
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
@click.pass_context
def cli(ctx, **kwargs):
return HPLA(ctx, **kwargs)
def __init__(self, ctx, title, movie):
super().__init__(ctx)
self.title = title
self.movie = movie
if self.config is None:
raise Exception("Config is missing!")
profile_name = ctx.parent.params.get("profile")
self.profile = profile_name if profile_name else "default"
self.platform = self.config["platform"]["amazon"]
def authenticate(self, cookies: Optional[any] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not credential or not credential.username or not credential.password:
raise EnvironmentError("Service requires Credentials for Authentication.")
self.credential = credential
self.session.headers.update(self.platform["headers"])
cache_key = f"tokens_{self.profile}"
cache = self.cache.get(cache_key)
if cache and not cache.expired:
cached_data = cache.data
if isinstance(cached_data, dict) and cached_data.get("username") == credential.username:
self.log.info("Using cached tokens")
self._restore_from_cache(cached_data)
return
self.log.info("Logging in...")
self._do_login(credential)
self._cache_tokens(credential.username, cache_key)
def _restore_from_cache(self, cached_data: dict) -> None:
"""Restore authentication state from cached data."""
self.access_token = cached_data["access_token"]
self.patron_id = cached_data["patron_id"]
self.session.headers.update({
"Authorization": f"Bearer {self.access_token}",
"patron-id": self.patron_id,
})
def _cache_tokens(self, username: str, cache_key: str) -> None:
"""Cache the current authentication tokens."""
cache = self.cache.get(cache_key)
cache.set(
data={
"username": username,
"access_token": self.access_token,
"patron_id": self.patron_id,
},
expiration=3600
)
def _is_music_mpd(self, mpd: etree._Element) -> bool:
"""
Detect if MPD represents a single-file music asset.
"""
adaptation_sets = mpd.findall(".//AdaptationSet")
for aset in adaptation_sets:
if aset.get("contentType") == "video":
return False
audio_reps = mpd.findall(".//AdaptationSet[@contentType='audio']/Representation")
if len(audio_reps) != 1:
return False
if mpd.find(".//SegmentTemplate") is not None:
return False
return mpd.find(".//BaseURL") is not None
def _extract_music_audio(self, mpd: etree._Element, manifest_url: str) -> str:
base = mpd.find(".//BaseURL")
if base is None or not base.text:
raise ValueError("Music MPD has no BaseURL")
return manifest_url.rsplit("/", 1)[0] + "/" + base.text
def _do_login(self, credential: Credential) -> None:
"""Perform full login flow."""
# Step 1: Get Bearer Token
login_response = self.session.post(
url=self.config["endpoints"]["login"],
data={
"username": credential.username,
"password": credential.password,
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
).json()
if login_response.get("tokenStatus") != "SUCCESS":
raise EnvironmentError(f"Login failed: {login_response.get('tokenStatus', 'Unknown error')}")
self.access_token = login_response["token"]
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
# Step 2: Get Patron ID
self.log.info("Fetching Patron ID...")
query = 'query { patron { id email } }'
patron_data = self.session.post(
url=self.config["endpoints"]["graphql"],
json={"query": query},
headers={"Content-Type": "application/json"}
).json()
self.patron_id = patron_data["data"]["patron"]["id"]
self.session.headers.update({"patron-id": self.patron_id})
self.log.debug(f"Logged in as Patron ID: {self.patron_id}")
def search(self) -> Generator[SearchResult, None, None]:
query = """
query GetFilterSearchQuery($criteria: SearchCriteria!, $sort: Sort) {
search(criteria: $criteria, sort: $sort) {
hits {
id
title
kind { name }
}
}
}
"""
payload = {
"operationName": "GetFilterSearchQuery",
"variables": {
"criteria": {
"q": self.title,
"availability": "ALL_TITLES",
"pagination": {
"page": 1,
"pageSize": 48,
},
}
},
"query": query,
}
resp = self.session.post(
self.config["endpoints"]["graphql"],
json=payload,
headers={"Content-Type": "application/json"},
).json()
hits = (
resp
.get("data", {})
.get("search", {})
.get("hits", [])
)
for hit in hits:
kind = hit["kind"]["name"]
label = {
"MOVIE": "MOVIE",
"TVSHOW": "SERIES",
"MUSIC": "ALBUM",
"AUDIOBOOK": "AUDIOBOOK",
"EBOOK": "BOOK",
"COMIC": "COMIC",
}.get(kind, kind)
yield SearchResult(
id_=hit["id"],
title=hit["title"],
label=label,
url=f"https://www.hoopladigital.com/title/{hit['id']}",
)
def get_titles(self) -> Titles_T:
title_match = re.match(self.TITLE_RE, self.title)
if not title_match:
raise ValueError(f"Invalid title format: {self.title}")
content_id = title_match.group("title_id")
query = """
query {
contents(criteria:{contentIds:[%s]}) {
contents {
id
title
kind { id name }
mediaKey
circulation { id dueDate }
year
seconds
primaryArtist { name }
tracks {
id
mediaKey
name
seconds
segmentNumber
}
}
}
}
""" % content_id
data = self.session.post(
url=self.config["endpoints"]["graphql"],
json={"query": query},
headers={"Content-Type": "application/json"}
).json()
contents = data.get("data", {}).get("contents", {}).get("contents", [])
if not contents:
raise ValueError("Content not found")
meta = contents[0]
kind_name = meta["kind"]["name"]
if not meta.get("circulation"):
raise ValueError("You must borrow this title on your Hoopla account before downloading.")
if kind_name == "MOVIE":
return Movies([
Movie(
id_=meta["id"],
service=self.__class__,
name=meta["title"],
year=int(meta["year"]) if meta.get("year") else None,
language=Language.get("en"),
data={
"mediaKey": meta["mediaKey"],
"circulationId": meta["circulation"]["id"],
"is_music": False,
},
)
])
elif kind_name == "MUSIC":
if not meta.get("tracks"):
# Single-track album? Use main mediaKey
songs = [
Song(
id_=meta["id"],
service=self.__class__,
name=meta["title"],
artist=meta.get("primaryArtist", {}).get("name", "Unknown Artist"),
album=meta["title"],
track=1,
disc=1,
year=int(meta["year"]) if meta.get("year") else None,
data={
"mediaKey": meta["mediaKey"],
"circulationId": meta["circulation"]["id"],
"is_music": True,
}
)
]
else:
songs = []
for idx, track in enumerate(meta["tracks"], start=1):
songs.append(
Song(
id_=track["id"],
service=self.__class__,
name=track["name"],
artist=meta.get("primaryArtist", {}).get("name", "Unknown Artist"),
album=meta["title"],
track=track.get("segmentNumber", idx),
disc=1,
year=int(meta["year"]) if meta.get("year") else None,
data={
"mediaKey": track["mediaKey"], # ← Per-track mediaKey!
"circulationId": meta["circulation"]["id"],
"is_music": True,
}
)
)
return Album(songs)
else:
raise ValueError(f"Unsupported content type: {kind_name}. Only MOVIE and MUSIC are supported.")
def get_tracks(self, title: Title_T) -> Tracks:
media_key = title.data["mediaKey"]
circulation_id = title.data["circulationId"]
# --- DRM bootstrap ---
self.asset_id = self.session.get(
self.config["endpoints"]["license_asset"].format(media_key=media_key)
).text.strip()
self.auth_token = self.session.get(
self.config["endpoints"]["license_token"].format(
media_key=media_key,
patron_id=self.patron_id,
circulation_id=circulation_id,
)
).text.strip()
self.custom_data = self._extract_custom_data(self.auth_token)
manifest_url = self.config["endpoints"]["manifest"].format(media_key=media_key)
mpd_xml = self.session.get(manifest_url).text
mpd_xml = self._strip_namespaces(mpd_xml)
mpd = etree.fromstring(mpd_xml.encode("utf-8"))
if self._is_music_mpd(mpd):
self.log.info("Detected Hoopla music MPD")
audio_url = self._extract_music_audio(mpd, manifest_url)
tracks = Tracks()
tracks.add(
Audio(
url=audio_url,
drm=[],
codec=Audio.Codec.AAC,
language=title.language or "en",
channels=2,
)
)
return tracks
self.log.info("Detected Hoopla movie MPD")
tracks = DASH(mpd, manifest_url).to_tracks(
language=title.language or Language.get("en")
)
self._add_subtitles(tracks, manifest_url, media_key)
return tracks
def _strip_namespaces(self, xml_string: str) -> str:
"""
Strip namespace declarations and prefixes from XML string.
This is needed because unshackle's DASH parser expects plain 'MPD' tag,
not '{urn:mpeg:dash:schema:mpd:2011}MPD'.
"""
# Remove xmlns declarations (both default and prefixed)
xml_string = re.sub(r'\s+xmlns(:\w+)?="[^"]+"', '', xml_string)
# Remove namespace prefixes from element tags (e.g., <cenc:pssh> -> <pssh>)
xml_string = re.sub(r'<(/?)(\w+):', r'<\1', xml_string)
# Remove namespace prefixes from attributes (e.g., cenc:default_KID -> default_KID)
xml_string = re.sub(r'\s+\w+:(\w+)=', r' \1=', xml_string)
# Remove urn: prefixed attributes entirely (e.g., urn:assetId="...")
xml_string = re.sub(r'\s+urn:\w+="[^"]+"', '', xml_string)
return xml_string
def _extract_custom_data(self, jwt_token: str) -> str:
"""Extract and encode optData from JWT for dt-custom-data header."""
try:
jwt_parts = jwt_token.split(".")
padded_payload = jwt_parts[1] + "=" * (-len(jwt_parts[1]) % 4)
payload_json = json.loads(base64.urlsafe_b64decode(padded_payload))
opt_data_str = payload_json.get("optData")
if not opt_data_str:
raise ValueError("optData not found in JWT")
return base64.b64encode(opt_data_str.encode("utf-8")).decode("utf-8")
except Exception as e:
raise ValueError(f"Failed to process license token: {e}")
def _add_subtitles(self, tracks: Tracks, manifest_url: str, media_key: str) -> None:
"""Add VTT subtitles from manifest if available."""
base_url = manifest_url.rsplit('/', 1)[0]
vtt_patterns = [
f"{base_url}/{media_key}-8784525650515056532-en/{media_key}-8784525650515056532-en.vtt",
]
for vtt_url in vtt_patterns:
try:
response = self.session.head(vtt_url)
if response.status_code == 200:
tracks.add(
Subtitle(
id_=hashlib.md5(vtt_url.encode()).hexdigest()[0:6],
url=vtt_url,
codec=Subtitle.Codec.WebVTT,
language=Language.get("en"),
sdh=True,
)
)
break
except Exception:
pass
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def get_widevine_service_certificate(self, **_) -> Optional[str]:
return self.config.get("certificate")
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
response = self.session.post(
url=self.config["endpoints"]["license_wv"],
params={
"logRequestId": "unshackle",
"assetId": self.asset_id,
},
headers={
"dt-custom-data": self.custom_data,
"x-dt-auth-token": self.auth_token,
"Content-Type": "text/xml",
},
data=challenge,
)
if response.status_code != 200:
self.log.error(f"License Error: {response.text}")
raise ValueError(f"Failed to get Widevine license: {response.status_code}")
return response.json().get("license")
def get_playready_license(self, *, challenge: bytes | str, title: Title_T, track: AnyTrack) -> bytes:
if not hasattr(self, 'auth_token') or not hasattr(self, 'custom_data'):
raise RuntimeError("Authentication tokens missing. Call get_tracks() first.")
if isinstance(challenge, str):
request_body = challenge.encode('utf-8')
else:
request_body = challenge
headers = {
"Accept": "*/*",
"Accept-Language": "nl",
"Cache-Control": "no-cache",
"Content-Type": "text/xml; charset=utf-8",
"dt-custom-data": self.custom_data,
"x-dt-auth-token": self.auth_token,
"soapaction": '"http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense"',
"Origin": "https://www.hoopladigital.com",
"Referer": "https://www.hoopladigital.com/",
"Pragma": "no-cache",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0",
}
response = self.session.post(
url=self.config["endpoints"]["license_pr"],
data=request_body,
headers=headers,
timeout=30
)
if response.status_code != 200:
self.log.error(f"PlayReady license failed: {response.status_code}")
self.log.error(f"Response: {response.text[:1000]}")
raise ValueError(f"PlayReady license failed: HTTP {response.status_code}")
return response.content

HPLA/config.yaml (new file, 22 lines)
View File

@ -0,0 +1,22 @@
endpoints:
login: https://patron-api-gateway.hoopladigital.com/core/tokens
graphql: https://patron-api-gateway.hoopladigital.com/graphql
manifest: https://dash.hoopladigital.com/{media_key}/Manifest.mpd
license_asset: https://patron-api-gateway.hoopladigital.com/license/castlabs/asset-id/{media_key}
license_token: https://patron-api-gateway.hoopladigital.com/license/castlabs/upfront-auth-tokens/{media_key}/{patron_id}/{circulation_id}
license_wv: https://lic.drmtoday.com/license-proxy-widevine/cenc/
license_pr: https://lic.drmtoday.com/license-proxy-headerauth/drmtoday/RightsManager.asmx?persistent=false
platform:
amazon:
headers:
app: AMAZON
device-model: SM-A525F
os: AMAZON
User-Agent: Hoopla Amazon/4.84.1
app-version: "4.84.1"
os-version: "15"
ws-api: "2.1"
device-version: a52q
hoopla-version: "4.84.1"
Accept-Language: en-US
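
The endpoint values above are plain str.format() templates; a quick illustration of how the service code in this diff fills them in (media_key, patron_id and circulation_id values here are placeholders):

# Placeholder values only; the HPLA service code in this diff does the real formatting.
manifest_tpl = "https://dash.hoopladigital.com/{media_key}/Manifest.mpd"
token_tpl = (
    "https://patron-api-gateway.hoopladigital.com/license/castlabs/"
    "upfront-auth-tokens/{media_key}/{patron_id}/{circulation_id}"
)
print(manifest_tpl.format(media_key="abc123"))
print(token_tpl.format(media_key="abc123", patron_id="42", circulation_id="7"))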

View File

@ -14,8 +14,10 @@ from unshackle.core.search_result import SearchResult
from unshackle.core.titles import Episode, Series, Title_T, Titles_T
from unshackle.core.tracks import Subtitle, Tracks
from unshackle.core.utilities import is_close_match
import uuid
import hashlib
class KOWP(Service):
class KOCW(Service):
"""
Service code for Kocowa Plus (kocowa.com).
Version: 1.0.0
@ -29,12 +31,12 @@ class KOWP(Service):
NO_SUBTITLES = False
@staticmethod
@click.command(name="kowp", short_help="https://www.kocowa.com")
@click.command(name="kocw", short_help="https://www.kocowa.com")
@click.argument("title", type=str)
@click.option("--extras", is_flag=True, default=False, help="Include teasers/extras")
@click.pass_context
def cli(ctx, **kwargs):
return KOWP(ctx, **kwargs)
return KOCW(ctx, **kwargs)
def __init__(self, ctx, title: str, extras: bool = False):
super().__init__(ctx)
@ -52,16 +54,27 @@ class KOWP(Service):
if not credential:
raise ValueError("KOWP requires username and password")
email = credential.username.lower().strip()
uuid_seed = hashlib.md5(email.encode()).digest()
fake_uuid = str(uuid.UUID(bytes=uuid_seed[:16]))
device_id = f"a_{fake_uuid}_{email}"
push_token = "fkiTs_a0SAaMYx957n-qA-:APA91bFb39IjJd_iA5bVmh-fjvaUKonvKDWw1PfKKcdpkSXanj0Jlevv_QlMPPD5ZykAQE4ELa3bs6p-Gnmz0R54U-B1o1ukBPLQEDLDdM3hU2ozZIRiy9I"
payload = {
"username": credential.username,
"password": credential.password,
"device_id": f"{credential.username}_browser",
"device_type": "browser",
"device_model": "Firefox",
"device_version": "firefox/143.0",
"device_id": device_id,
"device_type": "mobile",
"device_model": "SM-A525F",
"device_version": "Android 15",
"push_token": None,
"app_version": "v4.0.16",
"app_version": "v4.0.11",
}
self.log.debug(f"Authenticating with device_id: {device_id}")
r = self.session.post(
self.config["endpoints"]["login"],
json=payload,
@ -294,4 +307,3 @@ class KOWP(Service):
def get_chapters(self, title: Title_T) -> list:
return []
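
The new login payload above replaces the old browser device_id with a deterministic one derived from the account e-mail, so the same account always presents the same device. A minimal sketch of that derivation, matching the hashlib/uuid logic in the diff (the e-mail address is a placeholder):

# A minimal sketch of the deterministic device_id built above: MD5 of the normalized
# e-mail seeds a stable UUID, wrapped as "a_{uuid}_{email}". The address is a placeholder.
import hashlib
import uuid

def derive_device_id(email: str) -> str:
    email = email.lower().strip()
    seed = hashlib.md5(email.encode()).digest()  # 16 bytes, exactly one UUID
    return f"a_{uuid.UUID(bytes=seed)}_{email}"

print(derive_device_id("user@example.com"))  # the same input always yields the same id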

View File

@ -7,69 +7,75 @@ from langcodes import Language
import base64
import click
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.credential import Credential
from unshackle.core.titles import Episode, Movie, Movies, Title_T, Titles_T, Series
from unshackle.core.tracks import Chapter, Tracks, Subtitle
from unshackle.core.search_result import SearchResult
class MUBI(Service):
"""
Service code for MUBI (mubi.com)
Version: 1.2.0
Authorization: Required cookies (lt token + session)
Security: FHD @ L3 (Widevine)
Version: 1.2.1 (Cookie-only + Auto-UHD + Search)
Authorization: Cookies ONLY (lt token + _mubi_session)
Security: UHD @ L3/SL2K (Widevine/PlayReady)
Supports:
Series https://mubi.com/en/nl/series/twin-peaks
Movies https://mubi.com/en/nl/films/the-substance
Series https://mubi.com/en/nl/series/twin-peaks
Movies https://mubi.com/en/nl/films/the-substance
"""
SERIES_TITLE_RE = r"^https?://(?:www\.)?mubi\.com(?:/[^/]+)*?/series/(?P<series_slug>[^/]+)(?:/season/(?P<season_slug>[^/]+))?$"
TITLE_RE = r"^(?:https?://(?:www\.)?mubi\.com)(?:/[^/]+)*?/films/(?P<slug>[^/?#]+)$"
NO_SUBTITLES = False
@staticmethod
@click.command(name="MUBI", short_help="https://mubi.com")
@click.command(name="MUBI", short_help="https://mubi.com ")
@click.argument("title", type=str)
@click.option("-c", "--country", default=None, type=str,
help="With VPN set country code other than the one assigned to the account.")
@click.pass_context
def cli(ctx, **kwargs):
return MUBI(ctx, **kwargs)
def __init__(self, ctx, title: str):
def __init__(self, ctx, title: str, country: str):
super().__init__(ctx)
self.raw_title = title # Store raw input for search mode
self.country = country
# Only parse as URL if it matches MUBI patterns
m_film = re.match(self.TITLE_RE, title)
m_series = re.match(self.SERIES_TITLE_RE, title)
if not m_film and not m_series:
raise ValueError(f"Invalid MUBI URL: {title}")
self.is_series = bool(m_series)
self.slug = m_film.group("slug") if m_film else None
self.series_slug = m_series.group("series_slug") if m_series else None
self.season_slug = m_series.group("season_slug") if m_series else None
# Core state
self.film_id: Optional[int] = None
self.lt_token: Optional[str] = None
self.session_token: Optional[str] = None
self.user_id: Optional[int] = None
self.country_code: Optional[str] = None
self.set_country_code: Optional[str] = country
self.anonymous_user_id: Optional[str] = None
self.default_country: Optional[str] = None
self.reels_data: Optional[list] = None
# Store CDM reference
self.reels_data: Optional[list] = None
# ALWAYS enable UHD/HEVC path - no user flag required
self.uhd = True
self.cdm = ctx.obj.cdm
if self.config is None:
raise EnvironmentError("Missing service config for MUBI.")
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not cookies:
raise PermissionError("MUBI requires login cookies (lt + _mubi_session). Credentials login is not supported.")
# IP geolocation for country detection
try:
r_ip = self.session.get(self.config["endpoints"]["ip_geolocation"], timeout=5)
r_ip.raise_for_status()
@ -82,36 +88,34 @@ class MUBI(Service):
except Exception as e:
raise ValueError(f"Failed to fetch IP geolocation: {e}")
if not cookies:
raise PermissionError("MUBI requires login cookies.")
# Extract essential tokens
# Extract essential tokens from cookies
lt_cookie = next((c for c in cookies if c.name == "lt"), None)
session_cookie = next((c for c in cookies if c.name == "_mubi_session"), None)
snow_id_cookie = next((c for c in cookies if c.name == "_snow_id.c006"), None)
if not lt_cookie:
raise PermissionError("Missing 'lt' cookie (Bearer token).")
if not session_cookie:
raise PermissionError("Missing '_mubi_session' cookie.")
self.lt_token = lt_cookie.value
self.session_token = session_cookie.value
# Extract anonymous_user_id from _snow_id.c006
# Extract or generate anonymous_user_id
if snow_id_cookie and "." in snow_id_cookie.value:
self.anonymous_user_id = snow_id_cookie.value.split(".")[0]
else:
self.anonymous_user_id = str(uuid.uuid4())
self.log.warning(f"No _snow_id.c006 cookie found — generated new anonymous_user_id: {self.anonymous_user_id}")
# Configure session headers for UHD access
base_headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Firefox/143.0",
"User-Agent": "Mozilla/5.0 (Linux; Android 13; SM-G975F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
"Origin": "https://mubi.com",
"Referer": "https://mubi.com/",
"CLIENT": "web",
"Client-Accept-Video-Codecs": "h265,vp9,h264",
"Client-Accept-Audio-Codecs": "aac",
"Client-Accept-Audio-Codecs": "eac3,ac3,aac",
"Authorization": f"Bearer {self.lt_token}",
"ANONYMOUS_USER_ID": self.anonymous_user_id,
"Client-Country": self.default_country,
@ -121,22 +125,24 @@ class MUBI(Service):
"Pragma": "no-cache",
"Cache-Control": "no-cache",
}
self.session.headers.update(base_headers)
# Fetch account info
r_account = self.session.get(self.config["endpoints"]["account"])
if not r_account.ok:
raise PermissionError(f"Failed to fetch MUBI account: {r_account.status_code} {r_account.text}")
account_data = r_account.json()
self.user_id = account_data.get("id")
self.country_code = (account_data.get("country") or {}).get("code", "NL")
if self.set_country_code is not None:
self.country_code = self.set_country_code.upper()
self.session.headers["Client-Country"] = self.country_code
self.GEOFENCE = (self.country_code,)
self._bind_anonymous_user()
self.log.info(
f"Authenticated as user {self.user_id}, "
f"country: {self.country_code}, "
@ -168,31 +174,29 @@ class MUBI(Service):
r = self.session.get(url)
r.raise_for_status()
data = r.json()
self.film_id = data["id"]
# Fetch reels to get definitive language code and cache the response
# Fetch reels for language detection and subtitle names
url_reels = self.config["endpoints"]["reels"].format(film_id=self.film_id)
r_reels = self.session.get(url_reels)
r_reels.raise_for_status()
self.reels_data = r_reels.json()
# Extract original language from the first audio track of the first reel
original_language_code = "en" # Default fallback
# Detect original language from first audio track
original_language_code = "en"
if self.reels_data and self.reels_data[0].get("audio_tracks"):
first_audio_track = self.reels_data[0]["audio_tracks"][0]
if "language_code" in first_audio_track:
original_language_code = first_audio_track["language_code"]
self.log.debug(f"Detected original language from reels: '{original_language_code}'")
genres = ", ".join(data.get("genres", [])) or "Unknown"
self.log.debug(f"Detected original language from reels: '{original_language_code}'")
description = (
data.get("default_editorial_html", "")
.replace("<p>", "").replace("</p>", "").replace("<em>", "").replace("</em>", "").strip()
)
year = data.get("year")
name = data.get("title", "Unknown")
movie = Movie(
id_=self.film_id,
service=self.__class__,
@ -202,19 +206,15 @@ class MUBI(Service):
language=Language.get(original_language_code),
data=data,
)
return Movies([movie])
def _get_series_titles(self) -> Titles_T:
# Fetch series metadata
series_url = self.config["endpoints"]["series"].format(series_slug=self.series_slug)
r_series = self.session.get(series_url)
r_series.raise_for_status()
series_data = r_series.json()
episodes = []
# If season is explicitly specified, only fetch that season
if self.season_slug:
eps_url = self.config["endpoints"]["season_episodes"].format(
series_slug=self.series_slug,
@ -227,123 +227,126 @@ class MUBI(Service):
episodes_data = r_eps.json().get("episodes", [])
self._add_episodes_to_list(episodes, episodes_data, series_data)
else:
# No season specified: fetch ALL seasons
seasons = series_data.get("seasons", [])
if not seasons:
raise ValueError("No seasons found for this series.")
for season in seasons:
season_slug = season["slug"]
eps_url = self.config["endpoints"]["season_episodes"].format(
series_slug=self.series_slug,
season_slug=season_slug
)
self.log.debug(f"Fetching episodes for season: {season_slug}")
r_eps = self.session.get(eps_url)
# Stop if season returns 404 or empty
if r_eps.status_code == 404:
self.log.info(f"Season '{season_slug}' not available, skipping.")
continue
r_eps.raise_for_status()
episodes_data = r_eps.json().get("episodes", [])
if not episodes_data:
self.log.info(f"No episodes found in season '{season_slug}'.")
continue
self._add_episodes_to_list(episodes, episodes_data, series_data)
from unshackle.core.titles import Series
return Series(sorted(episodes, key=lambda x: (x.season, x.number)))
def _add_episodes_to_list(self, episodes_list: list, episodes_data: list, series_data: dict):
"""Helper to avoid code duplication when adding episodes."""
for ep in episodes_data:
# Use episode's own language detection via its consumable.playback_languages
playback_langs = ep.get("consumable", {}).get("playback_languages", {})
audio_langs = playback_langs.get("audio_options", ["English"])
lang_code = audio_langs[0].split()[0].lower() if audio_langs else "en"
try:
detected_lang = Language.get(lang_code)
except:
detected_lang = Language.get("en")
episodes_list.append(Episode(
id_=ep["id"],
service=self.__class__,
title=series_data["title"], # Series title
title=series_data["title"],
season=ep["episode"]["season_number"],
number=ep["episode"]["number"],
name=ep["title"], # Episode title
name=ep["title"],
description=ep.get("short_synopsis", ""),
language=detected_lang,
data=ep, # Full episode data for later use in get_tracks
data=ep,
))
def get_tracks(self, title: Title_T) -> Tracks:
film_id = getattr(title, "id", None)
if not film_id:
raise RuntimeError("Title ID not found.")
# For series episodes, we don't have reels cached, so skip reel-based logic
# Initiate viewing session
url_view = self.config["endpoints"]["initiate_viewing"].format(film_id=film_id)
r_view = self.session.post(url_view, json={}, headers={"Content-Type": "application/json"})
r_view.raise_for_status()
view_data = r_view.json()
reel_id = view_data["reel_id"]
# For films, use reels data for language/audio mapping
if not self.is_series:
if not self.film_id:
raise RuntimeError("film_id not set. Call get_titles() first.")
if not self.reels_data:
self.log.warning("Reels data not cached, fetching now.")
url_reels = self.config["endpoints"]["reels"].format(film_id=film_id)
r_reels = self.session.get(url_reels)
r_reels.raise_for_status()
reels = r_reels.json()
else:
reels = self.reels_data
reel = next((r for r in reels if r["id"] == reel_id), reels[0])
else:
# For episodes, we don't need reel-based logic, just proceed
pass
# Request secure streaming URL, works for both films and episodes
# Fetch reels data if not cached
if not self.film_id:
self.film_id = film_id
if not self.reels_data:
url_reels = self.config["endpoints"]["reels"].format(film_id=film_id)
r_reels = self.session.get(url_reels)
r_reels.raise_for_status()
self.reels_data = r_reels.json()
reels = self.reels_data
text_tracks_reel = reels[0]["text_tracks"]
reel = next((r for r in reels if r["id"] == reel_id), reels[0])
# Get secure streaming URL
url_secure = self.config["endpoints"]["secure_url"].format(film_id=film_id)
r_secure = self.session.get(url_secure)
r_secure.raise_for_status()
secure_data = r_secure.json()
# Find DASH manifest URL
manifest_url = None
for entry in secure_data.get("urls", []):
if entry.get("content_type") == "application/dash+xml":
manifest_url = entry["src"]
break
if not manifest_url:
raise ValueError("No DASH manifest URL found.")
# Parse DASH, use title.language as fallback
manifest_url = re.sub(
r'/default/ver1\.AVC1\.[^/]*\.mpd',
'/default/ver1.hevc.ex-vtt.mpd',
manifest_url
)
# Fallback for non-AVC URLs
if '/default/ver1.hevc.ex-vtt.mpd' not in manifest_url:
manifest_url = re.sub(
r'/default/[^/]*\.mpd',
'/default/ver1.hevc.ex-vtt.mpd',
manifest_url
)
# Parse DASH manifest
tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)
# Add subtitles
# Add enhanced subtitles (forced/SDH detection)
subtitles = []
for sub in secure_data.get("text_track_urls", []):
lang_code = sub.get("language_code", "und")
vtt_url = sub.get("url")
role = sub.get("role")
forced = False
sdh = False
if not vtt_url:
continue
try:
disp_name = (next(filter(lambda x: x['id'] == sub["id"], text_tracks_reel), None))["display_name"]
except:
disp_name = sub.get("role", "") + " " + lang_code.upper()
if role == "forced-subtitle":
forced = True
if role == "caption":
sdh = True
if "(SDH)" in disp_name:
disp_name = disp_name.replace("(SDH)", "").strip()
is_original = lang_code == title.language.language
subtitles.append(
Subtitle(
id_=sub["id"],
@ -351,38 +354,64 @@ class MUBI(Service):
language=Language.get(lang_code),
is_original_lang=is_original,
codec=Subtitle.Codec.WebVTT,
name=sub.get("display_name", lang_code.upper()),
forced=False,
sdh=False,
name=disp_name,
forced=forced,
sdh=sdh,
)
)
tracks.subtitles = subtitles
return tracks
def search(self) -> Generator[SearchResult, None, None]:
"""
Search MUBI films using official API endpoint.
Returns only playable films with proper metadata formatting.
"""
params = {
"query": self.raw_title,
"page": 1,
"per_page": 24,
"playable": "true",
"all_films_on_zero_hits": "true"
}
response = self.session.get(
url=self.config["endpoints"]["search"],
params=params
)
response.raise_for_status()
results = response.json()
for film in results.get("films", []):
display_title = f"{film['title']} ({film['year']})"
yield SearchResult(
id_=film["id"],
title=display_title,
label="MOVIE",
url=film["web_url"].rstrip() # Clean trailing spaces
)
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.user_id:
raise RuntimeError("user_id not set — authenticate first.")
# Cookie-based license request (NO dtinfo - credentials removed)
dt_custom_data = {
"userId": self.user_id,
"sessionId": self.lt_token,
"merchant": "mubi"
}
dt_custom_data_b64 = base64.b64encode(json.dumps(dt_custom_data).encode()).decode()
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
"User-Agent": "Mozilla/5.0 (Linux; Android 13; SM-G975F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
"Accept": "*/*",
"Origin": "https://mubi.com",
"Referer": "https://mubi.com/",
"dt-custom-data": dt_custom_data_b64,
}
r = self.session.post(
self.config["endpoints"]["license"],
data=challenge,
@ -394,3 +423,30 @@ class MUBI(Service):
raise PermissionError(f"DRM license error: {license_data}")
return base64.b64decode(license_data["license"])
def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.user_id:
raise RuntimeError("user_id not set — authenticate first.")
# Cookie-based PlayReady license request (NO dtinfo - credentials removed)
dt_custom_data = {
"userId": self.user_id,
"sessionId": self.lt_token,
"merchant": "mubi"
}
dt_custom_data_b64 = base64.b64encode(json.dumps(dt_custom_data).encode()).decode()
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0",
"Accept": "*/*",
"Origin": "https://mubi.com",
"Referer": "https://mubi.com/",
"dt-custom-data": dt_custom_data_b64,
}
r = self.session.post(
self.config["endpoints"]["license_pr"],
data=challenge,
headers=headers,
)
r.raise_for_status()
if r.status_code != 200:
raise PermissionError(f"DRM license error")
return r.content
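
Both license handlers above send the same cookie-only DRMToday custom data: the account's userId, the lt token as sessionId, and the mubi merchant name, JSON-encoded and base64-encoded into the dt-custom-data header. A compact sketch of that header construction; the user id and token values are placeholders.

# A compact sketch of the dt-custom-data header used by both license calls above;
# the user id and lt token values are placeholders.
import base64
import json

def build_dt_custom_data(user_id: int, lt_token: str) -> str:
    payload = {"userId": user_id, "sessionId": lt_token, "merchant": "mubi"}
    return base64.b64encode(json.dumps(payload).encode()).decode()

print({"dt-custom-data": build_dt_custom_data(12345, "example-lt-token")})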

View File

@ -1,12 +1,14 @@
endpoints:
account: "https://api.mubi.com/v4/account"
current_user: "https://api.mubi.com/v4/current_user"
film_by_slug: "https://api.mubi.com/v4/films/{slug}"
playback_languages: "https://api.mubi.com/v4/films/{film_id}/playback_languages"
initiate_viewing: "https://api.mubi.com/v4/films/{film_id}/viewing?parental_lock_enabled=true"
reels: "https://api.mubi.com/v4/films/{film_id}/reels"
secure_url: "https://api.mubi.com/v4/films/{film_id}/viewing/secure_url"
license: "https://lic.drmtoday.com/license-proxy-widevine/cenc/"
ip_geolocation: "https://directory.cookieyes.com/api/v1/ip"
series: "https://api.mubi.com/v4/series/{series_slug}"
season_episodes: "https://api.mubi.com/v4/series/{series_slug}/seasons/{season_slug}/episodes/available"
account: "https://api.mubi.com/v4/account"
current_user: "https://api.mubi.com/v4/current_user"
film_by_slug: "https://api.mubi.com/v4/films/{slug}"
playback_languages: "https://api.mubi.com/v4/films/{film_id}/playback_languages"
initiate_viewing: "https://api.mubi.com/v4/films/{film_id}/viewing?parental_lock_enabled=true"
reels: "https://api.mubi.com/v4/films/{film_id}/reels"
secure_url: "https://api.mubi.com/v4/films/{film_id}/viewing/secure_url"
license: "https://lic.drmtoday.com/license-proxy-widevine/cenc/"
ip_geolocation: "https://directory.cookieyes.com/api/v1/ip"
series: "https://api.mubi.com/v4/series/{series_slug}"
season_episodes: "https://api.mubi.com/v4/series/{series_slug}/seasons/{season_slug}/episodes/available"
license_pr: "https://lic.drmtoday.com/license-proxy-headerauth/drmtoday/RightsManager.asmx?persistent=false"
search: "https://api.mubi.com/v4/search/films"

View File

@ -16,24 +16,26 @@ from unshackle.core.tracks import Tracks
class PTHS(Service):
"""
Service code for Pathé Thuis (pathe-thuis.nl)
Version: 1.0.0
Version: 1.1.0 (PlayReady Support Added)
Security: SD @ L3 (Widevine)
FHD @ L1
Authorization: Cookies or authentication token
Security: SD/FHD @ L1/L3 (Widevine)
SD/FHD @ SL2K/SL3K (Playready)
Authorization: Cookies with authenticationToken + XSRF-TOKEN
Supported:
Movies https://www.pathe-thuis.nl/film/{id}
Note:
Pathé Thuis does not have episodic content, only movies.
Subtitles are hardcoded here, so nothing can be done about them.
The quality depends on what you rented: SD or HD.
"""
TITLE_RE = (
r"^(?:https?://(?:www\.)?pathe-thuis\.nl/film/)?(?P<id>\d+)(?:/[^/]+)?$"
)
GEOFENCE = ("NL",)
NO_SUBTITLES = True
NO_SUBTITLES = True
@staticmethod
@click.command(name="PTHS", short_help="https://www.pathe-thuis.nl")
@ -44,17 +46,15 @@ class PTHS(Service):
def __init__(self, ctx, title: str):
super().__init__(ctx)
m = re.match(self.TITLE_RE, title)
if not m:
raise ValueError(
f"Unsupported Pathé Thuis URL or ID: {title}\n"
"Use e.g. https://www.pathe-thuis.nl/film/30591"
)
self.movie_id = m.group("id")
self.drm_token = None
self.license_url = None
if self.config is None:
raise EnvironmentError("Missing service config for Pathé Thuis.")
@ -65,18 +65,27 @@ class PTHS(Service):
self.log.warning("No cookies provided, proceeding unauthenticated.")
return
token = next((c.value for c in cookies if c.name == "authenticationToken"), None)
if not token:
# Extract critical cookies
auth_token = next((c.value for c in cookies if c.name == "authenticationToken"), None)
xsrf_token = next((c.value for c in cookies if c.name == "XSRF-TOKEN"), None)
if not auth_token:
self.log.info("No authenticationToken cookie found, unauthenticated mode.")
return
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
"X-Pathe-Device-Identifier": "web-widevine-1",
"X-Pathe-Auth-Session-Token": token,
})
self.log.info("Authentication token successfully attached to session.")
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0",
"X-Pathe-Device-Identifier": "web-1",
"X-Pathe-Auth-Session-Token": auth_token,
}
if xsrf_token:
headers["X-XSRF-TOKEN"] = xsrf_token
self.log.debug(f"XSRF-TOKEN header set: {xsrf_token[:10]}...")
self.session.headers.update(headers)
auth_status = "with XSRF" if xsrf_token else "without XSRF"
self.log.info(f"Authentication token attached ({auth_status}).")
def get_titles(self) -> Titles_T:
url = self.config["endpoints"]["metadata"].format(movie_id=self.movie_id)
@ -90,16 +99,16 @@ class PTHS(Service):
name=data["name"],
description=data.get("intro", ""),
year=data.get("year"),
language=Language.get(data.get("language", "en")),
language=Language.get(data.get("language", "nl")), # Default to Dutch
data=data,
)
return Movies([movie])
def get_tracks(self, title: Title_T) -> Tracks:
ticket_id = self._get_ticket_id(title)
url = self.config["endpoints"]["ticket"].format(ticket_id=ticket_id)
base_url = self.config["endpoints"]["ticket"].format(ticket_id=ticket_id)
url = f"{base_url}?drmType=dash-widevine"
r = self.session.get(url)
r.raise_for_status()
data = r.json()
@ -107,16 +116,17 @@ class PTHS(Service):
manifest_url = stream.get("url") or stream.get("drmurl")
if not manifest_url:
raise ValueError("No stream manifest URL found.")
raise ValueError("No stream manifest URL found in ticket response.")
# Store DRM context for license acquisition
self.drm_token = stream["token"]
self.license_url = stream["rawData"]["licenseserver"]
drm_type = stream["rawData"].get("type", "unknown")
self.log.info(f"Acquired {drm_type.upper()} stream manifest. License URL set.")
tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)
return tracks
def _get_ticket_id(self, title: Title_T) -> str:
"""Fetch the user's owned ticket ID if present."""
data = title.data
@ -125,12 +135,45 @@ class PTHS(Service):
return str(t["id"])
raise ValueError("No valid ticket found for this movie. Ensure purchase or login.")
def get_chapters(self, title: Title_T):
return []
def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
"""
Acquire PlayReady license using the authentication token.
Matches the license request pattern observed in browser traffic.
"""
if not self.license_url or not self.drm_token:
raise ValueError("Missing license URL or DRM token. Call get_tracks() first.")
headers = {
"Content-Type": "application/octet-stream",
"Authorization": f"Bearer {self.drm_token}",
}
params = {"custom_data": self.drm_token}
self.log.debug(f"Requesting PlayReady license from {self.license_url}")
r = self.session.post(
self.license_url,
params=params,
data=challenge,
headers=headers,
timeout=10
)
r.raise_for_status()
if not r.content or len(r.content) < 10:
raise ValueError(
"Invalid PlayReady license response. "
"Check: 1) Valid session 2) XSRF token 3) Active rental/purchase"
)
self.log.info(f"Successfully acquired PlayReady license ({len(r.content)} bytes)")
return r.content
def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
"""Widevine license acquisition . """
if not self.license_url or not self.drm_token:
raise ValueError("Missing license URL or token.")
@ -138,7 +181,6 @@ class PTHS(Service):
"Content-Type": "application/octet-stream",
"Authorization": f"Bearer {self.drm_token}",
}
params = {"custom_data": self.drm_token}
r = self.session.post(self.license_url, params=params, data=challenge, headers=headers)
@ -146,4 +188,4 @@ class PTHS(Service):
if not r.content:
raise ValueError("Empty license response, likely invalid or expired token.")
return r.content
return r.content
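
Both license handlers above share the same request shape: the CDM challenge is POSTed to the license server taken from the ticket response, with the stream token used both as a Bearer header and as the custom_data query parameter. A minimal standalone sketch of that call using the requests library directly; the URL, token and challenge are placeholders.

# A minimal standalone sketch of the license call shape above, using the requests
# library directly; the URL, token and challenge are placeholders.
import requests

def request_license(license_url: str, drm_token: str, challenge: bytes) -> bytes:
    r = requests.post(
        license_url,
        params={"custom_data": drm_token},   # token doubles as the custom_data parameter
        data=challenge,
        headers={
            "Content-Type": "application/octet-stream",
            "Authorization": f"Bearer {drm_token}",
        },
        timeout=10,
    )
    r.raise_for_status()
    return r.content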

View File

@ -14,13 +14,12 @@
- Audio mislabeled as English
- To add Playready Support
3. PTHS:
- To add PlayReady support (needed since L3 is just 480p)
- Search Functionality
- Account login if possible
4. HIDI:
- Subtitles can be slightly misordered: when a second sentence comes up, the last sentence is placed first and vice versa (needs to be fixed)
5. MUBI:
- Search Functionality
- Creds login
6. VIKI:
- CSRF token is currently scraped; it will come from an API request soon
7. VIDO:
@ -32,9 +31,15 @@
- Search functionality
- Fixing a few hiccups
10. SKST (the hardest service I have dealt with so far):
- Subtitles are a little hit-or-miss for movies, and series still have no subtitles
- Subtitles have been fixed, hopefully with no further issues
11. VLD:
- Token isn't cached, which is a major problem for series
12. HPLA:
- No support for Television yet
- Music needs to be fixed since the output is an .mp4 instead of an .m4a
- Acknowledgments
Thanks to Adef for the NPO Start downloader.
Thanks to UPS0 for fixing the MUBI script

View File

@ -697,18 +697,14 @@ class SKST(Service):
protection = playback_data.get("protection", {})
self.drm_license_url = protection.get("licenceAcquisitionUrl")
self.license_token = protection.get("licenceToken")
self.license_token = protection.get("licenceToken")
manifest_url = manifest_url + "&audio=all&subtitle=all"
dash = DASH.from_url(manifest_url, session=self.session)
tracks = dash.to_tracks(language=title.language)
# Remove default subtitle tracks and add properly processed ones
for track in list(tracks.subtitles):
tracks.subtitles.remove(track)
subtitles = self._process_subtitles(dash, str(title.language))
tracks.add(subtitles)
return tracks
@staticmethod
@ -1045,4 +1041,4 @@ class SKST(Service):
# )
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
return []

VLD/__init__.py (new file, 465 lines)
View File

@ -0,0 +1,465 @@
import re
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Optional, Union
import click
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks
class VLD(Service):
"""
Service code for RTL's Dutch streaming service Videoland (https://v2.videoland.com)
Version: 1.0.0
Authorization: Credentials
Security:
- L1: >= 720p
- L3: <= 576p
They are using the license server of DRMToday with encoded streams from CastLabs.
It accepts Non-Whitelisted CDMs so every unrevoked L1 CDM should work.
Use full URL (for example - https://v2.videoland.com/title-p_12345) or title slug.
"""
ALIASES = ("VLD", "videoland")
TITLE_RE = r"^(?:https?://(?:www\.)?v2\.videoland\.com/)?(?P<title_id>[a-zA-Z0-9_-]+)"
GEOFENCE = ("NL",)
@staticmethod
@click.command(name="Videoland", short_help="https://v2.videoland.com")
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
@click.pass_context
def cli(ctx, **kwargs):
return VLD(ctx, **kwargs)
def __init__(self, ctx, title, movie):
super().__init__(ctx)
self.title = title
self.movie = movie
self.cdm = ctx.obj.cdm
self.device_id = str(uuid.uuid1().int)
if self.config is None:
raise Exception("Config is missing!")
profile_name = ctx.parent.params.get("profile")
self.profile = profile_name if profile_name else "default"
self.platform = self.config["platform"]["android_tv"]
self.platform_token = "token-androidtv-3"
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not credential or not credential.username or not credential.password:
raise EnvironmentError("Service requires Credentials for Authentication.")
self.credential = credential # Store for potential re-auth
self.session.headers.update({
"origin": "https://v2.videoland.com",
"x-client-release": self.config["sdk"]["version"],
"x-customer-name": "rtlnl",
})
# Build cache key
cache_key = f"tokens_{self.profile}"
# Check cache first
cache = self.cache.get(cache_key)
if cache and not cache.expired:
cached_data = cache.data
if isinstance(cached_data, dict) and cached_data.get("username") == credential.username:
self.log.info("Using cached tokens")
self._restore_from_cache(cached_data)
return
# Perform fresh login
self.log.info("Retrieving new tokens")
self._do_login(credential)
# Cache the tokens
self._cache_tokens(credential.username, cache_key)
def _restore_from_cache(self, cached_data: dict) -> None:
"""Restore authentication state from cached data."""
self.access_token = cached_data["access_token"]
self.gigya_uid = cached_data["gigya_uid"]
self.profile_id = cached_data["profile_id"]
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
def _cache_tokens(self, username: str, cache_key: str) -> None:
"""Cache the current authentication tokens."""
cache = self.cache.get(cache_key)
cache.set(
data={
"username": username,
"access_token": self.access_token,
"gigya_uid": self.gigya_uid,
"profile_id": self.profile_id,
},
expiration=3600 # 1 hour expiration, adjust as needed
)
def _do_login(self, credential: Credential) -> None:
"""Perform full login flow."""
# Step 1: Authorize with Gigya
auth_response = self.session.post(
url=self.config["endpoints"]["authorization"],
data={
"loginID": credential.username,
"password": credential.password,
"sessionExpiration": "0",
"targetEnv": "jssdk",
"include": "profile,data",
"includeUserInfo": "true",
"lang": "nl",
"ApiKey": self.config["sdk"]["apikey"],
"authMode": "cookie",
"pageURL": "https://v2.videoland.com/",
"sdkBuild": self.config["sdk"]["build"],
"format": "json",
},
).json()
if auth_response.get("errorMessage"):
raise EnvironmentError(f"Could not authorize Videoland account: {auth_response['errorMessage']!r}")
self.gigya_uid = auth_response["UID"]
uid_signature = auth_response["UIDSignature"]
signature_timestamp = auth_response["signatureTimestamp"]
# Step 2: Get initial JWT token
jwt_headers = {
"x-auth-device-id": self.device_id,
"x-auth-device-player-size-height": "3840",
"x-auth-device-player-size-width": "2160",
"X-Auth-gigya-signature": uid_signature,
"X-Auth-gigya-signature-timestamp": signature_timestamp,
"X-Auth-gigya-uid": self.gigya_uid,
"X-Client-Release": self.config["sdk"]["version"],
"X-Customer-Name": "rtlnl",
}
jwt_response = self.session.get(
url=self.config["endpoints"]["jwt_tokens"].format(platform=self.platform),
headers=jwt_headers,
).json()
if jwt_response.get("error"):
raise EnvironmentError(f"Could not get Access Token: {jwt_response['error']['message']!r}")
initial_token = jwt_response["token"]
# Step 3: Get profiles
profiles_response = self.session.get(
url=self.config["endpoints"]["profiles"].format(
platform=self.platform,
gigya=self.gigya_uid,
),
headers={"Authorization": f"Bearer {initial_token}"},
).json()
if isinstance(profiles_response, dict) and profiles_response.get("error"):
raise EnvironmentError(f"Could not get profiles: {profiles_response['error']['message']!r}")
self.profile_id = profiles_response[0]["uid"]
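        # The token above is only used to list profiles (the first profile is taken);
        # the JWT endpoint is then called again with X-Auth-profile-id to obtain the
        # profile-scoped token that authenticates the rest of the session.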
# Step 4: Get final JWT token with profile
jwt_headers["X-Auth-profile-id"] = self.profile_id
final_jwt_response = self.session.get(
url=self.config["endpoints"]["jwt_tokens"].format(platform=self.platform),
headers=jwt_headers,
).json()
if final_jwt_response.get("error"):
raise EnvironmentError(f"Could not get final Access Token: {final_jwt_response['error']['message']!r}")
self.access_token = final_jwt_response["token"]
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
def search(self) -> Generator[SearchResult, None, None]:
        # Videoland did not expose a documented search endpoint in the original code;
        # this is a placeholder to implement once the actual API is known.
raise NotImplementedError("Search is not implemented for Videoland")
def get_titles(self) -> Titles_T:
title_match = re.match(self.TITLE_RE, self.title)
if not title_match:
raise ValueError(f"Invalid title format: {self.title}")
title_slug = title_match.group("title_id")
# Handle folder URLs (e.g., title-f_12345)
if re.match(r".+?-f_[0-9]+", title_slug):
title_slug = self._get_program_title(title_slug)
# Extract title ID from slug (e.g., "show-name-p_12345" -> "12345")
title_id = title_slug.split("-p_")[-1] if "-p_" in title_slug else title_slug
metadata = self.session.get(
url=self.config["endpoints"]["layout"].format(
platform=self.platform,
token=self.platform_token,
endpoint=f"program/{title_id}",
),
params={"nbPages": "10"},
).json()
# Check for API errors
if isinstance(metadata, dict) and metadata.get("error"):
raise ValueError(f"API Error: {metadata.get('message', 'Unknown error')}")
# Determine if it's a movie based on metadata
is_movie = "Seizoen" not in str(metadata)
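        # Heuristic: series layouts contain season blocks titled "Seizoen <n>" (Dutch
        # for "Season"), so a response without that word is treated as a movie layout.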
if is_movie:
movie_info = metadata["blocks"][0]["content"]["items"][0]
viewable_id = movie_info["itemContent"]["action"]["target"]["value_layout"]["id"]
return Movies([
Movie(
id_=movie_info["ucid"],
service=self.__class__,
name=metadata["entity"]["metadata"]["title"],
year=None,
language=Language.get("nl"),
data={
"viewable": viewable_id,
"metadata": metadata,
},
)
])
else:
seasons = [
block
for block in metadata["blocks"]
if block["featureId"] == "videos_by_season_by_program"
]
# Fetch all episodes from all seasons with pagination
for season in seasons:
while len(season["content"]["items"]) != season["content"]["pagination"]["totalItems"]:
season_data = self.session.get(
url=self.config["endpoints"]["seasoning"].format(
platform=self.platform,
token=self.platform_token,
program=title_id,
season_id=season["id"],
),
params={
"nbPages": "10",
"page": season["content"]["pagination"]["nextPage"],
},
).json()
for episode in season_data["content"]["items"]:
if episode not in season["content"]["items"]:
season["content"]["items"].append(episode)
season["content"]["pagination"]["nextPage"] = season_data["content"]["pagination"]["nextPage"]
episodes = []
for season in seasons:
# Extract season number from title like "Seizoen 1" or "Season 1"
season_title = season.get("title", {}).get("long", "")
season_match = re.search(r"(\d+)", season_title)
season_number = int(season_match.group(1)) if season_match else 1
for idx, episode_data in enumerate(season["content"]["items"]):
# Get the extra title which contains episode info
extra_title = episode_data["itemContent"].get("extraTitle", "")
# Extract episode number from extraTitle like "1. Hondenadoptiedag" or "14. Een Draak Op School (Deel 1)"
episode_number = None
episode_name = extra_title
ep_match = re.match(r"^(\d+)\.\s*(.*)$", extra_title)
if ep_match:
episode_number = int(ep_match.group(1))
episode_name = ep_match.group(2)
else:
# Fallback to index + 1
episode_number = idx + 1
viewable_id = episode_data["itemContent"]["action"]["target"]["value_layout"]["id"]
episodes.append(
Episode(
id_=episode_data["ucid"],
service=self.__class__,
title=metadata["entity"]["metadata"]["title"],
season=season_number,
number=episode_number,
name=episode_name,
year=None,
language=Language.get("nl"),
data={
"viewable": viewable_id,
"episode_data": episode_data,
},
)
)
# Sort episodes by season and episode number
episodes = sorted(episodes, key=lambda ep: (ep.season, ep.number))
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
viewable_id = title.data["viewable"]
manifest_response = self.session.get(
url=self.config["endpoints"]["layout"].format(
platform=self.platform,
token=self.platform_token,
endpoint=f"video/{viewable_id}",
),
params={"nbPages": "2"},
).json()
player_block = next(
(block for block in manifest_response["blocks"] if block["templateId"] == "Player"),
None,
)
if not player_block:
raise ValueError("Could not find player block in manifest")
assets = player_block["content"]["items"][0]["itemContent"]["video"]["assets"]
if not assets:
raise ValueError("Failed to load content manifest - no assets found")
# Prefer HD quality
mpd_asset = next((asset for asset in assets if asset["quality"] == "hd"), None)
if not mpd_asset:
mpd_asset = next((asset for asset in assets if asset["quality"] == "sd"), None)
if not mpd_asset:
raise ValueError("No suitable quality stream found")
mpd_url = mpd_asset["path"]
# Extract PlayReady PSSH from manifest
manifest_content = self.session.get(mpd_url).text
pssh_matches = re.findall(r'<cenc:pssh>(.+?)</cenc:pssh>', manifest_content)
self.pssh_playready = None
for pssh in pssh_matches:
if len(pssh) > 200:
self.pssh_playready = pssh
break
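        # The manifest typically carries both Widevine and PlayReady <cenc:pssh> boxes;
        # the PlayReady one embeds the base64 WRM header XML and is therefore much
        # longer, hence the crude length check above. A stricter (hypothetical) test
        # would compare the box's 16-byte system ID, e.g.:
        #   base64.b64decode(pssh)[12:28] == uuid.UUID("9a04f079-9840-4286-ab92-e65be0885f95").bytes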
# Store viewable ID for license request
self.current_viewable = viewable_id
tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language)
# Fix track URLs - replace CDN hostname
for track in tracks:
if hasattr(track, 'url') and track.url:
if isinstance(track.url, list):
track.url = [
re.sub(
r"https://.+?\.videoland\.bedrock\.tech",
"https://origin.vod.videoland.bedrock.tech",
uri.split("?")[0],
)
for uri in track.url
]
elif isinstance(track.url, str):
track.url = re.sub(
r"https://.+?\.videoland\.bedrock\.tech",
"https://origin.vod.videoland.bedrock.tech",
track.url.split("?")[0],
)
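        # The rewrite above pins every segment request to the origin VOD host and drops
        # the query string; the assumption is that the per-CDN hostnames require signed
        # query parameters that are not preserved by the DASH parser.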
# Handle subtitles
for subtitle in tracks.subtitles:
if isinstance(subtitle.url, list) or (isinstance(subtitle.url, str) and "dash" in subtitle.url):
subtitle.codec = Subtitle.Codec.SubRip
else:
self.log.warning("Unknown subtitle codec detected")
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def get_widevine_service_certificate(self, **_) -> Optional[str]:
return self.config.get("certificate")
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
license_token = self._get_license_token(title)
response = self.session.post(
url=self.config["endpoints"]["license_wv"],
data=challenge,
headers={"x-dt-auth-token": license_token},
)
if response.status_code != 200:
raise ValueError(f"Failed to get Widevine license: {response.status_code}")
return response.json().get("license")
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
license_token = self._get_license_token(title)
response = self.session.post(
url=self.config["endpoints"]["license_pr"],
data=challenge,
headers={"x-dt-auth-token": license_token},
)
if response.status_code != 200:
raise ValueError(f"Failed to get PlayReady license: {response.status_code}")
return response.content
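    # Both license calls post the raw challenge to the DRMtoday proxies configured in
    # config.yaml, authenticated by the per-video upfront token from _get_license_token
    # below; the Widevine proxy wraps the license in a JSON "license" field while the
    # PlayReady proxy returns the license body directly.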
def _get_license_token(self, title: Title_T) -> str:
viewable_id = title.data["viewable"]
response = self.session.get(
url=self.config["endpoints"]["license_token"].format(
platform=self.platform,
gigya=self.gigya_uid,
clip=viewable_id,
),
).json()
return response["token"]
def _get_program_title(self, folder_title: str) -> str:
folder_id = folder_title.split("-f_")[1]
response = self.session.get(
url=self.config["endpoints"]["layout"].format(
platform=self.platform,
token=self.platform_token,
endpoint=f"folder/{folder_id}",
),
params={"nbPages": "2"},
).json()
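        # A folder layout points at its parent program; rebuild the canonical program
        # slug ("<seo>-p_<id>") so get_titles can process it like a normal program URL.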
target = response["blocks"][0]["content"]["items"][0]["itemContent"]["action"]["target"]["value_layout"]
parent_seo = target["parent"]["seo"]
parent_id = target["parent"]["id"]
return f"{parent_seo}-p_{parent_id}"

29
VLD/config.yaml Normal file
View File

@ -0,0 +1,29 @@
certificate: |
CsECCAMSEBcFuRfMEgSGiwYzOi93KowYgrSCkgUijgIwggEKAoIBAQCZ7Vs7Mn2rXiTvw7YqlbWYUgrVvMs3UD4GRbgU2Ha430BRBEGtjOOtsRu4jE5yWl5
KngeVKR1YWEAjp+GvDjipEnk5MAhhC28VjIeMfiG/+/7qd+EBnh5XgeikX0YmPRTmDoBYqGB63OBPrIRXsTeo1nzN6zNwXZg6IftO7L1KEMpHSQykfqpdQ4
IY3brxyt4zkvE9b/tkQv0x4b9AsMYE0cS6TJUgpL+X7r1gkpr87vVbuvVk4tDnbNfFXHOggrmWEguDWe3OJHBwgmgNb2fG2CxKxfMTRJCnTuw3r0svAQxZ6
ChD4lgvC2ufXbD8Xm7fZPvTCLRxG88SUAGcn1oJAgMBAAE6FGxpY2Vuc2Uud2lkZXZpbmUuY29tEoADrjRzFLWoNSl/JxOI+3u4y1J30kmCPN3R2jC5MzlR
HrPMveoEuUS5J8EhNG79verJ1BORfm7BdqEEOEYKUDvBlSubpOTOD8S/wgqYCKqvS/zRnB3PzfV0zKwo0bQQQWz53ogEMBy9szTK/NDUCXhCOmQuVGE98K/
PlspKkknYVeQrOnA+8XZ/apvTbWv4K+drvwy6T95Z0qvMdv62Qke4XEMfvKUiZrYZ/DaXlUP8qcu9u/r6DhpV51Wjx7zmVflkb1gquc9wqgi5efhn9joLK3
/bNixbxOzVVdhbyqnFk8ODyFfUnaq3fkC3hR3f0kmYgI41sljnXXjqwMoW9wRzBMINk+3k6P8cbxfmJD4/Paj8FwmHDsRfuoI6Jj8M76H3CTsZCZKDJjM3B
QQ6Kb2m+bQ0LMjfVDyxoRgvfF//M/EEkPrKWyU2C3YBXpxaBquO4C8A0ujVmGEEqsxN1HX9lu6c5OMm8huDxwWFd7OHMs3avGpr7RP7DUnTikXrh6X0
endpoints:
layout: https://layout.videoland.bedrock.tech/front/v1/rtlnl/{platform}/main/{token}/{endpoint}/layout
seasoning: https://layout.videoland.bedrock.tech/front/v1/rtlnl/{platform}/main/{token}/program/{program}/block/{season_id}
license_pr: https://lic.drmtoday.com/license-proxy-headerauth/drmtoday/RightsManager.asmx
license_wv: https://lic.drmtoday.com/license-proxy-widevine/cenc/
license_token: https://drm.videoland.bedrock.tech/v1/customers/rtlnl/platforms/{platform}/services/videoland/users/{gigya}/videos/{clip}/upfront-token
authorization: https://accounts.eu1.gigya.com/accounts.login
jwt_tokens: https://front-auth.videoland.bedrock.tech/v2/platforms/{platform}/getJwt
profiles: https://users.videoland.bedrock.tech/v2/platforms/{platform}/users/{gigya}/profiles
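# Placeholders in the endpoint templates are filled in by the service code at request
# time: {platform} comes from the platform map below, {token} is the layout token
# (e.g. "token-androidtv-3"), {gigya} is the Gigya UID, and {endpoint}, {program},
# {season_id} and {clip} identify the requested layout, program, season block and video.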
platform:
web: m6group_web
android_mob: m6group_android_mob
android_tv: m6group_android_tv
sdk:
apikey: 3_W6BPwMz2FGQEfH4_nVRaj4Ak1F1XDp33an_8y8nXULn8nk43FHvPIpb0TLOYIaUI
build: "13414"
version: 5.47.2