import base64
import json
import re
import uuid
from http.cookiejar import CookieJar
from typing import Generator, Optional

import click
from langcodes import Language

from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks


class MUBI(Service):
    """
    Service code for MUBI (mubi.com)
    Version: 1.2.1 (Cookie-only + Auto-UHD + Search)

    Authorization: Cookies ONLY (lt token + _mubi_session)
    Security: UHD @ L3/SL2K (Widevine/PlayReady)

    Supports:
      • Series ↦ https://mubi.com/en/nl/series/twin-peaks
      • Movies ↦ https://mubi.com/en/nl/films/the-substance
    """

    # The group names below are the ones __init__ reads via .group(...).
    SERIES_TITLE_RE = (
        r"^https?://(?:www\.)?mubi\.com(?:/[^/]+)*?/series/(?P<series_slug>[^/]+)"
        r"(?:/season/(?P<season_slug>[^/]+))?$"
    )
    TITLE_RE = r"^(?:https?://(?:www\.)?mubi\.com)(?:/[^/]+)*?/films/(?P<slug>[^/?#]+)$"
    NO_SUBTITLES = False

    # User agents sent with API/license requests.
    _MOBILE_USER_AGENT = (
        "Mozilla/5.0 (Linux; Android 13; SM-G975F) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36"
    )
    _EDGE_USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0"
    )
    # Target manifest name used to request the HEVC rendition.
    _HEVC_MANIFEST_PATH = "/default/ver1.hevc.ex-vtt.mpd"
    # Matches any HTML/XML tag; used to reduce the editorial HTML to plain text.
    _HTML_TAG_RE = re.compile(r"<[^>]+>")

    @staticmethod
    @click.command(name="MUBI", short_help="https://mubi.com ")
    @click.argument("title", type=str)
    @click.option(
        "-c",
        "--country",
        default=None,
        type=str,
        help="With VPN set country code other than the one assigned to the account.",
    )
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed CLI arguments."""
        return MUBI(ctx, **kwargs)

    def __init__(self, ctx, title: str, country: Optional[str] = None):
        """
        Parse the title argument and initialise per-run state.

        Parameters:
            ctx: click context; ``ctx.obj.cdm`` is stored on the instance.
            title: a MUBI film/series URL, or free text when used for search.
            country: optional country code overriding the account's country.

        Raises:
            EnvironmentError: if no service config is loaded.
        """
        super().__init__(ctx)

        self.raw_title = title  # Store raw input for search mode
        self.country = country

        # Only parse as URL if it matches MUBI patterns
        m_film = re.match(self.TITLE_RE, title)
        m_series = re.match(self.SERIES_TITLE_RE, title)

        self.is_series = bool(m_series)
        self.slug = m_film.group("slug") if m_film else None
        self.series_slug = m_series.group("series_slug") if m_series else None
        self.season_slug = m_series.group("season_slug") if m_series else None

        # Core state
        self.film_id: Optional[int] = None
        self.lt_token: Optional[str] = None
        self.session_token: Optional[str] = None
        self.user_id: Optional[int] = None
        self.country_code: Optional[str] = None
        self.set_country_code: Optional[str] = country
        self.anonymous_user_id: Optional[str] = None
        self.default_country: Optional[str] = None
        self.reels_data: Optional[list] = None

        # ALWAYS enable UHD/HEVC path - no user flag required
        self.uhd = True
        self.cdm = ctx.obj.cdm

        if self.config is None:
            raise EnvironmentError("Missing service config for MUBI.")

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Authenticate the session from browser cookies.

        Reads the ``lt`` and ``_mubi_session`` cookies, configures the session
        headers, fetches the account to learn the user id and country, and
        binds the anonymous user id to the account.

        Raises:
            PermissionError: if cookies are missing/incomplete or the account
                request is rejected.
            ValueError: if the IP geolocation request fails.
        """
        super().authenticate(cookies, credential)

        if not cookies:
            raise PermissionError(
                "MUBI requires login cookies (lt + _mubi_session). Credentials login is not supported."
            )

        # IP geolocation for country detection
        try:
            r_ip = self.session.get(self.config["endpoints"]["ip_geolocation"], timeout=5)
            r_ip.raise_for_status()
            ip_data = r_ip.json()
        except Exception as e:
            # Chain the cause so the underlying network/JSON error stays visible.
            raise ValueError(f"Failed to fetch IP geolocation: {e}") from e

        if ip_data.get("country"):
            self.default_country = ip_data["country"]
            self.log.debug(f"Detected country from IP: {self.default_country}")
        else:
            self.log.warning("IP geolocation response did not contain a country code.")

        # Extract essential tokens from cookies
        lt_cookie = next((c for c in cookies if c.name == "lt"), None)
        session_cookie = next((c for c in cookies if c.name == "_mubi_session"), None)
        snow_id_cookie = next((c for c in cookies if c.name == "_snow_id.c006"), None)

        if not lt_cookie:
            raise PermissionError("Missing 'lt' cookie (Bearer token).")
        if not session_cookie:
            raise PermissionError("Missing '_mubi_session' cookie.")

        self.lt_token = lt_cookie.value
        self.session_token = session_cookie.value

        # Extract or generate anonymous_user_id
        if snow_id_cookie and "." in snow_id_cookie.value:
            self.anonymous_user_id = snow_id_cookie.value.split(".")[0]
        else:
            self.anonymous_user_id = str(uuid.uuid4())
            self.log.warning(
                f"No _snow_id.c006 cookie found — generated new anonymous_user_id: {self.anonymous_user_id}"
            )

        # Configure session headers for UHD access
        base_headers = {
            "User-Agent": self._MOBILE_USER_AGENT,
            "Origin": "https://mubi.com",
            "Referer": "https://mubi.com/",
            "CLIENT": "web",
            "Client-Accept-Video-Codecs": "h265,vp9,h264",
            "Client-Accept-Audio-Codecs": "eac3,ac3,aac",
            "Authorization": f"Bearer {self.lt_token}",
            "ANONYMOUS_USER_ID": self.anonymous_user_id,
            "Client-Country": self.default_country,
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-site",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        }
        self.session.headers.update(base_headers)

        # Fetch account info
        r_account = self.session.get(self.config["endpoints"]["account"])
        if not r_account.ok:
            raise PermissionError(f"Failed to fetch MUBI account: {r_account.status_code} {r_account.text}")

        account_data = r_account.json()
        self.user_id = account_data.get("id")
        self.country_code = (account_data.get("country") or {}).get("code", "NL")

        # An explicit --country overrides whatever the account reports.
        if self.set_country_code is not None:
            self.country_code = self.set_country_code.upper()

        self.session.headers["Client-Country"] = self.country_code
        self.GEOFENCE = (self.country_code,)

        self._bind_anonymous_user()

        self.log.info(
            f"Authenticated as user {self.user_id}, "
            f"country: {self.country_code}, "
            f"anonymous_id: {self.anonymous_user_id}"
        )

    def _bind_anonymous_user(self):
        """Best-effort PUT of the anonymous user id to the account; failures are only logged."""
        try:
            r = self.session.put(
                self.config["endpoints"]["current_user"],
                json={"anonymous_user_uuid": self.anonymous_user_id},
                headers={"Content-Type": "application/json"},
            )
        except Exception as e:
            self.log.warning(f"Exception while binding anonymous_user_uuid: {e}")
            return

        if r.ok:
            self.log.debug("Anonymous user ID successfully bound to account.")
        else:
            self.log.warning(f"Failed to bind anonymous_user_uuid: {r.status_code}")

    def get_titles(self) -> Titles_T:
        """
        Return the titles for the URL given on the command line.

        Raises:
            ValueError: if the title matched neither the film nor the series
                URL pattern (there is then no slug to look up).
        """
        if self.is_series:
            return self._get_series_titles()
        if not self.slug:
            raise ValueError(f"'{self.raw_title}' is not a recognised MUBI film or series URL.")
        return self._get_film_title()

    def _get_film_title(self) -> Movies:
        """Fetch film metadata and reels by slug and wrap them in a single-item ``Movies``."""
        url = self.config["endpoints"]["film_by_slug"].format(slug=self.slug)
        r = self.session.get(url)
        r.raise_for_status()
        data = r.json()
        self.film_id = data["id"]

        # Fetch reels for language detection and subtitle names
        url_reels = self.config["endpoints"]["reels"].format(film_id=self.film_id)
        r_reels = self.session.get(url_reels)
        r_reels.raise_for_status()
        self.reels_data = r_reels.json()

        # Detect original language from first audio track
        original_language_code = "en"
        if self.reels_data and self.reels_data[0].get("audio_tracks"):
            first_audio_track = self.reels_data[0]["audio_tracks"][0]
            if "language_code" in first_audio_track:
                original_language_code = first_audio_track["language_code"]
                self.log.debug(f"Detected original language from reels: '{original_language_code}'")

        # Reduce the editorial HTML to plain text; `or ""` covers a null field.
        description = self._HTML_TAG_RE.sub("", data.get("default_editorial_html") or "").strip()

        movie = Movie(
            id_=self.film_id,
            service=self.__class__,
            name=data.get("title", "Unknown"),
            year=data.get("year"),
            description=description,
            language=Language.get(original_language_code),
            data=data,
        )
        return Movies([movie])

    def _fetch_season_episodes(self, season_slug: str) -> Optional[list]:
        """
        Fetch the raw episode list of one season.

        Returns:
            The ``episodes`` list from the response (possibly empty), or
            ``None`` when the endpoint answers 404.
        """
        eps_url = self.config["endpoints"]["season_episodes"].format(
            series_slug=self.series_slug,
            season_slug=season_slug,
        )
        r_eps = self.session.get(eps_url)
        if r_eps.status_code == 404:
            return None
        r_eps.raise_for_status()
        return r_eps.json().get("episodes", [])

    def _get_series_titles(self) -> Titles_T:
        """
        Build a ``Series`` from either the requested season or every season.

        Raises:
            ValueError: if the requested season does not exist, or the series
                lists no seasons.
        """
        series_url = self.config["endpoints"]["series"].format(series_slug=self.series_slug)
        r_series = self.session.get(series_url)
        r_series.raise_for_status()
        series_data = r_series.json()

        episodes = []

        if self.season_slug:
            episodes_data = self._fetch_season_episodes(self.season_slug)
            if episodes_data is None:
                raise ValueError(f"Season '{self.season_slug}' not found.")
            self._add_episodes_to_list(episodes, episodes_data, series_data)
        else:
            seasons = series_data.get("seasons", [])
            if not seasons:
                raise ValueError("No seasons found for this series.")

            for season in seasons:
                season_slug = season["slug"]
                self.log.debug(f"Fetching episodes for season: {season_slug}")

                episodes_data = self._fetch_season_episodes(season_slug)
                if episodes_data is None:
                    self.log.info(f"Season '{season_slug}' not available, skipping.")
                    continue
                if not episodes_data:
                    self.log.info(f"No episodes found in season '{season_slug}'.")
                    continue

                self._add_episodes_to_list(episodes, episodes_data, series_data)

        return Series(sorted(episodes, key=lambda x: (x.season, x.number)))

    def _add_episodes_to_list(self, episodes_list: list, episodes_data: list, series_data: dict):
        """Append one ``Episode`` per entry of *episodes_data* to *episodes_list* (mutated in place)."""
        for ep in episodes_data:
            # `or {}` guards against the keys being present but null.
            playback_langs = (ep.get("consumable") or {}).get("playback_languages") or {}
            audio_langs = playback_langs.get("audio_options", ["English"])

            # NOTE(review): the first word of the first audio option is handed to
            # Language.get(); the default "English" suggests these are display
            # names rather than language tags — confirm against the API.
            try:
                lang_code = audio_langs[0].split()[0].lower() if audio_langs else "en"
                detected_lang = Language.get(lang_code)
            except Exception:
                # Best-effort detection: any parsing problem falls back to English.
                detected_lang = Language.get("en")

            episodes_list.append(
                Episode(
                    id_=ep["id"],
                    service=self.__class__,
                    title=series_data["title"],
                    season=ep["episode"]["season_number"],
                    number=ep["episode"]["number"],
                    name=ep["title"],
                    description=ep.get("short_synopsis", ""),
                    language=detected_lang,
                    data=ep,
                )
            )

    def _ensure_reels(self, film_id) -> list:
        """
        Return the reels for *film_id*, fetching them unless already cached.

        The cache is keyed on ``self.film_id`` so that reels fetched for one
        title are never reused for a different one (e.g. the next episode).
        """
        if self.reels_data is None or self.film_id != film_id:
            url_reels = self.config["endpoints"]["reels"].format(film_id=film_id)
            r_reels = self.session.get(url_reels)
            r_reels.raise_for_status()
            self.reels_data = r_reels.json()
            self.film_id = film_id
        return self.reels_data or []

    def _hevc_manifest_url(self, manifest_url: str) -> str:
        """Rewrite the manifest file name in *manifest_url* to the HEVC variant."""
        manifest_url = re.sub(
            r"/default/ver1\.AVC1\.[^/]*\.mpd",
            self._HEVC_MANIFEST_PATH,
            manifest_url,
        )
        # Fallback for non-AVC URLs
        if self._HEVC_MANIFEST_PATH not in manifest_url:
            manifest_url = re.sub(
                r"/default/[^/]*\.mpd",
                self._HEVC_MANIFEST_PATH,
                manifest_url,
            )
        return manifest_url

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Build the track list for *title*.

        Initiates a viewing session, resolves the DASH manifest (rewritten to
        the HEVC variant), and replaces the manifest's subtitles with the
        WebVTT tracks listed in the secure-URL response.

        Raises:
            RuntimeError: if the title has no id.
            ValueError: if the response lists no DASH manifest.
        """
        film_id = getattr(title, "id", None)
        if not film_id:
            raise RuntimeError("Title ID not found.")

        # Initiate viewing session
        url_view = self.config["endpoints"]["initiate_viewing"].format(film_id=film_id)
        r_view = self.session.post(url_view, json={}, headers={"Content-Type": "application/json"})
        r_view.raise_for_status()

        # Map text-track id -> display name; the first reel listing an id wins.
        display_names: dict = {}
        for reel in self._ensure_reels(film_id):
            for text_track in reel.get("text_tracks") or []:
                if text_track.get("display_name"):
                    display_names.setdefault(text_track.get("id"), text_track["display_name"])

        # Get secure streaming URL
        url_secure = self.config["endpoints"]["secure_url"].format(film_id=film_id)
        r_secure = self.session.get(url_secure)
        r_secure.raise_for_status()
        secure_data = r_secure.json()

        # Find DASH manifest URL
        manifest_url = next(
            (
                entry["src"]
                for entry in secure_data.get("urls", [])
                if entry.get("content_type") == "application/dash+xml"
            ),
            None,
        )
        if not manifest_url:
            raise ValueError("No DASH manifest URL found.")

        manifest_url = self._hevc_manifest_url(manifest_url)

        # Parse DASH manifest
        tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)

        # Add enhanced subtitles (forced/SDH detection)
        subtitles = []
        for sub in secure_data.get("text_track_urls", []):
            vtt_url = sub.get("url")
            if not vtt_url:
                continue

            lang_code = sub.get("language_code") or "und"
            role = sub.get("role")

            # Prefer the reel's display name; otherwise fall back to "<role> <LANG>".
            disp_name = display_names.get(sub.get("id")) or f"{role or ''} {lang_code.upper()}"
            if "(SDH)" in disp_name:
                disp_name = disp_name.replace("(SDH)", "").strip()

            subtitles.append(
                Subtitle(
                    id_=sub["id"],
                    url=vtt_url,
                    language=Language.get(lang_code),
                    is_original_lang=lang_code == title.language.language,
                    codec=Subtitle.Codec.WebVTT,
                    name=disp_name,
                    forced=role == "forced-subtitle",
                    sdh=role == "caption",
                )
            )

        tracks.subtitles = subtitles
        return tracks

    def search(self) -> Generator[SearchResult, None, None]:
        """
        Search MUBI films using the configured search endpoint.

        Yields one ``SearchResult`` per film in the response, labelled
        ``MOVIE``; the query is the raw title given on the command line.
        """
        params = {
            "query": self.raw_title,
            "page": 1,
            "per_page": 24,
            "playable": "true",
            "all_films_on_zero_hits": "true",
        }

        response = self.session.get(
            url=self.config["endpoints"]["search"],
            params=params,
        )
        response.raise_for_status()
        results = response.json()

        for film in results.get("films", []):
            year = film.get("year")
            # Omit the year suffix instead of failing when the API gives none.
            display_title = f"{film['title']} ({year})" if year else film["title"]
            web_url = film.get("web_url")
            yield SearchResult(
                id_=film["id"],
                title=display_title,
                label="MOVIE",
                url=web_url.rstrip() if web_url else web_url,  # Clean trailing spaces
            )

    def get_chapters(self, title: Title_T) -> list[Chapter]:
        """Return no chapters; this service does not provide any."""
        return []

    def _license_headers(self, user_agent: str) -> dict:
        """
        Build the headers shared by the Widevine and PlayReady license requests.

        The ``dt-custom-data`` header is the base64 of a JSON object holding
        the user id, the ``lt`` token and the merchant name.

        Raises:
            RuntimeError: if ``authenticate`` has not populated ``user_id``.
        """
        if not self.user_id:
            raise RuntimeError("user_id not set — authenticate first.")

        # Cookie-based license request (NO dtinfo - credentials removed)
        dt_custom_data = {
            "userId": self.user_id,
            "sessionId": self.lt_token,
            "merchant": "mubi",
        }
        dt_custom_data_b64 = base64.b64encode(json.dumps(dt_custom_data).encode()).decode()

        return {
            "User-Agent": user_agent,
            "Accept": "*/*",
            "Origin": "https://mubi.com",
            "Referer": "https://mubi.com/",
            "dt-custom-data": dt_custom_data_b64,
        }

    def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
        """
        Request a Widevine license for *challenge*.

        Returns:
            The license bytes, base64-decoded from the JSON response.

        Raises:
            RuntimeError: if not authenticated.
            PermissionError: if the response status field is not ``OK``.
        """
        r = self.session.post(
            self.config["endpoints"]["license"],
            data=challenge,
            headers=self._license_headers(self._MOBILE_USER_AGENT),
        )
        r.raise_for_status()
        license_data = r.json()
        if license_data.get("status") != "OK":
            raise PermissionError(f"DRM license error: {license_data}")
        return base64.b64decode(license_data["license"])

    def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
        """
        Request a PlayReady license for *challenge*.

        Returns:
            The raw response body.

        Raises:
            RuntimeError: if not authenticated.
            PermissionError: if the server answers with a non-200 success status.
        """
        r = self.session.post(
            self.config["endpoints"]["license_pr"],
            data=challenge,
            headers=self._license_headers(self._EDGE_USER_AGENT),
        )
        r.raise_for_status()
        # raise_for_status() only rejects 4xx/5xx; anything but 200 is still unusable.
        if r.status_code != 200:
            raise PermissionError(f"DRM license error: unexpected status {r.status_code}")
        return r.content