import re import uuid import base64 from typing import Optional, Union from http.cookiejar import CookieJar from langcodes import Language import click from unshackle.core.search_result import SearchResult from unshackle.core.credential import Credential from unshackle.core.manifests import HLS, DASH from unshackle.core.service import Service from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T from unshackle.core.tracks import Chapter, Tracks from unshackle.core.constants import AnyTrack from datetime import datetime, timezone class VIDO(Service): """ Vidio.com service, Series and Movies, login required. Version: 2.1.0 Supports URLs like: • https://www.vidio.com/premier/2978/giligilis (Series) • https://www.vidio.com/watch/7454613-marantau-short-movie (Movie) Security: HD@L3 (Widevine DRM when available) Note: Login is mandatory. Even free content requires valid session tokens for stream access (as per API behavior). """ # Updated regex to support both series and movies TITLE_RE = r"^https?://(?:www\.)?vidio\.com/(?:premier|series|watch)/(?P\d+)" NO_SUBTITLES = True GEOFENCE = ("ID",) @staticmethod @click.command(name="VIDO", short_help="https://vidio.com (login required)") @click.argument("title", type=str) @click.pass_context def cli(ctx, **kwargs): return VIDO(ctx, **kwargs) def __init__(self, ctx, title: str): super().__init__(ctx) match = re.match(self.TITLE_RE, title) if not match: raise ValueError(f"Unsupported or invalid Vidio URL: {title}") self.content_id = match.group("id") # Determine if it's a movie or series based on URL pattern self.is_movie = "watch" in title # Static app identifiers from Android traffic self.API_AUTH = "laZOmogezono5ogekaso5oz4Mezimew1" self.USER_AGENT = "vidioandroid/7.14.6-e4d1de87f2 (3191683)" self.API_APP_INFO = "android/15/7.14.6-e4d1de87f2-3191683" self.VISITOR_ID = str(uuid.uuid4()) # Auth state self._email = None self._user_token = None self._access_token = None # DRM state self.license_url = None self.custom_data = None self.cdm = ctx.obj.cdm def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: if not credential or not credential.username or not credential.password: raise ValueError("Vidio requires email and password login.") self._email = credential.username password = credential.password # Define a unique key for this user's authentication tokens cache_key = f"auth_tokens_{self._email}" # Get a specific cache object for this key cache = self.cache.get(cache_key) # Check if valid tokens are already in the cache if cache and not cache.expired: self.log.info("Using cached authentication tokens") cached_data = cache.data self._user_token = cached_data.get("user_token") self._access_token = cached_data.get("access_token") # If tokens were successfully loaded, we're done if self._user_token and self._access_token: return # If no valid cache, proceed with login self.log.info("Authenticating with username and password") headers = { "referer": "android-app://com.vidio.android", "x-api-platform": "app-android", "x-api-auth": self.API_AUTH, "user-agent": self.USER_AGENT, "x-api-app-info": self.API_APP_INFO, "accept-language": "en", "content-type": "application/x-www-form-urlencoded", "x-visitor-id": self.VISITOR_ID, } data = f"login={self._email}&password={password}" r = self.session.post("https://api.vidio.com/api/login", headers=headers, data=data) r.raise_for_status() auth_data = r.json() self._user_token = auth_data["auth"]["authentication_token"] self._access_token = auth_data["auth_tokens"]["access_token"] self.log.info(f"Authenticated as {self._email}") try: expires_at_str = auth_data["auth_tokens"]["access_token_expires_at"] expires_at_dt = datetime.fromisoformat(expires_at_str) now_utc = datetime.now(timezone.utc) expiration_in_seconds = max(0, int((expires_at_dt - now_utc).total_seconds())) self.log.info(f"Token expires in {expiration_in_seconds / 60:.2f} minutes. Caching for this duration.") except (KeyError, ValueError) as e: self.log.warning(f"Could not parse token expiration time from API: {e}. Defaulting to 1 hour.") expiration_in_seconds = 3600 # Fallback to 1 hour cache.set({ "user_token": self._user_token, "access_token": self._access_token }, expiration=expiration_in_seconds) def _headers(self): if not self._user_token or not self._access_token: raise RuntimeError("Not authenticated. Call authenticate() first.") return { "referer": "android-app://com.vidio.android", "x-api-platform": "app-android", "x-api-auth": self.API_AUTH, "user-agent": self.USER_AGENT, "x-api-app-info": self.API_APP_INFO, "x-visitor-id": self.VISITOR_ID, "x-user-email": self._email, "x-user-token": self._user_token, "x-authorization": self._access_token, "accept-language": "en", "accept": "application/json", "accept-charset": "UTF-8", "content-type": "application/vnd.api+json", } def get_titles(self) -> Titles_T: headers = self._headers() if self.is_movie: r = self.session.get(f"https://api.vidio.com/api/videos/{self.content_id}/detail", headers=headers) r.raise_for_status() video_data = r.json()["video"] year = None if video_data.get("publish_date"): try: year = int(video_data["publish_date"][:4]) except (ValueError, TypeError): pass return Movies([ Movie( id_=video_data["id"], service=self.__class__, name=video_data["title"], description=video_data.get("description", ""), year=year, language=Language.get("id"), data=video_data, ) ]) else: # Fetch the main content profile r = self.session.get(f"https://api.vidio.com/content_profiles/{self.content_id}", headers=headers) r.raise_for_status() root = r.json()["data"] series_title = root["attributes"]["title"] # Fetch all playlists (seasons + extras) r_playlists = self.session.get( f"https://api.vidio.com/content_profiles/{self.content_id}/playlists", headers=headers ) r_playlists.raise_for_status() playlists_data = r_playlists.json() # Use metadata to identify season playlists season_playlist_ids = set() if "meta" in playlists_data and "playlist_group" in playlists_data["meta"]: for group in playlists_data["meta"]["playlist_group"]: if group.get("type") == "season": season_playlist_ids.update(group.get("playlist_ids", [])) # If no metadata, fall back to name-based detection season_playlists = [] for pl in playlists_data["data"]: playlist_id = int(pl["id"]) name = pl["attributes"]["name"].lower() # Use metadata if available, otherwise use name matching if season_playlist_ids: if playlist_id in season_playlist_ids: season_playlists.append(pl) else: # Fallback: match "season" but exclude "trailer" and "extra" if ("season" in name or name == "episode" or name == "episodes") and \ "trailer" not in name and "extra" not in name: season_playlists.append(pl) if not season_playlists: raise ValueError("No season playlists found for this series.") # Sort seasons and extract season numbers def extract_season_number(pl): name = pl["attributes"]["name"] # Try to extract number after "Season" match = re.search(r"season\s*(\d+)", name, re.IGNORECASE) if match: return int(match.group(1)) # If it's just "Season" or "Episodes", treat as Season 1 elif name.lower() in ["season", "episodes", "episode"]: return 1 else: return 0 season_playlists.sort(key=extract_season_number) all_episodes = [] for playlist in season_playlists: playlist_id = playlist["id"] season_number = extract_season_number(playlist) # If season_number is 0, default to 1 if season_number == 0: season_number = 1 self.log.debug(f"Processing playlist '{playlist['attributes']['name']}' as Season {season_number}") page = 1 while True: r_eps = self.session.get( f"https://api.vidio.com/content_profiles/{self.content_id}/playlists/{playlist_id}/videos", params={ "page[number]": page, "page[size]": 20, "sort": "order", "included": "upcoming_videos" }, headers=headers, ) r_eps.raise_for_status() page_data = r_eps.json() for raw_ep in page_data["data"]: attrs = raw_ep["attributes"] # Count episodes within the same season ep_number = len([e for e in all_episodes if e.season == season_number]) + 1 all_episodes.append( Episode( id_=int(raw_ep["id"]), service=self.__class__, title=series_title, season=season_number, number=ep_number, name=attrs["title"], description=attrs.get("description", ""), language=Language.get("id"), data=raw_ep, ) ) if not page_data["links"].get("next"): break page += 1 if not all_episodes: raise ValueError("No episodes found in any season.") return Series(all_episodes) def get_tracks(self, title: Title_T) -> Tracks: headers = self._headers() headers.update({ "x-device-brand": "samsung", "x-device-model": "SM-A525F", "x-device-form-factor": "phone", "x-device-soc": "Qualcomm SM7125", "x-device-os": "Android 15 (API 35)", "x-device-android-mpc": "0", "x-device-cpu-arch": "arm64-v8a", "x-device-platform": "android", "x-app-version": "7.14.6-e4d1de87f2-3191683", }) video_id = str(title.id) url = f"https://api.vidio.com/api/stream/v1/video_data/{video_id}?initialize=true" r = self.session.get(url, headers=headers) r.raise_for_status() stream = r.json() # Safety check: ensure stream is a valid dict if not isinstance(stream, dict): raise ValueError("Vidio returned invalid stream data (not a JSON object). " "Content may be geo-blocked, subscription-restricted, or session expired.") custom_data = stream.get("custom_data") or {} license_servers = stream.get("license_servers") or {} widevine_data = custom_data.get("widevine") if isinstance(custom_data, dict) else None license_url = license_servers.get("drm_license_url") if isinstance(license_servers, dict) else None dash_url = stream.get("stream_dash_url") has_valid_drm = bool(widevine_data and license_url and dash_url and isinstance(widevine_data, str)) if has_valid_drm: self.log.info("Widevine DRM detected, using DASH") self.custom_data = widevine_data self.license_url = license_url tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language) else: # Prefer HLS for non-DRM (more reliable metadata, avoids frame_rate=None) self.log.info("No valid Widevine DRM, using HLS") hls_url = stream.get("stream_hls_url") or stream.get("stream_token_hls_url") if hls_url: self.log.debug(f"HLS URL: {hls_url}") tracks = HLS.from_url(hls_url, session=self.session).to_tracks(language=title.language) else: # Last resort: non-DRM DASH (e.g., VP9), but warn user dash_url = stream.get("stream_token_dash_url") if dash_url: self.log.warning("HLS unavailable, falling back to non-DRM DASH (may lack frame rate metadata)") tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language) else: raise ValueError( "No playable stream (HLS or DASH) available. " "This episode may be restricted, unavailable, or require a higher subscription tier." ) self.log.info(f"Found {len(tracks.videos)} video tracks, {len(tracks.audio)} audio tracks") return tracks def get_chapters(self, title: Title_T) -> list[Chapter]: return [] def search(self): raise NotImplementedError("Search not implemented for Vidio.") def get_widevine_service_certificate(self, **_) -> Union[bytes, str, None]: return None def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: if not self.license_url or not self.custom_data: raise ValueError("DRM license info missing.") headers = { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0", "Referer": "https://www.vidio.com/", "Origin": "https://www.vidio.com", "pallycon-customdata-v2": self.custom_data, "Content-Type": "application/octet-stream", } self.log.debug(f"Requesting Widevine license from: {self.license_url}") response = self.session.post( self.license_url, data=challenge, headers=headers ) if not response.ok: error_summary = response.text[:200] if response.text else "No response body" raise Exception(f"License request failed ({response.status_code}): {error_summary}") return response.content