Compare commits

...

2 Commits
main ... main

Author SHA1 Message Date
8289b3a709 Add all season detection from a series 2026-01-19 22:17:10 +01:00
a5c68a0dcb Fix subtitle missing SKST, added VLD 2026-01-12 21:25:05 +01:00
5 changed files with 732 additions and 60 deletions

View File

@ -1,31 +1,37 @@
import json import json
import re import re
import base64
import hashlib
import click
from http.cookiejar import CookieJar from http.cookiejar import CookieJar
from typing import Optional, Iterable from typing import Optional, Iterable
from langcodes import Language from langcodes import Language
import base64
import click
from unshackle.core.constants import AnyTrack from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH from unshackle.core.manifests import DASH
from unshackle.core.service import Service from unshackle.core.service import Service
from unshackle.core.titles import Episode, Series, Movie, Movies, Title_T, Titles_T from unshackle.core.titles import Episode, Series, Movie, Movies, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks, Subtitle, Audio from unshackle.core.tracks import Chapter, Tracks, Subtitle, Audio, Video
from unshackle.core.utilities import import_module_by_path
class HIDI(Service): class HIDI(Service):
""" """
Service code for HiDive (hidive.com) Service code for HiDive (hidive.com)
Version: 1.2.0 Version: 1.3.2
Authorization: Email + password login, with automatic token refresh. Authorization: Email + password login, with automatic token refresh.
Security: FHD@L3 Security: FHD@L3
IMPORTANT: UPDATE YOUR UNSHACKLE TO 2.3.0 TO GET THE NECESSARY FIX FOR THIS SERVICE
Also when downloading a series, use the link from the first season of the series
""" """
TITLE_RE = r"^https?://(?:www\.)?hidive\.com/(?:season/(?P<season_id>\d+)|playlist/(?P<playlist_id>\d+))$" TITLE_RE = r"^https?://(?:www\.)?hidive\.com/(?:season/(?P<season_id>\d+)|playlist/(?P<playlist_id>\d+))$"
GEOFENCE = () GEOFENCE = ()
NO_SUBTITLES = False NO_SUBTITLES = False
API_BASE = "https://dce-frontoffice.imggaming.com/api/v4"
@staticmethod @staticmethod
@click.command(name="HIDI", short_help="https://hidive.com") @click.command(name="HIDI", short_help="https://hidive.com")
@ -110,36 +116,160 @@ class HIDI(Service):
resp.raise_for_status() resp.raise_for_status()
return resp return resp
def get_titles(self) -> Titles_T: def _fetch_season_data(self, season_id: int) -> dict:
# One endpoint for both season and playlist """Fetch season view data."""
resp = self._api_get( return self._api_get(
self.config["endpoints"]["view"], self.config["endpoints"]["view"],
params={"type": ("playlist" if self.kind == "movie" else "season"), params={
"id": self.content_id, "type": "season",
"timezone": "Europe/Amsterdam"} "id": season_id,
) "timezone": "Europe/Amsterdam"
data = resp.json() }
).json()
def _fetch_adjacent_seasons(self, series_id: int, season_id: int) -> dict:
"""Fetch all seasons in a series using adjacentTo endpoint."""
url = f"{self.API_BASE}/series/{series_id}/adjacentTo/{season_id}"
return self._api_get(url, params={"size": 25}).json()
def _extract_series_info(self, season_data: dict) -> tuple[Optional[int], Optional[str]]:
"""
Extract series ID and title from season data.
Checks multiple locations in the JSON structure.
"""
series_id = None
series_title = None
# Method 1: Check metadata.series
metadata = season_data.get("metadata", {})
if metadata.get("series"):
series_id = metadata["series"].get("seriesId")
series_title = metadata["series"].get("title")
if series_id:
return series_id, series_title
# Method 2: Check elements for $type: "series"
for elem in season_data.get("elements", []):
if elem.get("$type") == "series":
attrs = elem.get("attributes", {})
series_id = attrs.get("id")
series_info = attrs.get("series", {})
series_title = series_info.get("title") or series_title
if series_id:
return series_id, series_title
# Method 3: Check bucket elements for seriesId
for elem in season_data.get("elements", []):
if elem.get("$type") == "bucket":
attrs = elem.get("attributes", {})
if attrs.get("seriesId"):
series_id = attrs["seriesId"]
return series_id, series_title
# Method 4: Check hero actions for seriesId
for elem in season_data.get("elements", []):
if elem.get("$type") == "hero":
for action in elem.get("attributes", {}).get("actions", []):
action_data = action.get("attributes", {}).get("action", {}).get("data", {})
if action_data.get("seriesId"):
series_id = action_data["seriesId"]
return series_id, series_title
return series_id, series_title
def _extract_season_number(self, season_data: dict) -> int:
"""Extract season number from season data."""
# Check metadata.currentSeason
metadata = season_data.get("metadata", {})
current_season = metadata.get("currentSeason", {})
if current_season.get("title"):
# Parse "Season 2" -> 2
title = current_season["title"]
if title.lower().startswith("season "):
try:
return int(title.split(" ")[1])
except (ValueError, IndexError):
pass
# Check elements for series type with seasons info
for elem in season_data.get("elements", []):
if elem.get("$type") == "series":
seasons_items = elem.get("attributes", {}).get("seasons", {}).get("items", [])
for item in seasons_items:
if item.get("seasonNumber"):
return item["seasonNumber"]
# Check bucket title
for elem in season_data.get("elements", []):
if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "season":
bucket_title = elem.get("attributes", {}).get("bucketTitle", "")
if bucket_title.lower().startswith("season "):
try:
return int(bucket_title.split(" ")[1])
except (ValueError, IndexError):
pass
return 1
def _parse_episodes_from_season(self, season_data: dict, series_title: str, season_number: int) -> list[Episode]:
"""Parse episodes from season JSON data."""
episodes = []
for elem in season_data.get("elements", []):
if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "season":
items = elem.get("attributes", {}).get("items", [])
for idx, item in enumerate(items):
if item.get("type") != "SEASON_VOD":
continue
ep_title = item.get("title", "")
ep_num = idx + 1
# Try to extract episode number from title "E1 - Title"
if ep_title.startswith("E") and " - " in ep_title:
try:
ep_num = int(ep_title.split(" - ")[0][1:])
except ValueError:
pass
episodes.append(Episode(
id_=item["id"],
service=self.__class__,
title=series_title,
season=season_number,
number=ep_num,
name=ep_title,
description=item.get("description", ""),
language=Language.get("ja"),
data=item,
))
break
return episodes
def get_titles(self) -> Titles_T:
anchor_data = self._fetch_season_data(self.content_id)
if self.kind == "movie": if self.kind == "movie":
# Find the playlist bucket, then the single VOD
vod_id = None vod_id = None
movie_title = None movie_title = None
description = "" description = ""
for elem in data.get("elements", []):
for elem in anchor_data.get("elements", []):
if elem.get("$type") == "hero": if elem.get("$type") == "hero":
hdr = (elem.get("attributes", {}).get("header", {}) or {}).get("attributes", {}) hdr = (elem.get("attributes", {}).get("header", {}) or {}).get("attributes", {})
movie_title = hdr.get("text", movie_title) movie_title = hdr.get("text", movie_title)
for c in elem.get("attributes", {}).get("content", []): for c in elem.get("attributes", {}).get("content", []):
if c.get("$type") == "textblock": if c.get("$type") == "textblock":
description = c.get("attributes", {}).get("text", description) description = c.get("attributes", {}).get("text", description)
if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "playlist": if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "playlist":
items = elem.get("attributes", {}).get("items", []) items = elem.get("attributes", {}).get("items", [])
if items: if items:
vod_id = items[0]["id"] vod_id = items[0]["id"]
if not movie_title: movie_title = movie_title or items[0].get("title")
movie_title = items[0].get("title") description = description or items[0].get("description", "")
if not description:
description = items[0].get("description", "")
break break
if not vod_id: if not vod_id:
@ -157,37 +287,86 @@ class HIDI(Service):
) )
]) ])
# Series series_id, series_title = self._extract_series_info(anchor_data)
episodes = [] series_title = series_title or "HiDive Series"
series_title = None anchor_season_num = self._extract_season_number(anchor_data)
for elem in data.get("elements", []):
if elem.get("$type") == "bucket" and elem["attributes"].get("type") == "season": if not series_id:
for item in elem["attributes"].get("items", []): self.log.warning("Could not determine Series ID. Fetching single season only.")
if item.get("type") != "SEASON_VOD": episodes = self._parse_episodes_from_season(anchor_data, series_title, anchor_season_num)
continue return Series(episodes)
ep_title = item["title"]
ep_num = 1 try:
if ep_title.startswith("E") and " - " in ep_title: adj_data = self._fetch_adjacent_seasons(series_id, self.content_id)
try: except Exception as e:
ep_num = int(ep_title.split(" - ")[0][1:]) self.log.warning(f"Failed to fetch adjacent seasons: {e}. Falling back to single season.")
except: episodes = self._parse_episodes_from_season(anchor_data, series_title, anchor_season_num)
pass return Series(episodes)
episodes.append(Episode(
id_=item["id"], # Build list of all seasons
service=self.__class__, all_seasons = []
title=data.get("metadata", {}).get("series", {}).get("title", "") or "HiDive",
season=1, # Preceding seasons (these come before current season)
number=ep_num, for s in adj_data.get("precedingSeasons", []):
name=item["title"], all_seasons.append({
description=item.get("description", ""), "id": s["id"],
language=Language.get("en"), "seasonNumber": s.get("seasonNumber", 0),
data=item, "title": s.get("title", "")
)) })
break
# Current/Anchor season
if not episodes: all_seasons.append({
raise ValueError("No episodes found in season data.") "id": self.content_id,
return Series(sorted(episodes, key=lambda x: x.number)) "seasonNumber": anchor_season_num,
"title": f"Season {anchor_season_num}",
"_data": anchor_data # Cache to avoid re-fetching
})
# Following seasons (these come after current season)
for s in adj_data.get("followingSeasons", []):
all_seasons.append({
"id": s["id"],
"seasonNumber": s.get("seasonNumber", 0),
"title": s.get("title", "")
})
# Deduplicate by ID and sort by season number
unique_seasons = {}
for s in all_seasons:
s_id = s["id"]
if s_id not in unique_seasons:
unique_seasons[s_id] = s
elif "_data" in s:
# Prefer the one with cached data
unique_seasons[s_id] = s
sorted_seasons = sorted(unique_seasons.values(), key=lambda x: x["seasonNumber"])
all_episodes = []
for season_info in sorted_seasons:
s_id = season_info["id"]
s_num = season_info["seasonNumber"]
if "_data" in season_info:
self.log.info(f"Processing Season {s_num} (ID: {s_id}) [cached]")
season_data = season_info["_data"]
else:
self.log.info(f"Fetching Season {s_num} (ID: {s_id})")
try:
season_data = self._fetch_season_data(s_id)
except Exception as e:
self.log.error(f"Failed to fetch Season {s_num}: {e}")
continue
episodes = self._parse_episodes_from_season(season_data, series_title, s_num)
self.log.info(f" Found {len(episodes)} episodes")
all_episodes.extend(episodes)
if not all_episodes:
raise ValueError("No episodes found across all seasons.")
return Series(all_episodes)
def _get_audio_for_langs(self, mpd_url: str, langs: Iterable[Language]) -> list[Audio]: def _get_audio_for_langs(self, mpd_url: str, langs: Iterable[Language]) -> list[Audio]:
merged: list[Audio] = [] merged: list[Audio] = []
@ -300,11 +479,12 @@ class HIDI(Service):
return base_tracks return base_tracks
def _hidive_get_drm_info(self, title: Title_T) -> tuple[str, str]: def _hidive_get_drm_info(self, title: Title_T) -> tuple[str, str]:
if title.id in self._drm_cache: if title.id in self._drm_cache:
return self._drm_cache[title.id] return self._drm_cache[title.id]
self.get_tracks(title) self.get_tracks(title)
if title.id not in self._drm_cache:
raise ValueError("DRM information not found for this title.")
return self._drm_cache[title.id] return self._drm_cache[title.id]
def _decode_hidive_license_payload(self, payload: bytes) -> bytes: def _decode_hidive_license_payload(self, payload: bytes) -> bytes:

View File

@ -32,7 +32,9 @@
- Search functionality - Search functionality
- Fixing few hickups - Fixing few hickups
10. SKST (the hardest service I ever dealt upon now): 10. SKST (the hardest service I ever dealt upon now):
- Subtitles is a litte bit hit or miss for movies and for series there's still no subtitles - Subtitle has been fixed, hopefully no issue
11. VLD:
- So far no issue
- Acknowledgment - Acknowledgment

View File

@ -697,18 +697,14 @@ class SKST(Service):
protection = playback_data.get("protection", {}) protection = playback_data.get("protection", {})
self.drm_license_url = protection.get("licenceAcquisitionUrl") self.drm_license_url = protection.get("licenceAcquisitionUrl")
self.license_token = protection.get("licenceToken") self.license_token = protection.get("licenceToken")
manifest_url = manifest_url + "&audio=all&subtitle=all"
dash = DASH.from_url(manifest_url, session=self.session) dash = DASH.from_url(manifest_url, session=self.session)
tracks = dash.to_tracks(language=title.language) tracks = dash.to_tracks(language=title.language)
# Remove default subtitle tracks and add properly processed ones
for track in list(tracks.subtitles):
tracks.subtitles.remove(track)
subtitles = self._process_subtitles(dash, str(title.language))
tracks.add(subtitles)
return tracks return tracks
@staticmethod @staticmethod
@ -1045,4 +1041,4 @@ class SKST(Service):
# ) # )
def get_chapters(self, title: Title_T) -> list[Chapter]: def get_chapters(self, title: Title_T) -> list[Chapter]:
return [] return []

465
VLD/__init__.py Normal file
View File

@ -0,0 +1,465 @@
import re
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Optional, Union
import click
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks
class VLD(Service):
"""
Service code for RTL's Dutch streaming service Videoland (https://v2.videoland.com)
Version: 1.0.0
Authorization: Credentials
Security:
- L1: >= 720p
- L3: <= 576p
They are using the license server of DRMToday with encoded streams from CastLabs.
It accepts Non-Whitelisted CDMs so every unrevoked L1 CDM should work.
Use full URL (for example - https://v2.videoland.com/title-p_12345) or title slug.
"""
ALIASES = ("VLD", "videoland")
TITLE_RE = r"^(?:https?://(?:www\.)?v2\.videoland\.com/)?(?P<title_id>[a-zA-Z0-9_-]+)"
GEOFENCE = ("NL",)
@staticmethod
@click.command(name="Videoland", short_help="https://v2.videoland.com")
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
@click.pass_context
def cli(ctx, **kwargs):
return VLD(ctx, **kwargs)
def __init__(self, ctx, title, movie):
super().__init__(ctx)
self.title = title
self.movie = movie
self.cdm = ctx.obj.cdm
self.device_id = str(uuid.uuid1().int)
if self.config is None:
raise Exception("Config is missing!")
profile_name = ctx.parent.params.get("profile")
self.profile = profile_name if profile_name else "default"
self.platform = self.config["platform"]["android_tv"]
self.platform_token = "token-androidtv-3"
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not credential or not credential.username or not credential.password:
raise EnvironmentError("Service requires Credentials for Authentication.")
self.credential = credential # Store for potential re-auth
self.session.headers.update({
"origin": "https://v2.videoland.com",
"x-client-release": self.config["sdk"]["version"],
"x-customer-name": "rtlnl",
})
# Build cache key
cache_key = f"tokens_{self.profile}"
# Check cache first
cache = self.cache.get(cache_key)
if cache and not cache.expired:
cached_data = cache.data
if isinstance(cached_data, dict) and cached_data.get("username") == credential.username:
self.log.info("Using cached tokens")
self._restore_from_cache(cached_data)
return
# Perform fresh login
self.log.info("Retrieving new tokens")
self._do_login(credential)
# Cache the tokens
self._cache_tokens(credential.username, cache_key)
def _restore_from_cache(self, cached_data: dict) -> None:
"""Restore authentication state from cached data."""
self.access_token = cached_data["access_token"]
self.gigya_uid = cached_data["gigya_uid"]
self.profile_id = cached_data["profile_id"]
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
def _cache_tokens(self, username: str, cache_key: str) -> None:
"""Cache the current authentication tokens."""
cache = self.cache.get(cache_key)
cache.set(
data={
"username": username,
"access_token": self.access_token,
"gigya_uid": self.gigya_uid,
"profile_id": self.profile_id,
},
expiration=3600 # 1 hour expiration, adjust as needed
)
def _do_login(self, credential: Credential) -> None:
"""Perform full login flow."""
# Step 1: Authorize with Gigya
auth_response = self.session.post(
url=self.config["endpoints"]["authorization"],
data={
"loginID": credential.username,
"password": credential.password,
"sessionExpiration": "0",
"targetEnv": "jssdk",
"include": "profile,data",
"includeUserInfo": "true",
"lang": "nl",
"ApiKey": self.config["sdk"]["apikey"],
"authMode": "cookie",
"pageURL": "https://v2.videoland.com/",
"sdkBuild": self.config["sdk"]["build"],
"format": "json",
},
).json()
if auth_response.get("errorMessage"):
raise EnvironmentError(f"Could not authorize Videoland account: {auth_response['errorMessage']!r}")
self.gigya_uid = auth_response["UID"]
uid_signature = auth_response["UIDSignature"]
signature_timestamp = auth_response["signatureTimestamp"]
# Step 2: Get initial JWT token
jwt_headers = {
"x-auth-device-id": self.device_id,
"x-auth-device-player-size-height": "3840",
"x-auth-device-player-size-width": "2160",
"X-Auth-gigya-signature": uid_signature,
"X-Auth-gigya-signature-timestamp": signature_timestamp,
"X-Auth-gigya-uid": self.gigya_uid,
"X-Client-Release": self.config["sdk"]["version"],
"X-Customer-Name": "rtlnl",
}
jwt_response = self.session.get(
url=self.config["endpoints"]["jwt_tokens"].format(platform=self.platform),
headers=jwt_headers,
).json()
if jwt_response.get("error"):
raise EnvironmentError(f"Could not get Access Token: {jwt_response['error']['message']!r}")
initial_token = jwt_response["token"]
# Step 3: Get profiles
profiles_response = self.session.get(
url=self.config["endpoints"]["profiles"].format(
platform=self.platform,
gigya=self.gigya_uid,
),
headers={"Authorization": f"Bearer {initial_token}"},
).json()
if isinstance(profiles_response, dict) and profiles_response.get("error"):
raise EnvironmentError(f"Could not get profiles: {profiles_response['error']['message']!r}")
self.profile_id = profiles_response[0]["uid"]
# Step 4: Get final JWT token with profile
jwt_headers["X-Auth-profile-id"] = self.profile_id
final_jwt_response = self.session.get(
url=self.config["endpoints"]["jwt_tokens"].format(platform=self.platform),
headers=jwt_headers,
).json()
if final_jwt_response.get("error"):
raise EnvironmentError(f"Could not get final Access Token: {final_jwt_response['error']['message']!r}")
self.access_token = final_jwt_response["token"]
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
def search(self) -> Generator[SearchResult, None, None]:
# Videoland doesn't have a documented search endpoint in the original code
# This is a placeholder - you may need to implement based on actual API
raise NotImplementedError("Search is not implemented for Videoland")
def get_titles(self) -> Titles_T:
title_match = re.match(self.TITLE_RE, self.title)
if not title_match:
raise ValueError(f"Invalid title format: {self.title}")
title_slug = title_match.group("title_id")
# Handle folder URLs (e.g., title-f_12345)
if re.match(r".+?-f_[0-9]+", title_slug):
title_slug = self._get_program_title(title_slug)
# Extract title ID from slug (e.g., "show-name-p_12345" -> "12345")
title_id = title_slug.split("-p_")[-1] if "-p_" in title_slug else title_slug
metadata = self.session.get(
url=self.config["endpoints"]["layout"].format(
platform=self.platform,
token=self.platform_token,
endpoint=f"program/{title_id}",
),
params={"nbPages": "10"},
).json()
# Check for API errors
if isinstance(metadata, dict) and metadata.get("error"):
raise ValueError(f"API Error: {metadata.get('message', 'Unknown error')}")
# Determine if it's a movie based on metadata
is_movie = "Seizoen" not in str(metadata)
if is_movie:
movie_info = metadata["blocks"][0]["content"]["items"][0]
viewable_id = movie_info["itemContent"]["action"]["target"]["value_layout"]["id"]
return Movies([
Movie(
id_=movie_info["ucid"],
service=self.__class__,
name=metadata["entity"]["metadata"]["title"],
year=None,
language=Language.get("nl"),
data={
"viewable": viewable_id,
"metadata": metadata,
},
)
])
else:
seasons = [
block
for block in metadata["blocks"]
if block["featureId"] == "videos_by_season_by_program"
]
# Fetch all episodes from all seasons with pagination
for season in seasons:
while len(season["content"]["items"]) != season["content"]["pagination"]["totalItems"]:
season_data = self.session.get(
url=self.config["endpoints"]["seasoning"].format(
platform=self.platform,
token=self.platform_token,
program=title_id,
season_id=season["id"],
),
params={
"nbPages": "10",
"page": season["content"]["pagination"]["nextPage"],
},
).json()
for episode in season_data["content"]["items"]:
if episode not in season["content"]["items"]:
season["content"]["items"].append(episode)
season["content"]["pagination"]["nextPage"] = season_data["content"]["pagination"]["nextPage"]
episodes = []
for season in seasons:
# Extract season number from title like "Seizoen 1" or "Season 1"
season_title = season.get("title", {}).get("long", "")
season_match = re.search(r"(\d+)", season_title)
season_number = int(season_match.group(1)) if season_match else 1
for idx, episode_data in enumerate(season["content"]["items"]):
# Get the extra title which contains episode info
extra_title = episode_data["itemContent"].get("extraTitle", "")
# Extract episode number from extraTitle like "1. Hondenadoptiedag" or "14. Een Draak Op School (Deel 1)"
episode_number = None
episode_name = extra_title
ep_match = re.match(r"^(\d+)\.\s*(.*)$", extra_title)
if ep_match:
episode_number = int(ep_match.group(1))
episode_name = ep_match.group(2)
else:
# Fallback to index + 1
episode_number = idx + 1
viewable_id = episode_data["itemContent"]["action"]["target"]["value_layout"]["id"]
episodes.append(
Episode(
id_=episode_data["ucid"],
service=self.__class__,
title=metadata["entity"]["metadata"]["title"],
season=season_number,
number=episode_number,
name=episode_name,
year=None,
language=Language.get("nl"),
data={
"viewable": viewable_id,
"episode_data": episode_data,
},
)
)
# Sort episodes by season and episode number
episodes = sorted(episodes, key=lambda ep: (ep.season, ep.number))
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
viewable_id = title.data["viewable"]
manifest_response = self.session.get(
url=self.config["endpoints"]["layout"].format(
platform=self.platform,
token=self.platform_token,
endpoint=f"video/{viewable_id}",
),
params={"nbPages": "2"},
).json()
player_block = next(
(block for block in manifest_response["blocks"] if block["templateId"] == "Player"),
None,
)
if not player_block:
raise ValueError("Could not find player block in manifest")
assets = player_block["content"]["items"][0]["itemContent"]["video"]["assets"]
if not assets:
raise ValueError("Failed to load content manifest - no assets found")
# Prefer HD quality
mpd_asset = next((asset for asset in assets if asset["quality"] == "hd"), None)
if not mpd_asset:
mpd_asset = next((asset for asset in assets if asset["quality"] == "sd"), None)
if not mpd_asset:
raise ValueError("No suitable quality stream found")
mpd_url = mpd_asset["path"]
# Extract PlayReady PSSH from manifest
manifest_content = self.session.get(mpd_url).text
pssh_matches = re.findall(r'<cenc:pssh>(.+?)</cenc:pssh>', manifest_content)
self.pssh_playready = None
for pssh in pssh_matches:
if len(pssh) > 200:
self.pssh_playready = pssh
break
# Store viewable ID for license request
self.current_viewable = viewable_id
tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language)
# Fix track URLs - replace CDN hostname
for track in tracks:
if hasattr(track, 'url') and track.url:
if isinstance(track.url, list):
track.url = [
re.sub(
r"https://.+?\.videoland\.bedrock\.tech",
"https://origin.vod.videoland.bedrock.tech",
uri.split("?")[0],
)
for uri in track.url
]
elif isinstance(track.url, str):
track.url = re.sub(
r"https://.+?\.videoland\.bedrock\.tech",
"https://origin.vod.videoland.bedrock.tech",
track.url.split("?")[0],
)
# Handle subtitles
for subtitle in tracks.subtitles:
if isinstance(subtitle.url, list) or (isinstance(subtitle.url, str) and "dash" in subtitle.url):
subtitle.codec = Subtitle.Codec.SubRip
else:
self.log.warning("Unknown subtitle codec detected")
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def get_widevine_service_certificate(self, **_) -> Optional[str]:
return self.config.get("certificate")
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
license_token = self._get_license_token(title)
response = self.session.post(
url=self.config["endpoints"]["license_wv"],
data=challenge,
headers={"x-dt-auth-token": license_token},
)
if response.status_code != 200:
raise ValueError(f"Failed to get Widevine license: {response.status_code}")
return response.json().get("license")
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
license_token = self._get_license_token(title)
response = self.session.post(
url=self.config["endpoints"]["license_pr"],
data=challenge,
headers={"x-dt-auth-token": license_token},
)
if response.status_code != 200:
raise ValueError(f"Failed to get PlayReady license: {response.status_code}")
return response.content
def _get_license_token(self, title: Title_T) -> str:
viewable_id = title.data["viewable"]
response = self.session.get(
url=self.config["endpoints"]["license_token"].format(
platform=self.platform,
gigya=self.gigya_uid,
clip=viewable_id,
),
).json()
return response["token"]
def _get_program_title(self, folder_title: str) -> str:
folder_id = folder_title.split("-f_")[1]
response = self.session.get(
url=self.config["endpoints"]["layout"].format(
platform=self.platform,
token=self.platform_token,
endpoint=f"folder/{folder_id}",
),
params={"nbPages": "2"},
).json()
target = response["blocks"][0]["content"]["items"][0]["itemContent"]["action"]["target"]["value_layout"]
parent_seo = target["parent"]["seo"]
parent_id = target["parent"]["id"]
return f"{parent_seo}-p_{parent_id}"

29
VLD/config.yaml Normal file
View File

@ -0,0 +1,29 @@
certificate: |
CsECCAMSEBcFuRfMEgSGiwYzOi93KowYgrSCkgUijgIwggEKAoIBAQCZ7Vs7Mn2rXiTvw7YqlbWYUgrVvMs3UD4GRbgU2Ha430BRBEGtjOOtsRu4jE5yWl5
KngeVKR1YWEAjp+GvDjipEnk5MAhhC28VjIeMfiG/+/7qd+EBnh5XgeikX0YmPRTmDoBYqGB63OBPrIRXsTeo1nzN6zNwXZg6IftO7L1KEMpHSQykfqpdQ4
IY3brxyt4zkvE9b/tkQv0x4b9AsMYE0cS6TJUgpL+X7r1gkpr87vVbuvVk4tDnbNfFXHOggrmWEguDWe3OJHBwgmgNb2fG2CxKxfMTRJCnTuw3r0svAQxZ6
ChD4lgvC2ufXbD8Xm7fZPvTCLRxG88SUAGcn1oJAgMBAAE6FGxpY2Vuc2Uud2lkZXZpbmUuY29tEoADrjRzFLWoNSl/JxOI+3u4y1J30kmCPN3R2jC5MzlR
HrPMveoEuUS5J8EhNG79verJ1BORfm7BdqEEOEYKUDvBlSubpOTOD8S/wgqYCKqvS/zRnB3PzfV0zKwo0bQQQWz53ogEMBy9szTK/NDUCXhCOmQuVGE98K/
PlspKkknYVeQrOnA+8XZ/apvTbWv4K+drvwy6T95Z0qvMdv62Qke4XEMfvKUiZrYZ/DaXlUP8qcu9u/r6DhpV51Wjx7zmVflkb1gquc9wqgi5efhn9joLK3
/bNixbxOzVVdhbyqnFk8ODyFfUnaq3fkC3hR3f0kmYgI41sljnXXjqwMoW9wRzBMINk+3k6P8cbxfmJD4/Paj8FwmHDsRfuoI6Jj8M76H3CTsZCZKDJjM3B
QQ6Kb2m+bQ0LMjfVDyxoRgvfF//M/EEkPrKWyU2C3YBXpxaBquO4C8A0ujVmGEEqsxN1HX9lu6c5OMm8huDxwWFd7OHMs3avGpr7RP7DUnTikXrh6X0
endpoints:
layout: https://layout.videoland.bedrock.tech/front/v1/rtlnl/{platform}/main/{token}/{endpoint}/layout
seasoning: https://layout.videoland.bedrock.tech/front/v1/rtlnl/{platform}/main/{token}/program/{program}/block/{season_id}
license_pr: https://lic.drmtoday.com/license-proxy-headerauth/drmtoday/RightsManager.asmx
license_wv: https://lic.drmtoday.com/license-proxy-widevine/cenc/
license_token: https://drm.videoland.bedrock.tech/v1/customers/rtlnl/platforms/{platform}/services/videoland/users/{gigya}/videos/{clip}/upfront-token
authorization: https://accounts.eu1.gigya.com/accounts.login
jwt_tokens: https://front-auth.videoland.bedrock.tech/v2/platforms/{platform}/getJwt
profiles: https://users.videoland.bedrock.tech/v2/platforms/{platform}/users/{gigya}/profiles
platform:
web: m6group_web
android_mob: m6group_android_mob
android_tv: m6group_android_tv
sdk:
apikey: 3_W6BPwMz2FGQEfH4_nVRaj4Ak1F1XDp33an_8y8nXULn8nk43FHvPIpb0TLOYIaUI
build: "13414"
version: 5.47.2