Compare commits

..

13 Commits
main ... main

15 changed files with 2123 additions and 23 deletions

334
HIDI/__init__.py Normal file
View File

@ -0,0 +1,334 @@
import json
import re
from http.cookiejar import CookieJar
from typing import Optional, Iterable
from langcodes import Language
import base64
import click
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Series, Movie, Movies, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks, Subtitle, Audio
class HIDI(Service):
"""
Service code for HiDive (hidive.com)
Version: 1.2.0
Authorization: Email + password login, with automatic token refresh.
Security: FHD@L3
"""
TITLE_RE = r"^https?://(?:www\.)?hidive\.com/(?:season/(?P<season_id>\d+)|playlist/(?P<playlist_id>\d+))$"
GEOFENCE = ()
NO_SUBTITLES = False
@staticmethod
@click.command(name="HIDI", short_help="https://hidive.com")
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return HIDI(ctx, **kwargs)
def __init__(self, ctx, title: str):
super().__init__(ctx)
m = re.match(self.TITLE_RE, title)
if not m:
raise ValueError("Unsupported HiDive URL. Use /season/<id> or /playlist/<id>")
self.season_id = m.group("season_id")
self.playlist_id = m.group("playlist_id")
self.kind = "serie" if self.season_id else "movie"
self.content_id = int(self.season_id or self.playlist_id)
if not self.config:
raise EnvironmentError("Missing HIDI service config.")
self.cdm = ctx.obj.cdm
self._auth_token = None
self._refresh_token = None
self._drm_cache = {}
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
base_headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US",
"Referer": "https://www.hidive.com/",
"Origin": "https://www.hidive.com",
"x-api-key": self.config["x_api_key"],
"app": "dice",
"Realm": "dce.hidive",
"x-app-var": self.config["x_app_var"],
}
self.session.headers.update(base_headers)
if not credential or not credential.username or not credential.password:
raise ValueError("HiDive requires email + password")
r_login = self.session.post(
self.config["endpoints"]["login"],
json={"id": credential.username, "secret": credential.password}
)
if r_login.status_code == 401:
raise PermissionError("Invalid email or password.")
r_login.raise_for_status()
login_data = r_login.json()
self._auth_token = login_data["authorisationToken"]
self._refresh_token = login_data["refreshToken"]
self.session.headers["Authorization"] = f"Bearer {self._auth_token}"
self.log.info("HiDive login successful.")
def _refresh_auth(self):
if not self._refresh_token:
raise PermissionError("No refresh token available to renew session.")
self.log.warning("Auth token expired, refreshing...")
r = self.session.post(
self.config["endpoints"]["refresh"],
json={"refreshToken": self._refresh_token}
)
if r.status_code == 401:
raise PermissionError("Refresh token is invalid. Please log in again.")
r.raise_for_status()
data = r.json()
self._auth_token = data["authorisationToken"]
self.session.headers["Authorization"] = f"Bearer {self._auth_token}"
self.log.info("Auth token refreshed successfully.")
def _api_get(self, url, **kwargs):
resp = self.session.get(url, **kwargs)
if resp.status_code == 401:
self._refresh_auth()
resp = self.session.get(url, **kwargs)
resp.raise_for_status()
return resp
def get_titles(self) -> Titles_T:
# One endpoint for both season and playlist
resp = self._api_get(
self.config["endpoints"]["view"],
params={"type": ("playlist" if self.kind == "movie" else "season"),
"id": self.content_id,
"timezone": "Europe/Amsterdam"}
)
data = resp.json()
if self.kind == "movie":
# Find the playlist bucket, then the single VOD
vod_id = None
movie_title = None
description = ""
for elem in data.get("elements", []):
if elem.get("$type") == "hero":
hdr = (elem.get("attributes", {}).get("header", {}) or {}).get("attributes", {})
movie_title = hdr.get("text", movie_title)
for c in elem.get("attributes", {}).get("content", []):
if c.get("$type") == "textblock":
description = c.get("attributes", {}).get("text", description)
if elem.get("$type") == "bucket" and elem.get("attributes", {}).get("type") == "playlist":
items = elem.get("attributes", {}).get("items", [])
if items:
vod_id = items[0]["id"]
if not movie_title:
movie_title = items[0].get("title")
if not description:
description = items[0].get("description", "")
break
if not vod_id:
raise ValueError("No VOD found in playlist data.")
return Movies([
Movie(
id_=vod_id,
service=self.__class__,
name=movie_title or "Unknown Title",
description=description or "",
year=None,
language=Language.get("en"),
data={"playlistId": self.content_id}
)
])
# Series
episodes = []
series_title = None
for elem in data.get("elements", []):
if elem.get("$type") == "bucket" and elem["attributes"].get("type") == "season":
for item in elem["attributes"].get("items", []):
if item.get("type") != "SEASON_VOD":
continue
ep_title = item["title"]
ep_num = 1
if ep_title.startswith("E") and " - " in ep_title:
try:
ep_num = int(ep_title.split(" - ")[0][1:])
except:
pass
episodes.append(Episode(
id_=item["id"],
service=self.__class__,
title=data.get("metadata", {}).get("series", {}).get("title", "") or "HiDive",
season=1,
number=ep_num,
name=item["title"],
description=item.get("description", ""),
language=Language.get("en"),
data=item,
))
break
if not episodes:
raise ValueError("No episodes found in season data.")
return Series(sorted(episodes, key=lambda x: x.number))
def _get_audio_for_langs(self, mpd_url: str, langs: Iterable[Language]) -> list[Audio]:
merged: list[Audio] = []
seen = set()
# Use first available language as fallback, or "en" as ultimate fallback
fallback_lang = langs[0] if langs else Language.get("en")
dash = DASH.from_url(mpd_url, session=self.session)
try:
# Parse with a valid fallback language
base_tracks = dash.to_tracks(language=fallback_lang)
except Exception:
# Try with English as ultimate fallback
base_tracks = dash.to_tracks(language=Language.get("en"))
all_audio = base_tracks.audio or []
for lang in langs:
# Match by language prefix (e.g. en, ja)
for audio in all_audio:
lang_code = getattr(audio.language, "language", "en")
if lang_code.startswith(lang.language[:2]):
key = (lang_code, getattr(audio, "codec", None), getattr(audio, "bitrate", None))
if key in seen:
continue
merged.append(audio)
seen.add(key)
# If nothing matched, just return all available audio tracks
if not merged and all_audio:
merged = all_audio
return merged
def get_tracks(self, title: Title_T) -> Tracks:
vod_resp = self._api_get(
self.config["endpoints"]["vod"].format(vod_id=title.id),
params={"includePlaybackDetails": "URL"},
)
vod = vod_resp.json()
playback_url = vod.get("playerUrlCallback")
if not playback_url:
raise ValueError("No playback URL found.")
stream_data = self._api_get(playback_url).json()
dash_list = stream_data.get("dash", [])
if not dash_list:
raise ValueError("No DASH streams available.")
entry = dash_list[0]
mpd_url = entry["url"]
# Collect available HiDive metadata languages
meta_audio_tracks = vod.get("onlinePlaybackMetadata", {}).get("audioTracks", [])
available_langs = []
for m in meta_audio_tracks:
lang_code = (m.get("languageCode") or "").split("-")[0]
if not lang_code:
continue
try:
available_langs.append(Language.get(lang_code))
except Exception:
continue
# Use first available language as fallback, or English as ultimate fallback
fallback_lang = available_langs[0] if available_langs else Language.get("en")
# Parse DASH manifest with a valid fallback language
base_tracks = DASH.from_url(mpd_url, session=self.session).to_tracks(language=fallback_lang)
audio_tracks = self._get_audio_for_langs(mpd_url, available_langs)
# Map metadata labels
meta_audio_map = {m.get("languageCode", "").split("-")[0]: m.get("label") for m in meta_audio_tracks}
for a in audio_tracks:
lang_code = getattr(a.language, "language", "en")
a.name = meta_audio_map.get(lang_code, lang_code)
a.is_original_lang = (lang_code == title.language.language)
base_tracks.audio = audio_tracks
# Subtitles
subtitles = []
for sub in entry.get("subtitles", []):
if sub.get("format", "").lower() != "vtt":
continue
lang_code = sub.get("language", "en").replace("-", "_")
try:
lang = Language.get(lang_code)
except Exception:
lang = Language.get("en")
subtitles.append(Subtitle(
id_=f"{lang_code}:vtt",
url=sub.get("url"),
language=lang,
codec=Subtitle.Codec.WebVTT,
name=lang.language_name(),
))
base_tracks.subtitles = subtitles
# DRM info
drm = entry.get("drm", {}) or {}
jwt = drm.get("jwtToken")
lic_url = (drm.get("url") or "").strip()
if jwt and lic_url:
self._drm_cache[title.id] = (jwt, lic_url)
return base_tracks
def _hidive_get_drm_info(self, title: Title_T) -> tuple[str, str]:
if title.id in self._drm_cache:
return self._drm_cache[title.id]
self.get_tracks(title)
return self._drm_cache[title.id]
def _decode_hidive_license_payload(self, payload: bytes) -> bytes:
text = payload.decode("utf-8", errors="ignore")
prefix = "data:application/octet-stream;base64,"
if text.startswith(prefix):
b64 = text.split(",", 1)[1]
return base64.b64decode(b64)
return payload
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes | str | None:
jwt_token, license_url = self._hidive_get_drm_info(title)
headers = {
"Authorization": f"Bearer {jwt_token}",
"Content-Type": "application/octet-stream",
"Accept": "*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"Origin": "https://www.hidive.com",
"Referer": "https://www.hidive.com/",
"X-DRM-INFO": "eyJzeXN0ZW0iOiJjb20ud2lkZXZpbmUuYWxwaGEifQ==",
}
r = self.session.post(license_url, data=challenge, headers=headers, timeout=30)
r.raise_for_status()
return self._decode_hidive_license_payload(r.content)
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []

10
HIDI/config.yaml Normal file
View File

@ -0,0 +1,10 @@
x_api_key: "857a1e5d-e35e-4fdf-805b-a87b6f8364bf"
x_app_var: "6.59.1.e16cdfd"
endpoints:
init: "https://dce-frontoffice.imggaming.com/api/v1/init/"
login: "https://dce-frontoffice.imggaming.com/api/v2/login"
vod: "https://dce-frontoffice.imggaming.com/api/v4/vod/{vod_id}?includePlaybackDetails=URL"
adjacent: "https://dce-frontoffice.imggaming.com/api/v4/vod/{vod_id}/adjacent"
view: "https://dce-frontoffice.imggaming.com/api/v1/view" # Changed from season_view
refresh: "https://dce-frontoffice.imggaming.com/api/v2/token/refresh"

407
KNPY/__init__.py Normal file
View File

@ -0,0 +1,407 @@
import base64
import json
import re
from datetime import datetime, timezone
from http.cookiejar import CookieJar
from typing import List, Optional
import click
import jwt
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Subtitle, Tracks
class KNPY(Service):
"""
Service code for Kanopy (kanopy.com).
Version: 1.0.0
Auth: Credential (username + password)
Security: FHD@L3
Handles both Movies and Series (Playlists).
Detects and stops for movies that require tickets.
Caching included
"""
# Updated regex to match the new URL structure with library subdomain and path
TITLE_RE = r"^https?://(?:www\.)?kanopy\.com/.+/(?P<id>\d+)$"
GEOFENCE = ()
NO_SUBTITLES = False
@staticmethod
@click.command(name="KNPY", short_help="https://kanopy.com")
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return KNPY(ctx, **kwargs)
def __init__(self, ctx, title: str):
super().__init__(ctx)
if not self.config:
raise ValueError("KNPY configuration not found. Ensure config.yaml exists.")
self.cdm = ctx.obj.cdm
match = re.match(self.TITLE_RE, title)
if match:
self.content_id = match.group("id")
else:
self.content_id = None
self.search_query = title
self.API_VERSION = self.config["client"]["api_version"]
self.USER_AGENT = self.config["client"]["user_agent"]
self.WIDEVINE_UA = self.config["client"]["widevine_ua"]
self.session.headers.update({
"x-version": self.API_VERSION,
"user-agent": self.USER_AGENT
})
self._jwt = None
self._visitor_id = None
self._user_id = None
self._domain_id = None
self.widevine_license_url = None
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
if not credential or not credential.username or not credential.password:
raise ValueError("Kanopy requires email and password for authentication.")
cache = self.cache.get("auth_token")
if cache and not cache.expired:
cached_data = cache.data
valid_token = None
if isinstance(cached_data, dict) and "token" in cached_data:
if cached_data.get("username") == credential.username:
valid_token = cached_data["token"]
self.log.info("Using cached authentication token")
else:
self.log.info(f"Cached token belongs to '{cached_data.get('username')}', but logging in as '{credential.username}'. Re-authenticating.")
elif isinstance(cached_data, str):
self.log.info("Found legacy cached token format. Re-authenticating to ensure correct user.")
if valid_token:
self._jwt = valid_token
self.session.headers.update({"authorization": f"Bearer {self._jwt}"})
if not self._user_id or not self._domain_id or not self._visitor_id:
try:
decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False})
self._user_id = decoded_jwt["data"]["uid"]
self._visitor_id = decoded_jwt["data"]["visitor_id"]
self.log.info(f"Extracted user_id and visitor_id from cached token.")
self._fetch_user_details()
return
except (KeyError, jwt.DecodeError) as e:
self.log.error(f"Could not decode cached token: {e}. Re-authenticating.")
self.log.info("Performing handshake to get visitor token...")
r = self.session.get(self.config["endpoints"]["handshake"])
r.raise_for_status()
handshake_data = r.json()
self._visitor_id = handshake_data["visitorId"]
initial_jwt = handshake_data["jwt"]
self.log.info(f"Logging in as {credential.username}...")
login_payload = {
"credentialType": "email",
"emailUser": {
"email": credential.username,
"password": credential.password
}
}
r = self.session.post(
self.config["endpoints"]["login"],
json=login_payload,
headers={"authorization": f"Bearer {initial_jwt}"}
)
r.raise_for_status()
login_data = r.json()
self._jwt = login_data["jwt"]
self._user_id = login_data["userId"]
self.session.headers.update({"authorization": f"Bearer {self._jwt}"})
self.log.info(f"Successfully authenticated as {credential.username}")
self._fetch_user_details()
try:
decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False})
exp_timestamp = decoded_jwt.get("exp")
cache_payload = {
"token": self._jwt,
"username": credential.username
}
if exp_timestamp:
expiration_in_seconds = int(exp_timestamp - datetime.now(timezone.utc).timestamp())
self.log.info(f"Caching token for {expiration_in_seconds / 60:.2f} minutes.")
cache.set(data=cache_payload, expiration=expiration_in_seconds)
else:
self.log.warning("JWT has no 'exp' claim, caching for 1 hour as a fallback.")
cache.set(data=cache_payload, expiration=3600)
except Exception as e:
self.log.error(f"Failed to decode JWT for caching: {e}. Caching for 1 hour as a fallback.")
cache.set(
data={"token": self._jwt, "username": credential.username},
expiration=3600
)
def _fetch_user_details(self):
self.log.info("Fetching user library memberships...")
r = self.session.get(self.config["endpoints"]["memberships"].format(user_id=self._user_id))
r.raise_for_status()
memberships = r.json()
for membership in memberships.get("list", []):
if membership.get("status") == "active" and membership.get("isDefault", False):
self._domain_id = str(membership["domainId"])
self.log.info(f"Using default library domain: {membership.get('sitename', 'Unknown')} (ID: {self._domain_id})")
return
if memberships.get("list"):
self._domain_id = str(memberships["list"][0]["domainId"])
self.log.warning(f"No default library found. Using first active domain: {self._domain_id}")
else:
raise ValueError("No active library memberships found for this user.")
def get_titles(self) -> Titles_T:
if not self.content_id:
raise ValueError("A content ID is required to get titles. Use a URL or run a search first.")
if not self._domain_id:
raise ValueError("Domain ID not set. Authentication may have failed.")
r = self.session.get(self.config["endpoints"]["video_info"].format(video_id=self.content_id, domain_id=self._domain_id))
r.raise_for_status()
content_data = r.json()
content_type = content_data.get("type")
def parse_lang(data):
try:
langs = data.get("languages", [])
if langs and isinstance(langs, list) and len(langs) > 0:
return Language.find(langs[0])
except:
pass
return Language.get("en")
if content_type == "video":
video_data = content_data["video"]
movie = Movie(
id_=str(video_data["videoId"]),
service=self.__class__,
name=video_data["title"],
year=video_data.get("productionYear"),
description=video_data.get("descriptionHtml", ""),
language=parse_lang(video_data),
data=video_data,
)
return Movies([movie])
elif content_type == "playlist":
playlist_data = content_data["playlist"]
series_title = playlist_data["title"]
series_year = playlist_data.get("productionYear")
season_match = re.search(r'(?:Season|S)\s*(\d+)', series_title, re.IGNORECASE)
season_num = int(season_match.group(1)) if season_match else 1
r = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id))
r.raise_for_status()
items_data = r.json()
episodes = []
for i, item in enumerate(items_data.get("list", [])):
if item.get("type") != "video":
continue
video_data = item["video"]
ep_num = i + 1
ep_title = video_data.get("title", "")
ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title, re.IGNORECASE)
if ep_match:
ep_num = int(ep_match.group(1))
episodes.append(
Episode(
id_=str(video_data["videoId"]),
service=self.__class__,
title=series_title,
season=season_num,
number=ep_num,
name=video_data["title"],
description=video_data.get("descriptionHtml", ""),
year=video_data.get("productionYear", series_year),
language=parse_lang(video_data),
data=video_data,
)
)
series = Series(episodes)
series.name = series_title
series.description = playlist_data.get("descriptionHtml", "")
series.year = series_year
return series
else:
raise ValueError(f"Unsupported content type: {content_type}")
def get_tracks(self, title: Title_T) -> Tracks:
play_payload = {
"videoId": int(title.id),
"domainId": int(self._domain_id),
"userId": int(self._user_id),
"visitorId": self._visitor_id
}
self.session.headers.setdefault("authorization", f"Bearer {self._jwt}")
self.session.headers.setdefault("x-version", self.API_VERSION)
self.session.headers.setdefault("user-agent", self.USER_AGENT)
r = self.session.post(self.config["endpoints"]["plays"], json=play_payload)
response_json = None
try:
response_json = r.json()
except Exception:
pass
# Handle known errors gracefully
if r.status_code == 403:
if response_json and response_json.get("errorSubcode") == "playRegionRestricted":
self.log.error("Kanopy reports: This video is not available in your country.")
raise PermissionError(
"Playback blocked by region restriction. Try connecting through a supported country or verify your librarys access region."
)
else:
self.log.error(f"Access forbidden (HTTP 403). Response: {response_json}")
raise PermissionError("Kanopy denied access to this video. It may require a different library membership or authentication.")
# Raise for any other HTTP errors
r.raise_for_status()
play_data = response_json or r.json()
manifest_url = None
for manifest in play_data.get("manifests", []):
if manifest["manifestType"] == "dash":
url = manifest["url"]
manifest_url = f"https://kanopy.com{url}" if url.startswith("/") else url
drm_type = manifest.get("drmType")
if drm_type == "kanopyDrm":
play_id = play_data.get("playId")
self.widevine_license_url = self.config["endpoints"]["widevine_license"].format(license_id=f"{play_id}-0")
elif drm_type == "studioDrm":
license_id = manifest.get("drmLicenseID", f"{play_data.get('playId')}-1")
self.widevine_license_url = self.config["endpoints"]["widevine_license"].format(license_id=license_id)
else:
self.log.warning(f"Unknown drmType: {drm_type}")
self.widevine_license_url = None
break
if not manifest_url:
raise ValueError("Could not find a DASH manifest for this title.")
if not self.widevine_license_url:
raise ValueError("Could not construct Widevine license URL.")
self.log.info(f"Fetching DASH manifest from: {manifest_url}")
r = self.session.get(manifest_url)
r.raise_for_status()
# Refresh headers for manifest parsing
self.session.headers.clear()
self.session.headers.update({
"User-Agent": self.WIDEVINE_UA,
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
})
tracks = DASH.from_text(r.text, url=manifest_url).to_tracks(language=title.language)
for caption_data in play_data.get("captions", []):
lang = caption_data.get("language", "en")
for file_info in caption_data.get("files", []):
if file_info.get("type") == "webvtt":
tracks.add(Subtitle(
id_=f"caption-{lang}",
url=file_info["url"],
codec=Subtitle.Codec.WebVTT,
language=Language.get(lang)
))
break
return tracks
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.widevine_license_url:
raise ValueError("Widevine license URL was not set. Call get_tracks first.")
license_headers = {
"Content-Type": "application/octet-stream",
"User-Agent": self.WIDEVINE_UA,
"Authorization": f"Bearer {self._jwt}",
"X-Version": self.API_VERSION
}
r = self.session.post(
self.widevine_license_url,
data=challenge,
headers=license_headers
)
r.raise_for_status()
return r.content
# def search(self) -> List[SearchResult]:
# if not hasattr(self, 'search_query'):
# self.log.error("Search query not set. Cannot search.")
# return []
# self.log.info(f"Searching for '{self.search_query}'...")
# params = {
# "query": self.search_query,
# "sort": "relevance",
# "domainId": self._domain_id,
# "page": 0,
# "perPage": 20
# }
# r = self.session.get(self.config["endpoints"]["search"], params=params)
# r.raise_for_status()
# search_data = r.json()
# results = []
# for item in search_data.get("list", []):
# item_type = item.get("type")
# if item_type not in ["playlist", "video"]:
# continue
# video_id = item.get("videoId")
# title = item.get("title", "No Title")
# label = "Series" if item_type == "playlist" else "Movie"
# results.append(
# SearchResult(
# id_=str(video_id),
# title=title,
# description="",
# label=label,
# url=f"https://www.kanopy.com/watch/{video_id}"
# )
# )
# return results
def get_chapters(self, title: Title_T) -> list:
return []

15
KNPY/config.yaml Normal file
View File

@ -0,0 +1,15 @@
client:
api_version: "Android/com.kanopy/6.21.0/952 (SM-A525F; Android 15)"
user_agent: "okhttp/5.2.1"
widevine_ua: "KanopyApplication/6.21.0 (Linux;Android 15) AndroidXMedia3/1.8.0"
endpoints:
handshake: "https://kanopy.com/kapi/handshake"
login: "https://kanopy.com/kapi/login"
memberships: "https://kanopy.com/kapi/memberships?userId={user_id}"
video_info: "https://kanopy.com/kapi/videos/{video_id}?domainId={domain_id}"
video_items: "https://kanopy.com/kapi/videos/{video_id}/items?domainId={domain_id}"
search: "https://kanopy.com/kapi/search/videos"
plays: "https://kanopy.com/kapi/plays"
access_expires_in: "https://kanopy.com/kapi/users/{user_id}/history/videos/{video_id}/access_expires_in?domainId={domain_id}"
widevine_license: "https://kanopy.com/kapi/licenses/widevine/{license_id}"

297
KOWP/__init__.py Normal file
View File

@ -0,0 +1,297 @@
import json
import re
from http.cookiejar import CookieJar
from typing import Optional, List, Dict, Any
import click
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.search_result import SearchResult
from unshackle.core.titles import Episode, Series, Title_T, Titles_T
from unshackle.core.tracks import Subtitle, Tracks
from unshackle.core.utilities import is_close_match
class KOWP(Service):
"""
Service code for Kocowa Plus (kocowa.com).
Version: 1.0.0
Auth: Credential (username + password)
Security: FHD@L3
"""
TITLE_RE = r"^(?:https?://(?:www\.)?kocowa\.com/[^/]+/season/)?(?P<title_id>\d+)"
GEOFENCE = ()
NO_SUBTITLES = False
@staticmethod
@click.command(name="kowp", short_help="https://www.kocowa.com")
@click.argument("title", type=str)
@click.option("--extras", is_flag=True, default=False, help="Include teasers/extras")
@click.pass_context
def cli(ctx, **kwargs):
return KOWP(ctx, **kwargs)
def __init__(self, ctx, title: str, extras: bool = False):
super().__init__(ctx)
match = re.match(self.TITLE_RE, title)
if match:
self.title_id = match.group("title_id")
else:
self.title_id = title # fallback to use as search keyword
self.include_extras = extras
self.brightcove_account_id = None
self.brightcove_pk = None
self.cdm = ctx.obj.cdm
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
if not credential:
raise ValueError("KOWP requires username and password")
payload = {
"username": credential.username,
"password": credential.password,
"device_id": f"{credential.username}_browser",
"device_type": "browser",
"device_model": "Firefox",
"device_version": "firefox/143.0",
"push_token": None,
"app_version": "v4.0.16",
}
r = self.session.post(
self.config["endpoints"]["login"],
json=payload,
headers={"Authorization": "anonymous", "Origin": "https://www.kocowa.com"}
)
r.raise_for_status()
res = r.json()
if res.get("code") != "0000":
raise PermissionError(f"Login failed: {res.get('message')}")
self.access_token = res["object"]["access_token"]
r = self.session.post(
self.config["endpoints"]["middleware_auth"],
json={"token": f"wA-Auth.{self.access_token}"},
headers={"Origin": "https://www.kocowa.com"}
)
r.raise_for_status()
self.middleware_token = r.json()["token"]
self._fetch_brightcove_config()
def _fetch_brightcove_config(self):
"""Fetch Brightcove account_id and policy_key from Kocowa's public config endpoint."""
try:
r = self.session.get(
"https://middleware.bcmw.kocowa.com/api/config",
headers={
"Origin": "https://www.kocowa.com",
"Referer": "https://www.kocowa.com/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0"
}
)
r.raise_for_status()
config = r.json()
self.brightcove_account_id = config.get("VC_ACCOUNT_ID")
self.brightcove_pk = config.get("BCOV_POLICY_KEY")
if not self.brightcove_account_id:
raise ValueError("VC_ACCOUNT_ID missing in /api/config response")
if not self.brightcove_pk:
raise ValueError("BCOV_POLICY_KEY missing in /api/config response")
self.log.info(f"Brightcove config loaded: account_id={self.brightcove_account_id}")
except Exception as e:
raise RuntimeError(f"Failed to fetch or parse Brightcove config: {e}")
def get_titles(self) -> Titles_T:
all_episodes = []
offset = 0
limit = 20
series_title = None # Store the title from the first request
while True:
url = self.config["endpoints"]["metadata"].format(title_id=self.title_id)
sep = "&" if "?" in url else "?"
url += f"{sep}offset={offset}&limit={limit}"
r = self.session.get(
url,
headers={"Authorization": self.access_token, "Origin": "https://www.kocowa.com"}
)
r.raise_for_status()
data = r.json()["object"]
# Extract the series title only from the very first page
if series_title is None and "meta" in data:
series_title = data["meta"]["title"]["en"]
page_objects = data.get("next_episodes", {}).get("objects", [])
if not page_objects:
break
for ep in page_objects:
is_episode = ep.get("detail_type") == "episode"
is_extra = ep.get("detail_type") in ("teaser", "extra")
if is_episode or (self.include_extras and is_extra):
all_episodes.append(ep)
offset += limit
total = data.get("next_episodes", {}).get("total_count", 0)
if len(all_episodes) >= total or len(page_objects) < limit:
break
# If we never got the series title, exit with an error
if series_title is None:
raise ValueError("Could not retrieve series metadata to get the title.")
episodes = []
for ep in all_episodes:
meta = ep["meta"]
ep_type = "Episode" if ep["detail_type"] == "episode" else ep["detail_type"].capitalize()
ep_num = meta.get("episode_number", 0)
title = meta["title"].get("en") or f"{ep_type} {ep_num}"
desc = meta["description"].get("en") or ""
episodes.append(
Episode(
id_=str(ep["id"]),
service=self.__class__,
title=series_title,
season=meta.get("season_number", 1),
number=ep_num,
name=title,
description=desc,
year=None,
language=Language.get("en"),
data=ep,
)
)
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
# Authorize playback
r = self.session.post(
self.config["endpoints"]["authorize"].format(episode_id=title.id),
headers={"Authorization": f"Bearer {self.middleware_token}"}
)
r.raise_for_status()
auth_data = r.json()
if not auth_data.get("Success"):
raise PermissionError("Playback authorization failed")
self.playback_token = auth_data["token"]
# Fetch Brightcove manifest
manifest_url = (
f"https://edge.api.brightcove.com/playback/v1/accounts/{self.brightcove_account_id}/videos/ref:{title.id}"
)
r = self.session.get(
manifest_url,
headers={"Accept": f"application/json;pk={self.brightcove_pk}"}
)
r.raise_for_status()
manifest = r.json()
# Get DASH URL + Widevine license
dash_url = widevine_url = None
for src in manifest.get("sources", []):
if src.get("type") == "application/dash+xml":
dash_url = src["src"]
widevine_url = (
src.get("key_systems", {})
.get("com.widevine.alpha", {})
.get("license_url")
)
if dash_url and widevine_url:
break
if not dash_url or not widevine_url:
raise ValueError("No Widevine DASH stream found")
self.widevine_license_url = widevine_url
tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language)
for sub in manifest.get("text_tracks", []):
srclang = sub.get("srclang")
if not srclang or srclang == "thumbnails":
continue
subtitle_track = Subtitle(
id_=sub["id"],
url=sub["src"],
codec=Subtitle.Codec.WebVTT,
language=Language.get(srclang),
sdh=True, # Kocowa subs are SDH - mark them as such
forced=False,
)
tracks.add(subtitle_track)
return tracks
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
r = self.session.post(
self.widevine_license_url,
data=challenge,
headers={
"BCOV-Auth": self.playback_token,
"Content-Type": "application/octet-stream",
"Origin": "https://www.kocowa.com",
"Referer": "https://www.kocowa.com/",
}
)
r.raise_for_status()
return r.content
def search(self) -> List[SearchResult]:
url = "https://prod-fms.kocowa.com/api/v01/fe/gks/autocomplete"
params = {
"search_category": "All",
"search_input": self.title_id,
"include_webtoon": "true",
}
r = self.session.get(
url,
params=params,
headers={
"Authorization": self.access_token,
"Origin": "https://www.kocowa.com ",
"Referer": "https://www.kocowa.com/ ",
}
)
r.raise_for_status()
response = r.json()
contents = response.get("object", {}).get("contents", [])
results = []
for item in contents:
if item.get("detail_type") != "season":
continue
meta = item["meta"]
title_en = meta["title"].get("en") or "[No Title]"
description_en = meta["description"].get("en") or ""
show_id = str(item["id"])
results.append(
SearchResult(
id_=show_id,
title=title_en,
description=description_en,
label="season",
url=f"https://www.kocowa.com/en_us/season/{show_id}/"
)
)
return results
def get_chapters(self, title: Title_T) -> list:
return []

5
KOWP/config.yaml Normal file
View File

@ -0,0 +1,5 @@
endpoints:
login: "https://prod-sgwv3.kocowa.com/api/v01/user/signin"
middleware_auth: "https://middleware.bcmw.kocowa.com/authenticate-user"
metadata: "https://prod-fms.kocowa.com/api/v01/fe/content/get?id={title_id}"
authorize: "https://middleware.bcmw.kocowa.com/api/playback/authorize/{episode_id}"

396
MUBI/__init__.py Normal file
View File

@ -0,0 +1,396 @@
import json
import re
import uuid
from http.cookiejar import CookieJar
from typing import Optional, Generator
from langcodes import Language
import base64
import click
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Title_T, Titles_T, Series
from unshackle.core.tracks import Chapter, Tracks, Subtitle
class MUBI(Service):
"""
Service code for MUBI (mubi.com)
Version: 1.2.0
Authorization: Required cookies (lt token + session)
Security: FHD @ L3 (Widevine)
Supports:
Series https://mubi.com/en/nl/series/twin-peaks
Movies https://mubi.com/en/nl/films/the-substance
"""
SERIES_TITLE_RE = r"^https?://(?:www\.)?mubi\.com(?:/[^/]+)*?/series/(?P<series_slug>[^/]+)(?:/season/(?P<season_slug>[^/]+))?$"
TITLE_RE = r"^(?:https?://(?:www\.)?mubi\.com)(?:/[^/]+)*?/films/(?P<slug>[^/?#]+)$"
NO_SUBTITLES = False
@staticmethod
@click.command(name="MUBI", short_help="https://mubi.com")
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return MUBI(ctx, **kwargs)
def __init__(self, ctx, title: str):
super().__init__(ctx)
m_film = re.match(self.TITLE_RE, title)
m_series = re.match(self.SERIES_TITLE_RE, title)
if not m_film and not m_series:
raise ValueError(f"Invalid MUBI URL: {title}")
self.is_series = bool(m_series)
self.slug = m_film.group("slug") if m_film else None
self.series_slug = m_series.group("series_slug") if m_series else None
self.season_slug = m_series.group("season_slug") if m_series else None
self.film_id: Optional[int] = None
self.lt_token: Optional[str] = None
self.session_token: Optional[str] = None
self.user_id: Optional[int] = None
self.country_code: Optional[str] = None
self.anonymous_user_id: Optional[str] = None
self.default_country: Optional[str] = None
self.reels_data: Optional[list] = None
# Store CDM reference
self.cdm = ctx.obj.cdm
if self.config is None:
raise EnvironmentError("Missing service config for MUBI.")
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
try:
r_ip = self.session.get(self.config["endpoints"]["ip_geolocation"], timeout=5)
r_ip.raise_for_status()
ip_data = r_ip.json()
if ip_data.get("country"):
self.default_country = ip_data["country"]
self.log.debug(f"Detected country from IP: {self.default_country}")
else:
self.log.warning("IP geolocation response did not contain a country code.")
except Exception as e:
raise ValueError(f"Failed to fetch IP geolocation: {e}")
if not cookies:
raise PermissionError("MUBI requires login cookies.")
# Extract essential tokens
lt_cookie = next((c for c in cookies if c.name == "lt"), None)
session_cookie = next((c for c in cookies if c.name == "_mubi_session"), None)
snow_id_cookie = next((c for c in cookies if c.name == "_snow_id.c006"), None)
if not lt_cookie:
raise PermissionError("Missing 'lt' cookie (Bearer token).")
if not session_cookie:
raise PermissionError("Missing '_mubi_session' cookie.")
self.lt_token = lt_cookie.value
self.session_token = session_cookie.value
# Extract anonymous_user_id from _snow_id.c006
if snow_id_cookie and "." in snow_id_cookie.value:
self.anonymous_user_id = snow_id_cookie.value.split(".")[0]
else:
self.anonymous_user_id = str(uuid.uuid4())
self.log.warning(f"No _snow_id.c006 cookie found — generated new anonymous_user_id: {self.anonymous_user_id}")
base_headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Firefox/143.0",
"Origin": "https://mubi.com",
"Referer": "https://mubi.com/",
"CLIENT": "web",
"Client-Accept-Video-Codecs": "h265,vp9,h264",
"Client-Accept-Audio-Codecs": "aac",
"Authorization": f"Bearer {self.lt_token}",
"ANONYMOUS_USER_ID": self.anonymous_user_id,
"Client-Country": self.default_country,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
}
self.session.headers.update(base_headers)
r_account = self.session.get(self.config["endpoints"]["account"])
if not r_account.ok:
raise PermissionError(f"Failed to fetch MUBI account: {r_account.status_code} {r_account.text}")
account_data = r_account.json()
self.user_id = account_data.get("id")
self.country_code = (account_data.get("country") or {}).get("code", "NL")
self.session.headers["Client-Country"] = self.country_code
self.GEOFENCE = (self.country_code,)
self._bind_anonymous_user()
self.log.info(
f"Authenticated as user {self.user_id}, "
f"country: {self.country_code}, "
f"anonymous_id: {self.anonymous_user_id}"
)
def _bind_anonymous_user(self):
try:
r = self.session.put(
self.config["endpoints"]["current_user"],
json={"anonymous_user_uuid": self.anonymous_user_id},
headers={"Content-Type": "application/json"}
)
if r.ok:
self.log.debug("Anonymous user ID successfully bound to account.")
else:
self.log.warning(f"Failed to bind anonymous_user_uuid: {r.status_code}")
except Exception as e:
self.log.warning(f"Exception while binding anonymous_user_uuid: {e}")
def get_titles(self) -> Titles_T:
if self.is_series:
return self._get_series_titles()
else:
return self._get_film_title()
def _get_film_title(self) -> Movies:
url = self.config["endpoints"]["film_by_slug"].format(slug=self.slug)
r = self.session.get(url)
r.raise_for_status()
data = r.json()
self.film_id = data["id"]
# Fetch reels to get definitive language code and cache the response
url_reels = self.config["endpoints"]["reels"].format(film_id=self.film_id)
r_reels = self.session.get(url_reels)
r_reels.raise_for_status()
self.reels_data = r_reels.json()
# Extract original language from the first audio track of the first reel
original_language_code = "en" # Default fallback
if self.reels_data and self.reels_data[0].get("audio_tracks"):
first_audio_track = self.reels_data[0]["audio_tracks"][0]
if "language_code" in first_audio_track:
original_language_code = first_audio_track["language_code"]
self.log.debug(f"Detected original language from reels: '{original_language_code}'")
genres = ", ".join(data.get("genres", [])) or "Unknown"
description = (
data.get("default_editorial_html", "")
.replace("<p>", "").replace("</p>", "").replace("<em>", "").replace("</em>", "").strip()
)
year = data.get("year")
name = data.get("title", "Unknown")
movie = Movie(
id_=self.film_id,
service=self.__class__,
name=name,
year=year,
description=description,
language=Language.get(original_language_code),
data=data,
)
return Movies([movie])
def _get_series_titles(self) -> Titles_T:
# Fetch series metadata
series_url = self.config["endpoints"]["series"].format(series_slug=self.series_slug)
r_series = self.session.get(series_url)
r_series.raise_for_status()
series_data = r_series.json()
episodes = []
# If season is explicitly specified, only fetch that season
if self.season_slug:
eps_url = self.config["endpoints"]["season_episodes"].format(
series_slug=self.series_slug,
season_slug=self.season_slug
)
r_eps = self.session.get(eps_url)
if r_eps.status_code == 404:
raise ValueError(f"Season '{self.season_slug}' not found.")
r_eps.raise_for_status()
episodes_data = r_eps.json().get("episodes", [])
self._add_episodes_to_list(episodes, episodes_data, series_data)
else:
# No season specified fetch ALL seasons
seasons = series_data.get("seasons", [])
if not seasons:
raise ValueError("No seasons found for this series.")
for season in seasons:
season_slug = season["slug"]
eps_url = self.config["endpoints"]["season_episodes"].format(
series_slug=self.series_slug,
season_slug=season_slug
)
self.log.debug(f"Fetching episodes for season: {season_slug}")
r_eps = self.session.get(eps_url)
# Stop if season returns 404 or empty
if r_eps.status_code == 404:
self.log.info(f"Season '{season_slug}' not available, skipping.")
continue
r_eps.raise_for_status()
episodes_data = r_eps.json().get("episodes", [])
if not episodes_data:
self.log.info(f"No episodes found in season '{season_slug}'.")
continue
self._add_episodes_to_list(episodes, episodes_data, series_data)
from unshackle.core.titles import Series
return Series(sorted(episodes, key=lambda x: (x.season, x.number)))
def _add_episodes_to_list(self, episodes_list: list, episodes_data: list, series_data: dict):
"""Helper to avoid code duplication when adding episodes."""
for ep in episodes_data:
# Use episode's own language detection via its consumable.playback_languages
playback_langs = ep.get("consumable", {}).get("playback_languages", {})
audio_langs = playback_langs.get("audio_options", ["English"])
lang_code = audio_langs[0].split()[0].lower() if audio_langs else "en"
try:
detected_lang = Language.get(lang_code)
except:
detected_lang = Language.get("en")
episodes_list.append(Episode(
id_=ep["id"],
service=self.__class__,
title=series_data["title"], # Series title
season=ep["episode"]["season_number"],
number=ep["episode"]["number"],
name=ep["title"], # Episode title
description=ep.get("short_synopsis", ""),
language=detected_lang,
data=ep, # Full episode data for later use in get_tracks
))
def get_tracks(self, title: Title_T) -> Tracks:
film_id = getattr(title, "id", None)
if not film_id:
raise RuntimeError("Title ID not found.")
# For series episodes, we don't have reels cached, so skip reel-based logic
url_view = self.config["endpoints"]["initiate_viewing"].format(film_id=film_id)
r_view = self.session.post(url_view, json={}, headers={"Content-Type": "application/json"})
r_view.raise_for_status()
view_data = r_view.json()
reel_id = view_data["reel_id"]
# For films, use reels data for language/audio mapping
if not self.is_series:
if not self.film_id:
raise RuntimeError("film_id not set. Call get_titles() first.")
if not self.reels_data:
self.log.warning("Reels data not cached, fetching now.")
url_reels = self.config["endpoints"]["reels"].format(film_id=film_id)
r_reels = self.session.get(url_reels)
r_reels.raise_for_status()
reels = r_reels.json()
else:
reels = self.reels_data
reel = next((r for r in reels if r["id"] == reel_id), reels[0])
else:
# For episodes, we dont need reel-based logic — just proceed
pass
# Request secure streaming URL, works for both films and episodes
url_secure = self.config["endpoints"]["secure_url"].format(film_id=film_id)
r_secure = self.session.get(url_secure)
r_secure.raise_for_status()
secure_data = r_secure.json()
manifest_url = None
for entry in secure_data.get("urls", []):
if entry.get("content_type") == "application/dash+xml":
manifest_url = entry["src"]
break
if not manifest_url:
raise ValueError("No DASH manifest URL found.")
# Parse DASH, use title.language as fallback
tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)
# Add subtitles
subtitles = []
for sub in secure_data.get("text_track_urls", []):
lang_code = sub.get("language_code", "und")
vtt_url = sub.get("url")
if not vtt_url:
continue
is_original = lang_code == title.language.language
subtitles.append(
Subtitle(
id_=sub["id"],
url=vtt_url,
language=Language.get(lang_code),
is_original_lang=is_original,
codec=Subtitle.Codec.WebVTT,
name=sub.get("display_name", lang_code.upper()),
forced=False,
sdh=False,
)
)
tracks.subtitles = subtitles
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.user_id:
raise RuntimeError("user_id not set — authenticate first.")
dt_custom_data = {
"userId": self.user_id,
"sessionId": self.lt_token,
"merchant": "mubi"
}
dt_custom_data_b64 = base64.b64encode(json.dumps(dt_custom_data).encode()).decode()
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
"Accept": "*/*",
"Origin": "https://mubi.com",
"Referer": "https://mubi.com/",
"dt-custom-data": dt_custom_data_b64,
}
r = self.session.post(
self.config["endpoints"]["license"],
data=challenge,
headers=headers,
)
r.raise_for_status()
license_data = r.json()
if license_data.get("status") != "OK":
raise PermissionError(f"DRM license error: {license_data}")
return base64.b64decode(license_data["license"])

12
MUBI/config.yaml Normal file
View File

@ -0,0 +1,12 @@
endpoints:
account: "https://api.mubi.com/v4/account"
current_user: "https://api.mubi.com/v4/current_user"
film_by_slug: "https://api.mubi.com/v4/films/{slug}"
playback_languages: "https://api.mubi.com/v4/films/{film_id}/playback_languages"
initiate_viewing: "https://api.mubi.com/v4/films/{film_id}/viewing?parental_lock_enabled=true"
reels: "https://api.mubi.com/v4/films/{film_id}/reels"
secure_url: "https://api.mubi.com/v4/films/{film_id}/viewing/secure_url"
license: "https://lic.drmtoday.com/license-proxy-widevine/cenc/"
ip_geolocation: "https://directory.cookieyes.com/api/v1/ip"
series: "https://api.mubi.com/v4/series/{series_slug}"
season_episodes: "https://api.mubi.com/v4/series/{series_slug}/seasons/{season_slug}/episodes/available"

View File

@ -17,21 +17,21 @@ from unshackle.core.tracks import Chapter, Tracks, Subtitle
class NPO(Service):
"""
Service code for NPO Start (npo.nl)
Version: 1.0.0
Version: 1.1.0
Authorization: optional cookies (free/paid content supported)
Security: FHD @ L3 (Widevine)
Security: FHD @ L3
FHD @ SL3000
(Widevine and PlayReady support)
Supports:
Series https://npo.nl/start/serie/{slug}
Movies https://npo.nl/start/video/{slug}
Only supports widevine at the moment
Note: Movie that is inside in a series (e.g.
https://npo.nl/start/serie/zappbios/.../zappbios-captain-nova/afspelen)
can be downloaded as movies by converting the URL to:
https://npo.nl/start/video/zappbios-captain-nova
Note: Movie inside a series can be downloaded as movie by converting URL to:
https://npo.nl/start/video/slug
To change between Widevine and Playready, you need to change the DrmType in config.yaml to either widevine or playready
"""
TITLE_RE = (
@ -68,6 +68,9 @@ class NPO(Service):
if self.config is None:
raise EnvironmentError("Missing service config.")
# Store CDM reference
self.cdm = ctx.obj.cdm
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not cookies:
@ -165,7 +168,6 @@ class NPO(Service):
if not product_id:
raise ValueError("no productId detected.")
# Get JWT
token_url = self.config["endpoints"]["player_token"].format(product_id=product_id)
r_tok = self.session.get(token_url, headers={"Referer": f"https://npo.nl/start/video/{self.slug}"})
r_tok.raise_for_status()
@ -176,7 +178,7 @@ class NPO(Service):
self.config["endpoints"]["streams"],
json={
"profileName": "dash",
"drmType": "widevine",
"drmType": self.config["DrmType"],
"referrerUrl": f"https://npo.nl/start/video/{self.slug}",
"ster": {"identifier": "npo-app-desktop", "deviceType": 4, "player": "web"},
},
@ -205,12 +207,17 @@ class NPO(Service):
# Subtitles
subtitles = []
for sub in data.get("assets", {}).get("subtitles", []):
for sub in (data.get("assets", {}) or {}).get("subtitles", []) or []:
if not isinstance(sub, dict):
continue
lang = sub.get("iso", "und")
location = sub.get("location")
if not location:
continue # skip if no URL provided
subtitles.append(
Subtitle(
id_=sub.get("name", lang),
url=sub["location"].strip(),
url=location.strip(),
language=Language.get(lang),
is_original_lang=lang == "nl",
codec=Subtitle.Codec.WebVTT,
@ -233,9 +240,14 @@ class NPO(Service):
for tr in tracks.videos + tracks.audio:
if getattr(tr, "drm", None):
tr.drm.license = lambda challenge, **kw: self.get_widevine_license(
challenge=challenge, title=title, track=tr
)
if drm_type == "playready":
tr.drm.license = lambda challenge, **kw: self.get_playready_license(
challenge=challenge, title=title, track=tr
)
else:
tr.drm.license = lambda challenge, **kw: self.get_widevine_license(
challenge=challenge, title=title, track=tr
)
return tracks
@ -244,11 +256,34 @@ class NPO(Service):
def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.drm_token:
raise ValueError("DRM token not set login or paid content may be required.")
raise ValueError("DRM token not set, login or paid content may be required.")
r = self.session.post(
self.config["endpoints"]["widevine_license"],
self.config["endpoints"]["license"],
params={"custom_data": self.drm_token},
data=challenge,
)
r.raise_for_status()
return r.content
return r.content
def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.drm_token:
raise ValueError("DRM token not set, login or paid content may be required.")
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense",
"Origin": "https://npo.nl",
"Referer": "https://npo.nl/",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
),
}
r = self.session.post(
self.config["endpoints"]["license"],
params={"custom_data": self.drm_token},
data=challenge,
headers=headers,
)
r.raise_for_status()
return r.content

View File

@ -1,8 +1,10 @@
endpoints:
metadata: "https://npo.nl/start/_next/data/{build_id}/video/{slug}.json"
metadata_series: "https://npo.nl/start/_next/data/{build_id}/serie/{slug}.json"
metadata_series: "https://npo.nl/start/_next/data/{build_id}/serie/{slug}/afleveringen.json"
metadata_episode: "https://npo.nl/start/_next/data/{build_id}/serie/{series_slug}/seizoen-{season_slug}/{episode_slug}.json"
streams: "https://prod.npoplayer.nl/stream-link"
player_token: "https://npo.nl/start/api/domain/player-token?productId={product_id}"
widevine_license: "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication"
license: "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication"
homepage: "https://npo.nl/start"
search: " https://npo.nl/start/api/domain/search-collection-items"
DrmType: "widevine"

View File

@ -1,4 +1,35 @@
These services is new and in development. Please feel free to submit pull requests for any mistakes or suggestions.
Acknowledgment
Thanks to Adef for the NPO start downloader.
# These services is new and in development. Please feel free to submit pull requests or issue a ticket for any mistakes or suggestions.
### If you have personal questions or want to request a service, DM me at discord (jerukpurut)
- Roadmap:
1. NPO:
- To add search functionality
- More accurate metadata (the year of showing is not according the year of release)
- Have a automatic CDM recognition option instead of the user puts it manually in the config for drmType
2. KOWP:
- Audio mislabel as English
- To add Playready Support
3. PTHS
- To add Playready Support (is needed since L3 is just 480p)
- Search Functionality
- Account login if possible
4. HIDI
- Subtitle is a bit misplace if second sentences came up making the last sentence on the first order and vice versa (needs to be fixed)
5. MUBI
- Search Functionality
6. VIKI
- CSRF Token is now scraped, would be from a api requests soon
7. VIDO
- Support of paid content since right now it supports free ones only
- Search functionality not available yet
8. KNPY
- Need to fix the search function
- Acknowledgment
Thanks to Adef for the NPO start downloader.

215
VIDO/__init__.py Normal file
View File

@ -0,0 +1,215 @@
import re
import uuid
from typing import Optional
from http.cookiejar import CookieJar
from langcodes import Language
import click
from unshackle.core.search_result import SearchResult
from unshackle.core.credential import Credential
from unshackle.core.manifests import HLS
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks
class VIDO(Service):
"""
Vidio.com service, Series and Movies, login required.
Version: 1.3.0
Supports URLs like:
https://www.vidio.com/premier/2978/giligilis (Series)
https://www.vidio.com/watch/7454613-marantau-short-movie (Movie)
Note: Login is mandatory. Even free content requires valid session tokens
for stream access (as per API behavior).
"""
# Updated regex to support both series and movies
TITLE_RE = r"^https?://(?:www\.)?vidio\.com/(?:premier|series|watch)/(?P<id>\d+)"
NO_SUBTITLES = True
@staticmethod
@click.command(name="VIDO", short_help="https://vidio.com (login required)")
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return VIDO(ctx, **kwargs)
def __init__(self, ctx, title: str):
super().__init__(ctx)
match = re.match(self.TITLE_RE, title)
if not match:
raise ValueError(f"Unsupported or invalid Vidio URL: {title}")
self.content_id = match.group("id")
# Determine if it's a movie or series based on URL pattern
self.is_movie = "watch" in title
# Static app identifiers from Android traffic
self.API_AUTH = "laZOmogezono5ogekaso5oz4Mezimew1"
self.USER_AGENT = "vidioandroid/7.14.6-e4d1de87f2 (3191683)"
self.API_APP_INFO = "android/15/7.14.6-e4d1de87f2-3191683"
self.VISITOR_ID = str(uuid.uuid4())
# Auth state
self._email = None
self._user_token = None
self._access_token = None
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
if not credential or not credential.username or not credential.password:
raise ValueError("Vidio requires email and password login.")
self._email = credential.username
password = credential.password
headers = {
"referer": "android-app://com.vidio.android",
"x-api-platform": "app-android",
"x-api-auth": self.API_AUTH,
"user-agent": self.USER_AGENT,
"x-api-app-info": self.API_APP_INFO,
"accept-language": "en",
"content-type": "application/x-www-form-urlencoded",
"x-visitor-id": self.VISITOR_ID,
}
data = f"login={self._email}&password={password}"
r = self.session.post("https://api.vidio.com/api/login", headers=headers, data=data)
r.raise_for_status()
auth_data = r.json()
self._user_token = auth_data["auth"]["authentication_token"]
self._access_token = auth_data["auth_tokens"]["access_token"]
self.log.info(f"Authenticated as {self._email}")
def _headers(self):
if not self._user_token or not self._access_token:
raise RuntimeError("Not authenticated. Call authenticate() first.")
return {
"referer": "android-app://com.vidio.android",
"x-api-platform": "app-android",
"x-api-auth": self.API_AUTH,
"user-agent": self.USER_AGENT,
"x-api-app-info": self.API_APP_INFO,
"x-visitor-id": self.VISITOR_ID,
"x-user-email": self._email,
"x-user-token": self._user_token,
"x-authorization": self._access_token,
"accept-language": "en",
"accept": "application/json",
"accept-charset": "UTF-8",
"content-type": "application/vnd.api+json",
}
def get_titles(self) -> Titles_T:
headers = self._headers()
if self.is_movie:
# For movies, we need to get video details directly
r = self.session.get(f"https://api.vidio.com/api/videos/{self.content_id}/detail", headers=headers)
r.raise_for_status()
video_data = r.json()["video"]
# Extract year from publish_date if available
year = None
if video_data.get("publish_date"):
try:
year = int(video_data["publish_date"][:4])
except (ValueError, TypeError):
pass
return Movies([
Movie(
id_=video_data["id"],
service=self.__class__,
name=video_data["title"],
description=video_data.get("description", ""),
year=year,
language=Language.get("id"),
data=video_data,
)
])
else:
# For series, use the existing logic
r = self.session.get(f"https://api.vidio.com/content_profiles/{self.content_id}", headers=headers)
r.raise_for_status()
root = r.json()["data"]
series_title = root["attributes"]["title"]
playlists = root["relationships"]["playlists"]["data"]
if not playlists:
raise ValueError("No season/playlist found for this series.")
playlist_id = playlists[0]["id"]
# Fetch all episodes
episodes = []
page = 1
while True:
r_eps = self.session.get(
f"https://api.vidio.com/content_profiles/{self.content_id}/playlists/{playlist_id}/videos",
params={"page[number]": page, "page[size]": 20, "sort": "order", "included": "upcoming_videos"},
headers=headers,
)
r_eps.raise_for_status()
page_data = r_eps.json()
for raw_ep in page_data["data"]:
attrs = raw_ep["attributes"]
episodes.append(
Episode(
id_=int(raw_ep["id"]),
service=self.__class__,
title=series_title,
season=1,
number=len(episodes) + 1,
name=attrs["title"],
description=attrs.get("description", ""),
language=Language.get("id"),
data=raw_ep,
)
)
if not page_data["links"].get("next"):
break
page += 1
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
headers = self._headers()
headers.update({
"x-device-brand": "samsung",
"x-device-model": "SM-A525F",
"x-device-form-factor": "phone",
"x-device-soc": "Qualcomm SM7125",
"x-device-os": "Android 15 (API 35)",
"x-device-android-mpc": "0",
"x-device-cpu-arch": "arm64-v8a",
})
# Use the correct ID attribute based on title type
video_id = str(title.id_) if hasattr(title, 'id_') else str(title.id)
r = self.session.get(
f"https://api.vidio.com/api/stream/v1/video_data/{video_id}?initialize=true",
headers=headers,
)
r.raise_for_status()
stream = r.json()
hls_url = stream.get("stream_hls_url")
if not hls_url:
raise ValueError("Stream URL not available. Possibly geo-blocked or subscription required.")
return HLS.from_url(hls_url, session=self.session).to_tracks(language=title.language)
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def search(self):
raise NotImplementedError("Search not implemented for Vidio.")

5
VIDO/config.yaml Normal file
View File

@ -0,0 +1,5 @@
endpoints:
content_profile: "https://api.vidio.com/content_profiles/{content_id}"
playlists: "https://api.vidio.com/content_profiles/{content_id}/playlists"
playlist_videos: "https://api.vidio.com/content_profiles/{content_id}/playlists/{playlist_id}/videos"
stream: "https://api.vidio.com/api/stream/v1/video_data/{video_id}?initialize=true"

328
VIKI/__init__.py Normal file
View File

@ -0,0 +1,328 @@
import base64
import json
import os
import re
from http.cookiejar import CookieJar
from typing import Optional, Generator
import click
from unshackle.core.search_result import SearchResult
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Movie, Movies, Series, Episode, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks, Subtitle
from unshackle.core.drm import Widevine
from langcodes import Language
class VIKI(Service):
"""
Service code for Rakuten Viki (viki.com)
Version: 1.4.0
Authorization: Required cookies (_viki_session, device_id).
Security: FHD @ L3 (Widevine)
Supports:
Movies and TV Series
"""
TITLE_RE = r"^(?:https?://(?:www\.)?viki\.com)?/(?:movies|tv)/(?P<id>\d+c)-.+$"
GEOFENCE = ()
NO_SUBTITLES = False
@staticmethod
@click.command(name="VIKI", short_help="https://viki.com")
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return VIKI(ctx, **kwargs)
def __init__(self, ctx, title: str):
super().__init__(ctx)
m = re.match(self.TITLE_RE, title)
if not m:
self.search_term = title
self.title_url = None
return
self.container_id = m.group("id")
self.title_url = title
self.video_id: Optional[str] = None
self.api_access_key: Optional[str] = None
self.drm_license_url: Optional[str] = None
self.cdm = ctx.obj.cdm
if self.config is None:
raise EnvironmentError("Missing service config for VIKI.")
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not cookies:
raise PermissionError("VIKI requires a cookie file for authentication.")
session_cookie = next((c for c in cookies if c.name == "_viki_session"), None)
device_cookie = next((c for c in cookies if c.name == "device_id"), None)
if not session_cookie or not device_cookie:
raise PermissionError("Your cookie file is missing '_viki_session' or 'device_id'.")
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
"X-Viki-App-Ver": "14.64.0",
"X-Viki-Device-ID": device_cookie.value,
"Origin": "https://www.viki.com",
"Referer": "https://www.viki.com/",
})
self.log.info("VIKI authentication cookies loaded successfully.")
def get_titles(self) -> Titles_T:
if not self.title_url:
raise ValueError("No URL provided to process.")
self.log.debug(f"Scraping page for API access key: {self.title_url}")
r_page = self.session.get(self.title_url)
r_page.raise_for_status()
match = re.search(r'"token":"([^"]+)"', r_page.text)
if not match:
raise RuntimeError("Failed to extract API access key from page source.")
self.api_access_key = match.group(1)
self.log.debug(f"Extracted API access key: {self.api_access_key[:10]}...")
url = self.config["endpoints"]["container"].format(container_id=self.container_id)
params = {
"app": self.config["params"]["app"],
"token": self.api_access_key,
}
r = self.session.get(url, params=params)
r.raise_for_status()
data = r.json()
content_type = data.get("type")
if content_type == "film":
return self._parse_movie(data)
elif content_type == "series":
return self._parse_series(data)
else:
self.log.error(f"Unknown content type '{content_type}' found.")
return Movies([])
def _parse_movie(self, data: dict) -> Movies:
name = data.get("titles", {}).get("en", "Unknown Title")
year = int(data["created_at"][:4]) if "created_at" in data else None
description = data.get("descriptions", {}).get("en", "")
original_lang_code = data.get("origin", {}).get("language", "en")
self.video_id = data.get("watch_now", {}).get("id")
if not self.video_id:
raise ValueError(f"Could not find a playable video ID for container {self.container_id}.")
return Movies([
Movie(
id_=self.container_id,
service=self.__class__,
name=name,
year=year,
description=description,
language=Language.get(original_lang_code),
data=data,
)
])
def _parse_series(self, data: dict) -> Series:
"""Parse series metadata and fetch episodes."""
series_name = data.get("titles", {}).get("en", "Unknown Title")
year = int(data["created_at"][:4]) if "created_at" in data else None
description = data.get("descriptions", {}).get("en", "")
original_lang_code = data.get("origin", {}).get("language", "en")
self.log.info(f"Parsing series: {series_name}")
# Fetch episode list IDs
episodes_url = self.config["endpoints"]["episodes"].format(container_id=self.container_id)
params = {
"app": self.config["params"]["app"],
"token": self.api_access_key,
"direction": "asc",
"with_upcoming": "true",
"sort": "number",
"blocked": "true",
"only_ids": "true"
}
r = self.session.get(episodes_url, params=params)
r.raise_for_status()
episodes_data = r.json()
episode_ids = episodes_data.get("response", [])
self.log.info(f"Found {len(episode_ids)} episodes")
episodes = []
for idx, ep_id in enumerate(episode_ids, 1):
# Fetch individual episode metadata
ep_url = self.config["endpoints"]["episode_meta"].format(video_id=ep_id)
ep_params = {
"app": self.config["params"]["app"],
"token": self.api_access_key,
}
try:
r_ep = self.session.get(ep_url, params=ep_params)
r_ep.raise_for_status()
ep_data = r_ep.json()
ep_number = ep_data.get("number", idx)
ep_title = ep_data.get("titles", {}).get("en", "")
ep_description = ep_data.get("descriptions", {}).get("en", "")
# If no episode title, use generic name
if not ep_title:
ep_title = f"Episode {ep_number}"
# Store the video_id in the data dict
ep_data["video_id"] = ep_id
self.log.debug(f"Episode {ep_number}: {ep_title} ({ep_id})")
episodes.append(
Episode(
id_=ep_id,
service=self.__class__,
title=series_name, # Series title
season=1, # VIKI typically doesn't separate seasons clearly
number=ep_number,
name=ep_title, # Episode title
description=ep_description,
language=Language.get(original_lang_code),
data=ep_data
)
)
except Exception as e:
self.log.warning(f"Failed to fetch episode {ep_id}: {e}")
# Create a basic episode entry even if metadata fetch fails
episodes.append(
Episode(
id_=ep_id,
service=self.__class__,
title=series_name,
season=1,
number=idx,
name=f"Episode {idx}",
description="",
language=Language.get(original_lang_code),
data={"video_id": ep_id} # Store video_id in data
)
)
# Return Series with just the episodes list
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
# For episodes, get the video_id from the data dict
if isinstance(title, Episode):
self.video_id = title.data.get("video_id")
if not self.video_id:
# Fallback to episode id if video_id not in data
self.video_id = title.data.get("id")
elif not self.video_id:
raise RuntimeError("video_id not set. Call get_titles() first.")
if not self.video_id:
raise ValueError("Could not determine video_id for this title")
self.log.info(f"Getting tracks for video ID: {self.video_id}")
url = self.config["endpoints"]["playback"].format(video_id=self.video_id)
r = self.session.get(url)
r.raise_for_status()
data = r.json()
# Get the DRM-protected manifest from queue
manifest_url = None
for item in data.get("queue", []):
if item.get("type") == "video" and item.get("format") == "mpd":
manifest_url = item.get("url")
break
if not manifest_url:
raise ValueError("No DRM-protected manifest URL found in queue")
self.log.debug(f"Found DRM-protected manifest URL: {manifest_url}")
# Create headers for manifest download
manifest_headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
"Accept": "*/*",
"Accept-Language": "en",
"Accept-Encoding": "gzip, deflate, br, zstd",
"X-Viki-App-Ver": "14.64.0",
"X-Viki-Device-ID": self.session.headers.get("X-Viki-Device-ID", ""),
"Origin": "https://www.viki.com",
"Referer": "https://www.viki.com/",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
}
# Parse tracks from the DRM-protected manifest
tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)
# Subtitles
title_language = title.language.language
subtitles = []
for sub in data.get("subtitles", []):
sub_url = sub.get("src")
lang_code = sub.get("srclang")
if not sub_url or not lang_code:
continue
subtitles.append(
Subtitle(
id_=lang_code,
url=sub_url,
language=Language.get(lang_code),
is_original_lang=lang_code == title_language,
codec=Subtitle.Codec.WebVTT,
name=sub.get("label", lang_code.upper()).split(" (")[0]
)
)
tracks.subtitles = subtitles
# Store DRM license URL (only dt3) at service level
drm_b64 = data.get("drm")
if drm_b64:
drm_data = json.loads(base64.b64decode(drm_b64))
self.drm_license_url = drm_data.get("dt3") # Use dt3 as requested
else:
self.log.warning("No DRM info found, assuming unencrypted stream.")
return tracks
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not hasattr(self, 'drm_license_url') or not self.drm_license_url:
raise ValueError("DRM license URL not available.")
r = self.session.post(
self.drm_license_url,
data=challenge,
headers={"Content-type": "application/octet-stream"}
)
r.raise_for_status()
return r.content
def search(self) -> Generator[SearchResult, None, None]:
self.log.warning("Search not yet implemented for VIKI.")
return
yield
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []

8
VIKI/config.yaml Normal file
View File

@ -0,0 +1,8 @@
params:
app: "100000a"
endpoints:
container: "https://api.viki.io/v4/containers/{container_id}.json"
episodes: "https://api.viki.io/v4/series/{container_id}/episodes.json" # New
episode_meta: "https://api.viki.io/v4/videos/{video_id}.json" # New
playback: "https://www.viki.com/api/videos/{video_id}"
search: "https://api.viki.io/v4/search/all.json"