Compare commits

..

No commits in common. "main" and "main" have entirely different histories.
main ... main

5 changed files with 20 additions and 340 deletions

View File

@ -1,273 +0,0 @@
import json
import re
from http.cookiejar import CookieJar
from typing import Optional, List, Dict, Any
import click
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.search_result import SearchResult
from unshackle.core.titles import Episode, Series, Title_T, Titles_T
from unshackle.core.tracks import Subtitle, Tracks
class KOWP(Service):
"""
Service code for Kocowa Plus (kocowa.com).
Version: 1.0.0
Auth: Credential (username + password)
Security: FHD@L3
Note:
Brightcove account_id and policy key (pk) are configurable per region.
Put the custom Brightcove account ID and policy key on the config.yaml if the following doesn't work
"""
TITLE_RE = r"^(?:https?://(?:www\.)?kocowa\.com/[^/]+/season/)?(?P<title_id>\d+)"
GEOFENCE = ("US", "CA", "PA")
NO_SUBTITLES = False
@staticmethod
@click.command(name="kowp", short_help="https://www.kocowa.com")
@click.argument("title", type=str)
@click.option("--extras", is_flag=True, default=False, help="Include teasers/extras")
@click.pass_context
def cli(ctx, **kwargs):
return KOWP(ctx, **kwargs)
def __init__(self, ctx, title: str, extras: bool = False):
super().__init__(ctx)
if not self.config:
raise EnvironmentError("Missing KOWP config")
match = re.match(self.TITLE_RE, title)
if not match:
raise ValueError("Invalid Kocowa title ID or URL")
self.title_id = match.group("title_id")
self.include_extras = extras
# Load Brightcove config
bc_conf = self.config.get("brightcove", {})
self.brightcove_account_id = bc_conf.get("account_id", "6154734805001")
self.brightcove_pk = bc_conf.get("policy_key", "BCpkADawqM1FKrSBim1gusdFR73Prfums__ZmQ7uJ4yCRqv-RKrq2HtZIkVOn4gsmPdAqO007VNtaKJCmg0Uu1rpuUnjnP-f9OPklQ9l2-HS_F_sJXgT96KpUahg9XNukAraAlob6XDuoecD")
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
if not credential:
raise ValueError("KOWP requires username and password")
payload = {
"username": credential.username,
"password": credential.password,
"device_id": f"{credential.username}_browser",
"device_type": "browser",
"device_model": "Firefox",
"device_version": "firefox/143.0",
"push_token": None,
"app_version": "v4.0.16",
}
r = self.session.post(
self.config["endpoints"]["login"],
json=payload,
headers={"Authorization": "anonymous", "Origin": "https://www.kocowa.com"}
)
r.raise_for_status()
res = r.json()
if res.get("code") != "0000":
raise PermissionError(f"Login failed: {res.get('message')}")
self.access_token = res["object"]["access_token"]
r = self.session.post(
self.config["endpoints"]["middleware_auth"],
json={"token": f"wA-Auth.{self.access_token}"},
headers={"Origin": "https://www.kocowa.com"}
)
r.raise_for_status()
self.middleware_token = r.json()["token"]
def get_titles(self) -> Titles_T:
all_episodes = []
offset = 0
limit = 20
while True:
url = self.config["endpoints"]["metadata"].format(title_id=self.title_id)
sep = "&" if "?" in url else "?"
url += f"{sep}offset={offset}&limit={limit}"
r = self.session.get(
url,
headers={"Authorization": self.access_token, "Origin": "https://www.kocowa.com"}
)
r.raise_for_status()
data = r.json()["object"]
page_objects = data.get("next_episodes", {}).get("objects", [])
if not page_objects:
break
for ep in page_objects:
is_episode = ep.get("detail_type") == "episode"
is_extra = ep.get("detail_type") in ("teaser", "extra")
if is_episode or (self.include_extras and is_extra):
all_episodes.append(ep)
offset += limit
total = data.get("next_episodes", {}).get("total_count", 0)
if len(all_episodes) >= total or len(page_objects) < limit:
break
episodes = []
series_title = data["meta"]["title"].get("en") or "Unknown"
for ep in all_episodes:
meta = ep["meta"]
ep_type = "Episode" if ep["detail_type"] == "episode" else ep["detail_type"].capitalize()
ep_num = meta.get("episode_number", 0)
title = meta["title"].get("en") or f"{ep_type} {ep_num}"
desc = meta["description"].get("en") or ""
episodes.append(
Episode(
id_=str(ep["id"]),
service=self.__class__,
title=series_title,
season=meta.get("season_number", 1),
number=ep_num,
name=title,
description=desc,
year=None,
language=Language.get("en"),
data=ep,
)
)
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
# Authorize playback
r = self.session.post(
self.config["endpoints"]["authorize"].format(episode_id=title.id),
headers={"Authorization": f"Bearer {self.middleware_token}"}
)
r.raise_for_status()
auth_data = r.json()
if not auth_data.get("Success"):
raise PermissionError("Playback authorization failed")
self.playback_token = auth_data["token"]
# Fetch Brightcove manifest
manifest_url = (
f"https://edge.api.brightcove.com/playback/v1/accounts/{self.brightcove_account_id}/videos/ref:{title.id}"
)
r = self.session.get(
manifest_url,
headers={"Accept": f"application/json;pk={self.brightcove_pk}"}
)
r.raise_for_status()
manifest = r.json()
# Get DASH URL + Widevine license
dash_url = widevine_url = None
for src in manifest.get("sources", []):
if src.get("type") == "application/dash+xml":
dash_url = src["src"]
widevine_url = (
src.get("key_systems", {})
.get("com.widevine.alpha", {})
.get("license_url")
)
if dash_url and widevine_url:
break
if not dash_url or not widevine_url:
raise ValueError("No Widevine DASH stream found")
self.widevine_license_url = widevine_url
tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language)
# Add ALL subtitles from manifest
for sub in manifest.get("text_tracks", []):
srclang = sub.get("srclang")
if not srclang or srclang == "thumbnails":
continue
tracks.add(
Subtitle(
id_=sub["id"],
url=sub["src"],
codec=Subtitle.Codec.WebVTT,
language=Language.get(srclang),
sdh=True,
)
)
return tracks
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
r = self.session.post(
self.widevine_license_url,
data=challenge,
headers={
"BCOV-Auth": self.playback_token,
"Content-Type": "application/octet-stream",
"Origin": "https://www.kocowa.com",
"Referer": "https://www.kocowa.com/",
}
)
r.raise_for_status()
return r.content
# def search(self) -> List[SearchResult]:
# if not hasattr(self, 'title_id') or not isinstance(self.title_id, str):
# query = getattr(self, 'title', '') # fallback if title_id isn't set yet
# else:
# query = self.title_id
#
# url = "https://prod-fms.kocowa.com/api/v01/fe/gks/autocomplete"
# params = {
# "search_category": "All",
# "search_input": query,
# "include_webtoon": "true",
# }
#
# r = self.session.get(
# url,
# params=params,
# headers={
# "Authorization": self.access_token,
# "Origin": "https://www.kocowa.com ",
# "Referer": "https://www.kocowa.com/ ",
# }
# )
# r.raise_for_status()
# response = r.json()
# contents = response.get("object", {}).get("contents", [])
#
# results = []
# for item in contents:
# if item.get("detail_type") != "season":
# continue # skip non-season items (e.g., actors, webtoons)
#
# meta = item["meta"]
# title_en = meta["title"].get("en") or "[No Title]"
# description_en = meta["description"].get("en") or ""
# show_id = str(item["id"])
#
# results.append(
# SearchResult(
# id_=show_id,
# title=title_en,
# description=description_en,
# label="season",
# url=f"https://www.kocowa.com/en_us/season/{show_id}/placeholder"
# )
# )
# return results
def get_chapters(self, title: Title_T) -> list:
return []

View File

@ -1,9 +0,0 @@
brightcove:
account_id: "6154734805001"
policy_key: "BCpkADawqM1FKrSBim1gusdFR73Prfums__ZmQ7uJ4yCRqv-RKrq2HtZIkVOn4gsmPdAqO007VNtaKJCmg0Uu1rpuUnjnP-f9OPklQ9l2-HS_F_sJXgT96KpUahg9XNukAraAlob6XDuoecD"
endpoints:
login: "https://prod-sgwv3.kocowa.com/api/v01/user/signin"
middleware_auth: "https://middleware.bcmw.kocowa.com/authenticate-user"
metadata: "https://prod-fms.kocowa.com/api/v01/fe/content/get?id={title_id}"
authorize: "https://middleware.bcmw.kocowa.com/api/playback/authorize/{episode_id}"

View File

@ -17,21 +17,21 @@ from unshackle.core.tracks import Chapter, Tracks, Subtitle
class NPO(Service):
"""
Service code for NPO Start (npo.nl)
Version: 1.1.0
Version: 1.0.0
Authorization: optional cookies (free/paid content supported)
Security: FHD @ L3
FHD @ SL3000
(Widevine and PlayReady support)
Security: FHD @ L3 (Widevine)
Supports:
Series https://npo.nl/start/serie/{slug}
Movies https://npo.nl/start/video/{slug}
Only supports widevine at the moment
Note: Movie inside a series can be downloaded as movie by converting URL to:
https://npo.nl/start/video/slug
To change between Widevine and Playready, you need to change the DrmType in config.yaml to either widevine or playready
Note: Movie that is inside in a series (e.g.
https://npo.nl/start/serie/zappbios/.../zappbios-captain-nova/afspelen)
can be downloaded as movies by converting the URL to:
https://npo.nl/start/video/zappbios-captain-nova
"""
TITLE_RE = (
@ -68,9 +68,6 @@ class NPO(Service):
if self.config is None:
raise EnvironmentError("Missing service config.")
# Store CDM reference
self.cdm = ctx.obj.cdm
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not cookies:
@ -168,6 +165,7 @@ class NPO(Service):
if not product_id:
raise ValueError("no productId detected.")
# Get JWT
token_url = self.config["endpoints"]["player_token"].format(product_id=product_id)
r_tok = self.session.get(token_url, headers={"Referer": f"https://npo.nl/start/video/{self.slug}"})
r_tok.raise_for_status()
@ -178,7 +176,7 @@ class NPO(Service):
self.config["endpoints"]["streams"],
json={
"profileName": "dash",
"drmType": self.config["DrmType"],
"drmType": "widevine",
"referrerUrl": f"https://npo.nl/start/video/{self.slug}",
"ster": {"identifier": "npo-app-desktop", "deviceType": 4, "player": "web"},
},
@ -207,17 +205,12 @@ class NPO(Service):
# Subtitles
subtitles = []
for sub in (data.get("assets", {}) or {}).get("subtitles", []) or []:
if not isinstance(sub, dict):
continue
for sub in data.get("assets", {}).get("subtitles", []):
lang = sub.get("iso", "und")
location = sub.get("location")
if not location:
continue # skip if no URL provided
subtitles.append(
Subtitle(
id_=sub.get("name", lang),
url=location.strip(),
url=sub["location"].strip(),
language=Language.get(lang),
is_original_lang=lang == "nl",
codec=Subtitle.Codec.WebVTT,
@ -240,14 +233,9 @@ class NPO(Service):
for tr in tracks.videos + tracks.audio:
if getattr(tr, "drm", None):
if drm_type == "playready":
tr.drm.license = lambda challenge, **kw: self.get_playready_license(
challenge=challenge, title=title, track=tr
)
else:
tr.drm.license = lambda challenge, **kw: self.get_widevine_license(
challenge=challenge, title=title, track=tr
)
tr.drm.license = lambda challenge, **kw: self.get_widevine_license(
challenge=challenge, title=title, track=tr
)
return tracks
@ -256,34 +244,11 @@ class NPO(Service):
def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.drm_token:
raise ValueError("DRM token not set, login or paid content may be required.")
raise ValueError("DRM token not set login or paid content may be required.")
r = self.session.post(
self.config["endpoints"]["license"],
self.config["endpoints"]["widevine_license"],
params={"custom_data": self.drm_token},
data=challenge,
)
r.raise_for_status()
return r.content
def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.drm_token:
raise ValueError("DRM token not set, login or paid content may be required.")
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense",
"Origin": "https://npo.nl",
"Referer": "https://npo.nl/",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
),
}
r = self.session.post(
self.config["endpoints"]["license"],
params={"custom_data": self.drm_token},
data=challenge,
headers=headers,
)
r.raise_for_status()
return r.content
return r.content

View File

@ -4,6 +4,5 @@ endpoints:
metadata_episode: "https://npo.nl/start/_next/data/{build_id}/serie/{series_slug}/seizoen-{season_slug}/{episode_slug}.json"
streams: "https://prod.npoplayer.nl/stream-link"
player_token: "https://npo.nl/start/api/domain/player-token?productId={product_id}"
license: "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication"
widevine_license: "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication"
homepage: "https://npo.nl/start"
DrmType: "widevine"

View File

@ -1,6 +1,4 @@
These services is new and in development. Please feel free to submit pull requests for any mistakes or suggestions.
- Acknowledgment
Acknowledgment
Thanks to Adef for the NPO start downloader.