import json
import re
from datetime import datetime, timedelta, timezone
from http.cookiejar import CookieJar
from typing import Callable, Optional

import click
from langcodes import Language

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks


class NPO(Service):
    """
    Service code for NPO Start (npo.nl)
    Version: 1.1.0

    Authorization: optional cookies (free/paid content supported)
    Security: FHD @ L3
              FHD @ SL3000
              (Widevine and PlayReady support)

    Supports:
      • Series ↦ https://npo.nl/start/serie/{slug}
      • Movies ↦ https://npo.nl/start/video/{slug}

    Note: Movie inside a series can be downloaded as movie by converting URL to:
          https://npo.nl/start/video/slug

    To change between Widevine and Playready, you need to change the DrmType in config.yaml
    to either widevine or playready
    """

    # Named groups are exactly the ones __init__ reads: type, slug, season, episode.
    TITLE_RE = (
        r"^(?:https?://(?:www\.)?npo\.nl/start/)?"
        r"(?:(?P<type>video|serie)/(?P<slug>[^/]+)"
        r"(?:/afleveringen)?"
        r"(?:/seizoen-(?P<season>[^/]+)/(?P<episode>[^/]+)/afspelen)?)?$"
    )
    GEOFENCE = ("NL",)
    NO_SUBTITLES = False

    # Captures the JSON payload of the Next.js bootstrap <script> tag.
    # NOTE(review): the tag's attributes are matched loosely ([^>]*) because the exact
    # markup served by npo.nl is not visible from this file - confirm against a live page.
    _NEXT_DATA_RE = re.compile(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', re.DOTALL)

    # Keys of the stream object that may carry the DRM token, in lookup order.
    # Used both to detect encryption and to extract the token so the two never disagree.
    _DRM_TOKEN_KEYS = ("drmToken", "token", "drm_token")

    @staticmethod
    @click.command(name="NPO", short_help="https://npo.nl")
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the command-line arguments."""
        return NPO(ctx, **kwargs)

    def __init__(self, ctx, title: str):
        """
        Parse the requested title and prepare the service.

        Parameters:
            ctx: Click context; ``ctx.obj.cdm`` is stored as ``self.cdm``.
            title: An npo.nl/start URL or a ``video/<slug>`` / ``serie/<slug>`` path.

        Raises:
            ValueError: If ``title`` does not match TITLE_RE or contains no slug.
            EnvironmentError: If the service config is missing.
        """
        super().__init__(ctx)

        m = re.match(self.TITLE_RE, title)
        # The path part of TITLE_RE is optional, so an empty title "matches" with no slug;
        # reject that here instead of later requesting ".../video/None".
        if not m or not m.group("slug"):
            raise ValueError(
                f"Unsupported NPO URL: {title}\n"
                "Use /video/slug for movies or /serie/slug for series."
            )

        self.slug = m.group("slug")
        self.kind = m.group("type") or "video"
        self.season_slug = m.group("season")
        self.episode_slug = m.group("episode")

        if self.config is None:
            raise EnvironmentError("Missing service config.")

        # Store CDM reference
        self.cdm = ctx.obj.cdm

        # Set by get_tracks() for encrypted streams; initialised here so the license
        # methods can test it without risking an AttributeError.
        self.drm_token: Optional[str] = None

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Optionally authenticate using browser cookies.

        Without cookies, or without the next-auth session cookie, the service continues
        anonymously. Otherwise the user-profiles endpoint is queried as a login check and
        the outcome is logged; a failed check only logs a warning.
        """
        super().authenticate(cookies, credential)
        if not cookies:
            self.log.info("No cookies, proceeding anonymously.")
            return

        token = next((c.value for c in cookies if c.name == "__Secure-next-auth.session-token"), None)
        if not token:
            self.log.info("No session token, proceeding unauthenticated.")
            return

        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Firefox/143.0",
            "Origin": "https://npo.nl",
            "Referer": "https://npo.nl/",
        })

        r = self.session.get("https://npo.nl/start/api/domain/user-profiles", cookies=cookies)

        # Parse the body once; a non-JSON body counts as a failed check, not a crash.
        profiles = None
        if r.ok:
            try:
                profiles = r.json()
            except ValueError:
                profiles = None

        if isinstance(profiles, list) and profiles:
            names = [p.get("name") for p in profiles if isinstance(p, dict)]
            self.log.info(f"NPO login OK, profiles: {names}")
        else:
            self.log.warning("NPO auth check failed.")

    def _get_build_id(self, slug: str) -> str:
        """
        Fetch buildId from the actual video/series page.

        Raises:
            RuntimeError: If the __NEXT_DATA__ payload cannot be located in the page.
        """
        url = f"https://npo.nl/start/{'video' if self.kind == 'video' else 'serie'}/{slug}"
        r = self.session.get(url)
        r.raise_for_status()

        match = self._NEXT_DATA_RE.search(r.text)
        if not match:
            raise RuntimeError("Failed to extract __NEXT_DATA__")

        data = json.loads(match.group(1))
        return data["buildId"]

    @staticmethod
    def _broadcast_year(timestamp) -> Optional[int]:
        """
        Convert a ``firstBroadcastDate`` value to a calendar year (UTC).

        The value is treated as Unix epoch seconds. Real calendar arithmetic is used
        because dividing by a fixed 365-day year drifts by the accumulated leap days and
        reports the following year for broadcasts in late December.

        Returns None when the value is missing/falsy or cannot be converted.
        """
        if not timestamp:
            return None
        try:
            # epoch + timedelta also handles pre-1970 values on every platform.
            moment = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=int(timestamp))
        except (TypeError, ValueError, OverflowError):
            return None
        return moment.year

    def get_titles(self) -> Titles_T:
        """
        Fetch metadata for the requested slug.

        Returns:
            ``Series`` with one ``Episode`` per program of every season when the URL kind
            is ``serie``; otherwise ``Movies`` containing a single ``Movie``.

        Raises:
            ValueError: If series metadata is absent from the response.
        """
        build_id = self._get_build_id(self.slug)

        endpoint = "metadata_series" if self.kind == "serie" else "metadata"
        url = self.config["endpoints"][endpoint].format(build_id=build_id, slug=self.slug)

        resp = self.session.get(url)
        resp.raise_for_status()
        queries = resp.json()["pageProps"]["dehydratedState"]["queries"]

        def get_data(fragment: str):
            """Return the data of the first query whose queryKey contains ``fragment``."""
            return next((q["state"]["data"] for q in queries if fragment in str(q.get("queryKey", ""))), None)

        if self.kind == "serie":
            series_data = get_data("series:detail-")
            if not series_data:
                raise ValueError("Series metadata not found")

            episodes = []
            seasons = get_data("series:seasons-") or []
            for season in seasons:
                eps = get_data(f"programs:season-{season['guid']}") or []
                for e in eps:
                    episodes.append(
                        Episode(
                            id_=e["guid"],
                            service=self.__class__,
                            title=series_data["title"],
                            season=int(season["seasonKey"]),
                            number=int(e["programKey"]),
                            name=e["title"],
                            description=(e.get("synopsis", {}) or {}).get("long", ""),
                            language=Language.get("nl"),
                            data=e,
                        )
                    )
            return Series(episodes)

        # Movie
        item = get_data("program:detail-") or queries[0]["state"]["data"]

        synopsis = item.get("synopsis")
        if isinstance(synopsis, dict):
            desc = synopsis.get("long") or synopsis.get("short", "")
        elif synopsis is None:
            # A null synopsis must not become the literal description "None".
            desc = ""
        else:
            desc = str(synopsis)

        return Movies([
            Movie(
                id_=item["guid"],
                service=self.__class__,
                name=item["title"],
                description=desc,
                year=self._broadcast_year(item.get("firstBroadcastDate")),
                language=Language.get("nl"),
                data=item,
            )
        ])

    @staticmethod
    def _license_callback(handler: Callable[..., bytes], title: Title_T, track: AnyTrack) -> Callable[..., bytes]:
        """
        Build a license callable bound to one specific track.

        A factory is used instead of a lambda written inside the track loop: such a
        lambda captures the loop variable by reference, so every callback would end up
        using the last track of the loop.
        """
        def acquire(challenge, **kw):
            return handler(challenge=challenge, title=title, track=track)

        return acquire

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Request a DASH stream for ``title`` and build its tracks.

        Steps: fetch a player token (JWT) for the title's productId, request the stream
        with the configured DrmType, parse the DASH manifest, attach the subtitles listed
        in the stream response, then either clear DRM (unencrypted stream) or remember
        the DRM token and attach per-track license callbacks.

        Raises:
            ValueError: If the title has no productId, the response has no stream URL, or
                an encrypted stream carries no DRM token.
            PermissionError: If the stream endpoint reports an error.
        """
        product_id = title.data.get("productId")
        if not product_id:
            raise ValueError("no productId detected.")

        page_url = f"https://npo.nl/start/video/{self.slug}"

        token_url = self.config["endpoints"]["player_token"].format(product_id=product_id)
        r_tok = self.session.get(token_url, headers={"Referer": page_url})
        r_tok.raise_for_status()
        jwt = r_tok.json()["jwt"]

        # Request stream
        r_stream = self.session.post(
            self.config["endpoints"]["streams"],
            json={
                "profileName": "dash",
                "drmType": self.config["DrmType"],
                "referrerUrl": page_url,
                "ster": {"identifier": "npo-app-desktop", "deviceType": 4, "player": "web"},
            },
            headers={
                "Authorization": jwt,
                "Content-Type": "application/json",
                "Origin": "https://npo.nl",
                "Referer": page_url,
            },
        )
        r_stream.raise_for_status()
        data = r_stream.json()

        if "error" in data:
            raise PermissionError(f"Stream error: {data['error']}")

        stream = data["stream"]
        manifest_url = stream.get("streamURL") or stream.get("url")
        if not manifest_url:
            raise ValueError("No stream URL in response")

        # Detect encryption with the same keys later used to extract the token, so a
        # stream that only carries "drm_token" is not mistaken for an unencrypted one.
        has_token_key = any(k in stream for k in self._DRM_TOKEN_KEYS)
        is_unencrypted = "unencrypted" in manifest_url.lower() or not has_token_key

        # Parse DASH
        tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)

        # Subtitles
        subtitles = []
        for sub in (data.get("assets", {}) or {}).get("subtitles", []) or []:
            if not isinstance(sub, dict):
                continue
            lang = sub.get("iso") or "und"  # a null/empty iso falls back to "und" too
            location = sub.get("location")
            if not location:
                continue  # skip if no URL provided
            subtitles.append(
                Subtitle(
                    id_=sub.get("name", lang),
                    url=location.strip(),
                    language=Language.get(lang),
                    is_original_lang=lang == "nl",
                    codec=Subtitle.Codec.WebVTT,
                    name=sub.get("name", "Unknown"),
                    forced=False,
                    sdh=False,
                )
            )
        tracks.subtitles = subtitles

        # DRM
        if is_unencrypted:
            for tr in tracks.videos + tracks.audio:
                if hasattr(tr, "drm") and tr.drm:
                    tr.drm.clear()
            return tracks

        self.drm_token = next((stream.get(k) for k in self._DRM_TOKEN_KEYS if stream.get(k)), None)
        if not self.drm_token:
            raise ValueError(f"No DRM token found. Available keys: {list(stream.keys())}")

        # Which license endpoint flavour to use follows the configured DrmType
        # ("widevine" or "playready" per the class docstring); compared case-insensitively.
        drm_type = str(self.config["DrmType"]).lower()
        handler = self.get_playready_license if drm_type == "playready" else self.get_widevine_license

        # NOTE(review): the unencrypted branch calls tr.drm.clear() (container-like) while
        # this branch assigns tr.drm.license (object-like) - confirm the type of Track.drm.
        for tr in tracks.videos + tracks.audio:
            if getattr(tr, "drm", None):
                tr.drm.license = self._license_callback(handler, title, tr)

        return tracks

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """Return chapters for ``title``; this service provides none."""
        return []

    def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
        """
        Post a Widevine challenge to the license endpoint and return the raw response.

        The DRM token stored by get_tracks() is sent as the ``custom_data`` parameter.

        Raises:
            ValueError: If no DRM token has been set.
        """
        if not self.drm_token:
            raise ValueError("DRM token not set, login or paid content may be required.")

        r = self.session.post(
            self.config["endpoints"]["license"],
            params={"custom_data": self.drm_token},
            data=challenge,
        )
        r.raise_for_status()
        return r.content

    def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
        """
        Post a PlayReady challenge to the license endpoint and return the raw response.

        Same endpoint and ``custom_data`` token as the Widevine variant, but sent with
        SOAP/XML headers and a browser-like User-Agent.

        Raises:
            ValueError: If no DRM token has been set.
        """
        if not self.drm_token:
            raise ValueError("DRM token not set, login or paid content may be required.")

        headers = {
            "Content-Type": "text/xml; charset=utf-8",
            "SOAPAction": "http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense",
            "Origin": "https://npo.nl",
            "Referer": "https://npo.nl/",
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
            ),
        }
        r = self.session.post(
            self.config["endpoints"]["license"],
            params={"custom_data": self.drm_token},
            data=challenge,
            headers=headers,
        )
        r.raise_for_status()
        return r.content