unshackle-services/VIDO/__init__.py

445 lines
18 KiB
Python

import re
import uuid
import xml.etree.ElementTree as ET
from urllib.parse import urljoin
from hashlib import md5
from typing import Optional, Union
from http.cookiejar import CookieJar
from langcodes import Language
import click
from unshackle.core.credential import Credential
from unshackle.core.manifests import HLS, DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks, Subtitle
from unshackle.core.constants import AnyTrack
from datetime import datetime, timezone
class VIDO(Service):
"""
Vidio.com service, Series and Movies, login required.
Version: 2.2.0
Supports URLs like:
• https://www.vidio.com/premier/2978/giligilis (Series)
• https://www.vidio.com/watch/7454613-marantau-short-movie (Movie)
Security: HD@L3 (Widevine DRM when available)
"""
TITLE_RE = r"^https?://(?:www\.)?vidio\.com/(?:premier|series|watch)/(?P<id>\d+)"
GEOFENCE = ("ID",)
@staticmethod
@click.command(name="VIDO", short_help="https://vidio.com (login required)")
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return VIDO(ctx, **kwargs)
def __init__(self, ctx, title: str):
super().__init__(ctx)
match = re.match(self.TITLE_RE, title)
if not match:
raise ValueError(f"Unsupported or invalid Vidio URL: {title}")
self.content_id = match.group("id")
self.is_movie = "watch" in title
# Static app identifiers from Android traffic
self.API_AUTH = "laZOmogezono5ogekaso5oz4Mezimew1"
self.USER_AGENT = "vidioandroid/7.14.6-e4d1de87f2 (3191683)"
self.API_APP_INFO = "android/15/7.14.6-e4d1de87f2-3191683"
self.VISITOR_ID = str(uuid.uuid4())
# Auth state
self._email = None
self._user_token = None
self._access_token = None
# DRM state
self.license_url = None
self.custom_data = None
self.cdm = ctx.obj.cdm
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
if not credential or not credential.username or not credential.password:
raise ValueError("Vidio requires email and password login.")
self._email = credential.username
password = credential.password
cache_key = f"auth_tokens_{self._email}"
cache = self.cache.get(cache_key)
# Check if valid tokens are already in the cache
if cache and not cache.expired:
self.log.info("Using cached authentication tokens")
cached_data = cache.data
self._user_token = cached_data.get("user_token")
self._access_token = cached_data.get("access_token")
if self._user_token and self._access_token:
return
# If no valid cache, proceed with login
self.log.info("Authenticating with username and password")
headers = {
"referer": "android-app://com.vidio.android",
"x-api-platform": "app-android",
"x-api-auth": self.API_AUTH,
"user-agent": self.USER_AGENT,
"x-api-app-info": self.API_APP_INFO,
"accept-language": "en",
"content-type": "application/x-www-form-urlencoded",
"x-visitor-id": self.VISITOR_ID,
}
data = f"login={self._email}&password={password}"
r = self.session.post("https://api.vidio.com/api/login", headers=headers, data=data)
r.raise_for_status()
auth_data = r.json()
self._user_token = auth_data["auth"]["authentication_token"]
self._access_token = auth_data["auth_tokens"]["access_token"]
self.log.info(f"Authenticated as {self._email}")
try:
expires_at_str = auth_data["auth_tokens"]["access_token_expires_at"]
expires_at_dt = datetime.fromisoformat(expires_at_str)
now_utc = datetime.now(timezone.utc)
expiration_in_seconds = max(0, int((expires_at_dt - now_utc).total_seconds()))
self.log.info(f"Token expires in {expiration_in_seconds / 60:.2f} minutes.")
except (KeyError, ValueError) as e:
self.log.warning(f"Could not parse token expiration: {e}. Defaulting to 1 hour.")
expiration_in_seconds = 3600
cache.set({
"user_token": self._user_token,
"access_token": self._access_token
}, expiration=expiration_in_seconds)
def _headers(self):
if not self._user_token or not self._access_token:
raise RuntimeError("Not authenticated. Call authenticate() first.")
return {
"referer": "android-app://com.vidio.android",
"x-api-platform": "app-android",
"x-api-auth": self.API_AUTH,
"user-agent": self.USER_AGENT,
"x-api-app-info": self.API_APP_INFO,
"x-visitor-id": self.VISITOR_ID,
"x-user-email": self._email,
"x-user-token": self._user_token,
"x-authorization": self._access_token,
"accept-language": "en",
"accept": "application/json",
"accept-charset": "UTF-8",
"content-type": "application/vnd.api+json",
}
def _extract_subtitles_from_mpd(self, mpd_url: str) -> list[Subtitle]:
"""
Manually parse the MPD to extract subtitle tracks.
Handles plain VTT format (for free content).
"""
subtitles = []
try:
r = self.session.get(mpd_url)
r.raise_for_status()
mpd_content = r.text
# Get base URL for resolving relative paths
base_url = mpd_url.rsplit('/', 1)[0] + '/'
# Remove namespace for easier parsing
mpd_content_clean = re.sub(r'\sxmlns="[^"]+"', '', mpd_content)
root = ET.fromstring(mpd_content_clean)
for adaptation_set in root.findall('.//AdaptationSet'):
content_type = adaptation_set.get('contentType', '')
if content_type != 'text':
continue
lang = adaptation_set.get('lang', 'und')
for rep in adaptation_set.findall('Representation'):
mime_type = rep.get('mimeType', '')
# Handle plain VTT (free content)
if mime_type == 'text/vtt':
segment_list = rep.find('SegmentList')
if segment_list is not None:
for segment_url in segment_list.findall('SegmentURL'):
media = segment_url.get('media')
if media:
full_url = urljoin(base_url, media)
# Determine if auto-generated
is_auto = '-auto' in lang
clean_lang = lang.replace('-auto', '')
subtitle = Subtitle(
id_=md5(full_url.encode()).hexdigest()[0:16],
url=full_url,
codec=Subtitle.Codec.WebVTT,
language=Language.get(clean_lang),
forced=False,
sdh=False,
)
subtitles.append(subtitle)
self.log.debug(f"Found VTT subtitle: {lang} -> {full_url}")
except Exception as e:
self.log.warning(f"Failed to extract subtitles from MPD: {e}")
return subtitles
def get_titles(self) -> Titles_T:
headers = self._headers()
if self.is_movie:
r = self.session.get(f"https://api.vidio.com/api/videos/{self.content_id}/detail", headers=headers)
r.raise_for_status()
video_data = r.json()["video"]
year = None
if video_data.get("publish_date"):
try:
year = int(video_data["publish_date"][:4])
except (ValueError, TypeError):
pass
return Movies([
Movie(
id_=video_data["id"],
service=self.__class__,
name=video_data["title"],
description=video_data.get("description", ""),
year=year,
language=Language.get("id"),
data=video_data,
)
])
else:
r = self.session.get(f"https://api.vidio.com/content_profiles/{self.content_id}", headers=headers)
r.raise_for_status()
root = r.json()["data"]
series_title = root["attributes"]["title"]
r_playlists = self.session.get(
f"https://api.vidio.com/content_profiles/{self.content_id}/playlists",
headers=headers
)
r_playlists.raise_for_status()
playlists_data = r_playlists.json()
# Use metadata to identify season playlists
season_playlist_ids = set()
if "meta" in playlists_data and "playlist_group" in playlists_data["meta"]:
for group in playlists_data["meta"]["playlist_group"]:
if group.get("type") == "season":
season_playlist_ids.update(group.get("playlist_ids", []))
season_playlists = []
for pl in playlists_data["data"]:
playlist_id = int(pl["id"])
name = pl["attributes"]["name"].lower()
if season_playlist_ids:
if playlist_id in season_playlist_ids:
season_playlists.append(pl)
else:
if ("season" in name or name == "episode" or name == "episodes") and \
"trailer" not in name and "extra" not in name:
season_playlists.append(pl)
if not season_playlists:
raise ValueError("No season playlists found for this series.")
def extract_season_number(pl):
name = pl["attributes"]["name"]
match = re.search(r"season\s*(\d+)", name, re.IGNORECASE)
if match:
return int(match.group(1))
elif name.lower() in ["season", "episodes", "episode"]:
return 1
else:
return 0
season_playlists.sort(key=extract_season_number)
all_episodes = []
for playlist in season_playlists:
playlist_id = playlist["id"]
season_number = extract_season_number(playlist)
if season_number == 0:
season_number = 1
self.log.debug(f"Processing playlist '{playlist['attributes']['name']}' as Season {season_number}")
page = 1
while True:
r_eps = self.session.get(
f"https://api.vidio.com/content_profiles/{self.content_id}/playlists/{playlist_id}/videos",
params={
"page[number]": page,
"page[size]": 20,
"sort": "order",
"included": "upcoming_videos"
},
headers=headers,
)
r_eps.raise_for_status()
page_data = r_eps.json()
for raw_ep in page_data["data"]:
attrs = raw_ep["attributes"]
ep_number = len([e for e in all_episodes if e.season == season_number]) + 1
all_episodes.append(
Episode(
id_=int(raw_ep["id"]),
service=self.__class__,
title=series_title,
season=season_number,
number=ep_number,
name=attrs["title"],
description=attrs.get("description", ""),
language=Language.get("id"),
data=raw_ep,
)
)
if not page_data["links"].get("next"):
break
page += 1
if not all_episodes:
raise ValueError("No episodes found in any season.")
return Series(all_episodes)
def get_tracks(self, title: Title_T) -> Tracks:
headers = self._headers()
headers.update({
"x-device-brand": "samsung",
"x-device-model": "SM-A525F",
"x-device-form-factor": "phone",
"x-device-soc": "Qualcomm SM7125",
"x-device-os": "Android 15 (API 35)",
"x-device-android-mpc": "0",
"x-device-cpu-arch": "arm64-v8a",
"x-device-platform": "android",
"x-app-version": "7.14.6-e4d1de87f2-3191683",
})
video_id = str(title.id)
url = f"https://api.vidio.com/api/stream/v1/video_data/{video_id}?initialize=true"
r = self.session.get(url, headers=headers)
r.raise_for_status()
stream = r.json()
if not isinstance(stream, dict):
raise ValueError("Vidio returned invalid stream data.")
# Extract DRM info
custom_data = stream.get("custom_data") or {}
license_servers = stream.get("license_servers") or {}
widevine_data = custom_data.get("widevine") if isinstance(custom_data, dict) else None
license_url = license_servers.get("drm_license_url") if isinstance(license_servers, dict) else None
# Get stream URLs
dash_url = stream.get("stream_dash_url") or stream.get("stream_token_dash_url")
hls_url = stream.get("stream_hls_url") or stream.get("stream_token_hls_url")
has_drm = widevine_data and license_url and dash_url and isinstance(widevine_data, str)
if has_drm:
# DRM content: use DASH
self.log.info("Widevine DRM detected, using DASH")
self.custom_data = widevine_data
self.license_url = license_url
tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language)
elif hls_url:
# Non-DRM: use HLS for video/audio
self.log.info("No DRM detected, using HLS for video/audio")
self.custom_data = None
self.license_url = None
tracks = HLS.from_url(hls_url, session=self.session).to_tracks(language=title.language)
# Clear HLS subtitles (they're segmented and incompatible)
if tracks.subtitles:
self.log.debug("Clearing HLS subtitles (incompatible format)")
tracks.subtitles.clear()
# Get subtitles from DASH manifest (plain VTT)
if dash_url:
self.log.debug("Extracting subtitles from DASH manifest")
manual_subs = self._extract_subtitles_from_mpd(dash_url)
if manual_subs:
for sub in manual_subs:
tracks.add(sub)
self.log.info(f"Added {len(manual_subs)} subtitle tracks from DASH")
elif dash_url:
# Fallback to DASH
self.log.warning("No HLS available, using DASH (VP9 codec)")
self.custom_data = None
self.license_url = None
tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language)
# Try manual subtitle extraction for non-DRM DASH
if not tracks.subtitles:
manual_subs = self._extract_subtitles_from_mpd(dash_url)
if manual_subs:
for sub in manual_subs:
tracks.add(sub)
else:
raise ValueError("No playable stream (DASH or HLS) available.")
self.log.info(f"Found {len(tracks.videos)} video tracks, {len(tracks.audio)} audio tracks, {len(tracks.subtitles)} subtitle tracks")
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def search(self):
raise NotImplementedError("Search not implemented for Vidio.")
def get_widevine_service_certificate(self, **_) -> Union[bytes, str, None]:
return None
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
if not self.license_url or not self.custom_data:
raise ValueError("DRM license info missing.")
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0",
"Referer": "https://www.vidio.com/",
"Origin": "https://www.vidio.com",
"pallycon-customdata-v2": self.custom_data,
"Content-Type": "application/octet-stream",
}
self.log.debug(f"Requesting Widevine license from: {self.license_url}")
response = self.session.post(
self.license_url,
data=challenge,
headers=headers
)
if not response.ok:
error_summary = response.text[:200] if response.text else "No response body"
raise Exception(f"License request failed ({response.status_code}): {error_summary}")
return response.content