# unshackle-services/SHUD/__init__.py
# (file viewer metadata: 719 lines, 27 KiB, Python; last change 2026-04-20 10:40:20 +02:00)
import base64
import hashlib
import json
import re
from collections.abc import Generator
from datetime import datetime, timedelta
from http.cookiejar import CookieJar
from typing import Optional, Union
import click
from langcodes import Language
from urllib.parse import parse_qs, urlparse
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH, HLS
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video
class SHUD(Service):
    """
    Service code for watch.shudder.com
    Version: 1.0.0
    Authorization: Bearer JWT Token
    Security: FHD@L3
    Use full URL (for example - https://watch.shudder.com/watch/927436) or title ID (for example - 927436).
    """

    # Matches a bare numeric id, or a watch.shudder.com URL with exactly one
    # path segment (e.g. "watch", "series") in front of the numeric id.
    TITLE_RE = r"^(?:https?://watch\.shudder\.com/[^/]+/)?(?P<title_id>\d+)"

    # Country codes declared for this service.
    GEOFENCE = ("US", "CA", "GB", "AU", "IE", "NZ")

    NO_SUBTITLES = False

    # Video.Range member name -> range identifier string consumed by get_tracks().
    VIDEO_RANGE_MAP = dict(
        SDR="sdr",
        HDR10="hdr10",
        DV="dolby_vision",
    )
@staticmethod
@click.command(name="SHUD", short_help="https://watch.shudder.com")
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
@click.option("-d", "--device", type=str, default="web", help="Select device from the config file")
@click.pass_context
def cli(ctx, **kwargs):
    # Click entry point: forwards the parsed CLI parameters (title, movie,
    # device) to the service constructor and hands back the instance.
    # NOTE: deliberately a comment rather than a docstring, since click uses
    # a command function's docstring as its help text.
    service = SHUD(ctx, **kwargs)
    return service
def __init__(self, ctx, title, movie, device):
    """
    Store the CLI parameters and derive per-run settings.

    Parameters:
        ctx: click context; ``ctx.obj.cdm`` and ``ctx.parent.params`` are read.
        title: title id or URL as given on the command line.
        movie: flag forcing movie handling.
        device: key into ``config["client"]`` selecting the client profile.

    Raises:
        Exception: if the service config is missing.
        KeyError: if the config has no ``api_key`` entry.
    """
    super().__init__(ctx)

    self.title = title
    self.movie = movie
    self.device = device
    self.cdm = ctx.obj.cdm

    # Track request overrides based on device/CDM capabilities:
    # any non-SDR range switches the request to HEVC ...
    only_sdr_requested = all(r == Video.Range.SDR for r in self.track_request.ranges)
    if not only_sdr_requested:
        self.track_request.codecs = [Video.Codec.HEVC]

    # ... and a security-level-3 CDM pins the request to AVC / SDR.
    if self.cdm and self.cdm.security_level == 3:
        self.track_request.codecs = [Video.Codec.AVC]
        self.track_request.ranges = [Video.Range.SDR]

    if self.config is None:
        raise Exception("Config is missing!")

    self.profile = ctx.parent.params.get("profile") or "default"

    # Filled by get_tracks() with the DRM details of the fetched manifest.
    self.license_data = {}
    self.realm = "dce.shudder"
    self.api_key = self.config["api_key"]
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
    """
    Authenticate the session, preferring credentials over cookies.

    With a credential: fetch the init document, log in with its token and
    store the returned ``authorisationToken`` / ``refreshToken``.
    With cookies only: use the value of the ``auth_token`` cookie.
    In both cases the ``Authorization`` and ``Realm`` session headers are set.

    Raises:
        ValueError: login returned no token, or no ``auth_token`` cookie exists.
        EnvironmentError: neither a credential nor (non-empty) cookies were given.
    """
    super().authenticate(cookies, credential)

    # Headers sent with every request of this session.
    client_cfg = self.config["client"][self.device]
    self.session.headers.update({
        "User-Agent": client_cfg["user_agent"],
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en-US",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Referer": "https://watch.shudder.com/",
        "Content-Type": "application/json",
        "x-api-key": self.api_key,
        "app": "dice",
        "x-app-var": client_cfg["app_version"],
        "Origin": "https://watch.shudder.com",
        "Connection": "keep-alive",
    })

    if credential:
        self.log.info("Authenticating with credentials")

        # Step 1: the init document supplies the token used for the login call.
        init_resp = self.session.get(
            url=self.config["endpoints"]["init"],
            params={
                "lk": "language",
                "pk": "subTitleLanguage,subtitlePreferenceMode,subtitlePreferenceMap,audioLanguage,autoAdvance,pluginAccessTokens,videoBackgroundAutoPlay",
                "readLicences": "true",
                "countEvents": "LIVE",
                "menuTargetPlatform": "WEB",
                "readIconStore": "ENABLED",
                "readUserProfiles": "true",
            },
        )
        init_resp.raise_for_status()
        bootstrap_token = init_resp.json().get("authentication", {}).get("authorisationToken", "")

        # Step 2: exchange e-mail/password for the user tokens.
        login_resp = self.session.post(
            url=self.config["endpoints"]["login"],
            headers={
                "Authorization": f"Bearer {bootstrap_token}",
                "Realm": self.realm,
            },
            json={
                "id": credential.username,
                "secret": credential.password,
            },
        )
        login_resp.raise_for_status()
        tokens = login_resp.json()

        self.auth_token = tokens.get("authorisationToken")
        self.refresh_token = tokens.get("refreshToken")
        if not self.auth_token:
            raise ValueError("Authentication failed - no token received")

        self.session.headers.update({
            "Authorization": f"Bearer {self.auth_token}",
            "Realm": self.realm,
        })
        self.log.info("Authentication successful")
        return

    if cookies:
        self.log.info("Authenticating with cookies")
        token_cookie = next((c for c in cookies if c.name == "auth_token"), None)
        if token_cookie is None:
            raise ValueError("No valid auth_token cookie found")
        self.auth_token = token_cookie.value
        self.session.headers.update({
            "Authorization": f"Bearer {self.auth_token}",
            "Realm": self.realm,
        })
        return

    raise EnvironmentError("Service requires Credentials or Cookies for Authentication.")
def search(self) -> Generator[SearchResult, None, None]:
    """
    Query the search endpoint with ``self.title`` and yield one SearchResult
    per card that routes to a VOD or SERIES item.
    """
    response = self.session.get(
        url=self.config["endpoints"]["search"],
        params={
            "query": self.title,
            "timezone": self.config.get("timezone", "UTC"),
        },
    )
    response.raise_for_status()
    payload = response.json()

    # Flatten the cards of every "cardList" element into one list.
    cards = [
        card
        for element in payload.get("elements", [])
        if element.get("$type") == "cardList"
        for card in element.get("attributes", {}).get("cards", [])
    ]

    for card in cards:
        action = card.get("attributes", {}).get("action", {})
        if action.get("type") != "route":
            continue
        route = action.get("data", {})
        if not route:
            continue

        content_type = str(route.get("type", "")).upper()
        if content_type not in ("VOD", "SERIES"):
            continue

        raw_id = str(route.get("id", ""))
        if not raw_id:
            continue

        # "VOD#877410" -> "877410"
        # "SERIES#3311" -> "3311"
        title_id = raw_id.split("#", 1)[-1].strip()
        if not title_id:
            continue

        if content_type == "SERIES":
            label, path_segment = "SERIES", "series"
        else:
            label, path_segment = "MOVIE", "watch"

        yield SearchResult(
            id_=title_id,
            title=route.get("title", ""),
            label=label,
            url=f"https://watch.shudder.com/{path_segment}/{title_id}",
        )
def _parse_title_input(self) -> tuple[str, Optional[str], Optional[str]]:
"""
Returns:
(title_id, kind, season_id)
kind:
- "watch" for movie/episode URLs like /watch/927436
- "series" for series URLs like /series/3713?seasonId=33510
- None for raw numeric ids
"""
raw = str(self.title).strip()
if raw.isdigit():
return raw, None, None
parsed = urlparse(raw)
if parsed.scheme and parsed.netloc:
parts = [p for p in parsed.path.split("/") if p]
kind = parts[0].lower() if parts else None
title_id = parts[1] if len(parts) > 1 else None
season_id = parse_qs(parsed.query).get("seasonId", [None])[0]
if title_id and title_id.isdigit():
return title_id, kind, season_id
match = re.match(self.TITLE_RE, raw)
if not match:
raise ValueError(f"Invalid Shudder title: {raw}")
return match.group("title_id"), None, None
def _build_manifest_payload(self, video_id: Union[str, int]) -> dict:
return {
"mediaCapabilities": [
{
"protocols": ["HLS", "DASH"],
"audioCodecs": ["aac"],
"videoCodecs": ["h264", "hevc"],
},
{
"keySystem": "WIDEVINE",
"robustness": "software",
"protocols": ["HLS", "DASH"],
"audioCodecs": ["aac"],
"encryptionMode": ["CBC", "CTR"],
"videoCodecs": ["h264"],
},
],
"macros": {
"CM-APP-NAME": "Website",
"CM-APP-VERSION": self.config["client"][self.device]["app_version"],
"CM-DVC-DNT": "0",
"CM-DVC-H": "1200",
"CM-DVC-W": "1920",
"CM-DVC-LANG": "en-US",
"CM-DVC-OS": "14",
"CM-DVC-TYPE": "2",
"CM-WEB-MBL": "0",
"CM-WEB-PAGE": f"/video/{video_id}",
"CM-CST-TCF": "",
"CM-CST-USP": "",
"CM-DVC-ATS": "",
},
}
def _get_video_metadata(self, video_id: str) -> Optional[dict]:
    """
    POST the manifest payload to the video endpoint for ``video_id``.

    Returns the decoded JSON body, or None when the server answers 404 or 405.
    Any other error status is raised via ``raise_for_status()``.
    """
    endpoint = self.config["endpoints"]["video"].format(video_id=video_id)
    response = self.session.post(
        url=endpoint,
        params={"includePlaybackDetails": "URL", "displayGeoblocked": "HIDE"},
        json=self._build_manifest_payload(video_id),
    )

    not_available = response.status_code == 404 or response.status_code == 405
    if not_available:
        return None

    response.raise_for_status()
    return response.json()
def _get_series_view(self, series_id: str, season_id: Optional[str] = None) -> dict:
    """
    Fetch the series view document, optionally for a specific season.

    Supports both config styles for ``endpoints.series_view``:
        1. full templated URL:
           https://.../api/v1/view?type=series&id={series_id}&timezone=UTC
        2. base URL:
           https://.../api/v1/view
    """
    endpoint = self.config["endpoints"]["series_view"]

    if "{series_id}" in endpoint:
        # Templated URL already carries type/id/timezone.
        url = endpoint.format(series_id=series_id)
        params = {}
    else:
        url = endpoint
        params = {
            "type": "series",
            "id": series_id,
            "timezone": self.config.get("timezone", "UTC"),
        }

    if season_id:
        params["seasonId"] = season_id

    # An empty dict is passed as None so no query parameters are added.
    response = self.session.get(url=url, params=params or None)
    response.raise_for_status()
    return response.json()
def _get_series_element(self, data: dict) -> dict:
for element in data.get("elements", []):
if element.get("$type") == "series":
return element.get("attributes", {})
return {}
def _get_season_bucket(self, data: dict) -> dict:
for element in data.get("elements", []):
if element.get("$type") != "bucket":
continue
attrs = element.get("attributes", {})
if attrs.get("tab") == "season" or attrs.get("type") == "season":
return attrs
return {}
def _extract_series_description(self, data: dict) -> str:
for element in data.get("elements", []):
if element.get("$type") != "hero":
continue
for item in element.get("attributes", {}).get("content", []):
if item.get("$type") == "textblock":
text = item.get("attributes", {}).get("text")
if text:
return text
return ""
def _extract_series_year(self, data: dict) -> Optional[int]:
for element in data.get("elements", []):
if element.get("$type") != "hero":
continue
for item in element.get("attributes", {}).get("content", []):
if item.get("$type") != "tagList":
continue
for tag in item.get("attributes", {}).get("tags", []):
text = str(tag.get("attributes", {}).get("text", "")).strip()
if re.fullmatch(r"\d{4}", text):
return int(text)
return None
@staticmethod
def _parse_episode_label(label: str, fallback_number: int) -> tuple[int, str]:
label = (label or "").strip()
if not label:
return fallback_number, f"Episode {fallback_number}"
m = re.match(r"^E(?P<number>\d+)\s*[-:]\s*(?P<name>.+)$", label, re.I)
if m:
return int(m.group("number")), m.group("name").strip()
m = re.match(r"^Episode\s+(?P<number>\d+)\s*[-:]\s*(?P<name>.+)$", label, re.I)
if m:
return int(m.group("number")), m.group("name").strip()
return fallback_number, label
def _get_series_titles(self, series_id: str, preferred_season_id: Optional[str] = None) -> Series:
    """
    Build the full episode list of a series.

    Important:
        The /view response usually contains episode items only for the selected season.
        So we fetch the initial page, then request each season explicitly with seasonId=...

    Raises:
        ValueError: the series lists no seasons, or no episode could be built.
    """
    first_page = self._get_series_view(series_id, preferred_season_id)
    series_attrs = self._get_series_element(first_page)
    first_bucket = self._get_season_bucket(first_page)
    page_meta = first_page.get("metadata", {})

    series_title = (
        page_meta.get("pageTitle")
        or series_attrs.get("series", {}).get("title")
        or ""
    )
    series_description = self._extract_series_description(first_page)
    series_year = self._extract_series_year(first_page)

    season_list = series_attrs.get("seasons", {}).get("items", [])
    if not season_list:
        raise ValueError(f"No seasons found for series {series_id}")

    # Season id the first page was rendered for; "" when it cannot be determined.
    first_season_id = str(
        first_bucket.get("seasonId")
        or first_bucket.get("id")
        or series_attrs.get("seasonId")
        or page_meta.get("currentSeason", {}).get("seasonId")
        or ""
    )

    # Episode items already at hand, keyed by season id, to avoid refetching.
    items_by_season = {}
    if first_season_id:
        items_by_season[first_season_id] = first_bucket.get("items", [])

    collected = []  # (season_number, episode_number, Episode)
    known_episode_ids = set()

    for position, season in enumerate(season_list, start=1):
        season_key = str(season.get("id"))

        raw_number = season.get("seasonNumber")
        if raw_number is not None:
            season_number = int(raw_number)
        else:
            # Fall back to the first number in the season title, then to list position.
            digits = re.search(r"(\d+)", str(season.get("title", "")))
            season_number = int(digits.group(1)) if digits else position

        season_items = items_by_season.get(season_key)
        if season_items is None:
            season_page = self._get_series_view(series_id, season_key)
            season_items = self._get_season_bucket(season_page).get("items", [])

        if not season_items:
            self.log.warning(f"No episode items returned for series {series_id}, season {season_number}")
            continue

        for ordinal, item in enumerate(season_items, start=1):
            episode_id = str(item["id"])
            if episode_id in known_episode_ids:
                continue
            known_episode_ids.add(episode_id)

            episode_number, episode_name = self._parse_episode_label(
                item.get("title", ""),
                ordinal,
            )

            # Raw item plus the series/season context it was found in.
            episode_data = dict(item)
            episode_data.update({
                "series_id": int(series_id),
                "series_title": series_title,
                "series_description": series_description,
                "season_id": season.get("id"),
                "season_title": season.get("title"),
                "season_number": season_number,
                "episode_number": episode_number,
            })

            episode = Episode(
                id_=episode_id,
                service=self.__class__,
                title=series_title,
                season=season_number,
                number=episode_number,
                name=episode_name,
                year=series_year,
                language=Language.get("en"),
                data=episode_data,
            )
            collected.append((season_number, episode_number, episode))

    if not collected:
        raise ValueError(f"No episodes found for series {series_id}")

    # Order by (season, episode); the sort is stable for equal keys.
    ordered = sorted(collected, key=lambda entry: entry[:2])
    return Series([episode for *_, episode in ordered])
def get_titles(self) -> Titles_T:
    """
    Resolve ``self.title`` into Movies or Series.

    Order of attempts: explicit /series/ URL -> series view; otherwise the
    video metadata endpoint, whose content type decides between a movie,
    an unsupported direct episode id, or a fallback to the series view.

    Raises:
        ValueError: title not found while --movie is set, or a direct
            season/episode id was supplied.
    """
    title_id, kind, season_id = self._parse_title_input()
    self.title = title_id

    # Explicit /series/... URL -> go straight to series handling
    if kind == "series" and not self.movie:
        return self._get_series_titles(title_id, season_id)

    # Try movie/video manifest first
    metadata = self._get_video_metadata(title_id)

    # If manifest lookup fails, try series view (unless a movie was requested)
    if metadata is None:
        if self.movie:
            raise ValueError(f"Title {title_id} not found")
        self.log.info(f"Manifest lookup failed for {title_id}, trying series view")
        return self._get_series_titles(title_id, season_id)

    download_permission = metadata.get("contentDownload", {}).get("permission")
    if download_permission == "DISALLOWED":
        self.log.warning(f"Download not permitted for title {title_id}")

    content_type = str(metadata.get("type", "")).upper()

    # Movie path
    treat_as_movie = self.movie or content_type in ("VOD", "MOVIE")
    if treat_as_movie:
        movie = Movie(
            id_=metadata["id"],
            service=self.__class__,
            name=metadata.get("title", ""),
            description=metadata.get("description", metadata.get("longDescription", "")),
            year=int(metadata.get("productionYear", 0)) if metadata.get("productionYear") else None,
            language=Language.get("en"),
            data=metadata,
        )
        return Movies([movie])

    # Direct episode ids are not ideal without the parent series context
    if any(marker in content_type for marker in ("SEASON", "EPISODE")):
        raise ValueError(
            "Direct episode IDs are not supported yet. "
            "Use the series URL or series id instead."
        )

    # Fallback to series handling
    return self._get_series_titles(title_id, season_id)
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Fetch and parse the manifest tracks for ``title``.

    The work is done by the nested ``_fetch_variant`` callback, which is handed
    to ``self._get_tracks_for_variants`` and fetches one manifest per
    (codec, range) combination.

    Side effect: ``self.license_data`` is replaced with the DRM details of the
    most recently fetched variant that carried DRM info.
    NOTE(review): this means all variants share one license_data slot —
    confirm that every variant of a title uses the same license URL/token.

    Raises:
        ValueError: the manifest lists no streams or the stream has no URL.
    """

    def _fetch_variant(
        title: Title_T,
        codec: Optional[Video.Codec],
        range_: Video.Range,
    ) -> Tracks:
        vcodec_str = "hevc" if codec == Video.Codec.HEVC else "h264"
        range_str = range_.name
        video_format = self.VIDEO_RANGE_MAP.get(range_str, "sdr")
        self.log.info(f" + Fetching {vcodec_str.upper()} {range_str} manifest")

        # Capability entry without a key system.
        media_capabilities = [
            {
                "protocols": ["HLS", "DASH"],
                "audioCodecs": ["aac"],
                "videoCodecs": [vcodec_str],
            }
        ]
        # Widevine capability entry, only advertised when a codec was requested.
        if codec:
            media_capabilities.append({
                "keySystem": "WIDEVINE",
                "robustness": "software",
                "protocols": ["HLS", "DASH"],
                "audioCodecs": ["aac"],
                "encryptionMode": ["CBC", "CTR"],
                "videoCodecs": [vcodec_str],
            })

        # Client description sent along with the capabilities.
        macros = {
            "CM-APP-NAME": "Website",
            "CM-APP-VERSION": self.config["client"][self.device]["app_version"],
            "CM-DVC-DNT": "0",
            "CM-DVC-H": "1080",
            "CM-DVC-W": "1920",
            "CM-DVC-LANG": "en-US",
            "CM-DVC-OS": "14",
            "CM-DVC-TYPE": "2",
            "CM-WEB-MBL": "0",
            "CM-WEB-PAGE": f"/video/{title.id}",
        }

        manifest_resp = self.session.post(
            url=self.config["endpoints"]["manifest"].format(video_id=title.id),
            params={"includePlaybackDetails": "URL", "displayGeoblocked": "HIDE"},
            json={
                "mediaCapabilities": media_capabilities,
                "macros": macros,
            },
        )
        manifest_resp.raise_for_status()
        manifest_data = manifest_resp.json()

        # Extract stream URL and DRM info
        streams = manifest_data.get("streams", [])
        if not streams:
            raise ValueError("No streams available for this title")

        stream = streams[0]  # Take first available stream
        stream_url = stream.get("url")
        if not stream_url:
            raise ValueError("No stream URL found in manifest")

        # Store DRM/license data for later use
        drm = stream.get("drm", {})
        if drm:
            # The configured endpoint is only looked up when the manifest
            # gives no URL, so a missing config entry cannot raise KeyError
            # while the manifest already supplies the license URL.
            license_url = drm.get("url") or self.config["endpoints"].get("widevine_license")
            self.license_data = {
                "url": license_url,
                "jwtToken": drm.get("jwtToken", ""),
                "encryptionMode": drm.get("encryptionMode", "CBC"),
                "keySystems": drm.get("keySystems", []),
            }

        # Parse manifest based on protocol
        if "m3u8" in stream_url.lower():
            tracks = HLS.from_url(url=stream_url, session=self.session).to_tracks(language=title.language)
        else:
            tracks = DASH.from_url(url=stream_url, session=self.session).to_tracks(language=title.language)

        # Apply the requested video range to every video track; ranges
        # without a VIDEO_RANGE_MAP entry are labelled SDR.
        range_enum = {
            "hdr10": Video.Range.HDR10,
            "dolby_vision": Video.Range.DV,
        }.get(video_format, Video.Range.SDR)
        for video in tracks.videos:
            video.range = range_enum

        # Filter audio tracks (remove clear/unencrypted if DRM present)
        if drm:
            tracks.audio = [
                track for track in tracks.audio
                if "clear" not in str(track.data).lower()
            ]

        for track in tracks.audio:
            # Fix channel counts
            if track.channels == 6.0:
                track.channels = 5.1
            # Check for descriptive audio; a missing or null label counts as "".
            raw_label = track.data.get("label") if isinstance(track.data, dict) else None
            label = str(raw_label or "").lower()
            if "audio description" in label or "descriptive" in label:
                track.descriptive = True

        return tracks

    return self._get_tracks_for_variants(title, _fetch_variant)
def get_chapters(self, title: Title_T) -> list[Chapter]:
    """
    Build chapters from the ``skipMarkers`` list in ``title.data``.

    An "intro" marker yields an "Opening" chapter at its start and, when it
    has a truthy end, an unnamed chapter at that end. A "credits" marker
    yields a "Credits" chapter. Markers without a start/offset are ignored.
    """
    chapters = []

    for marker in title.data.get("skipMarkers", []):
        marker_type = marker.get("type", "").lower()
        begin = marker.get("start", marker.get("offset"))
        finish = marker.get("end")

        if begin is None:
            continue

        if marker_type == "intro":
            chapters.append(Chapter(timestamp=int(begin), name="Opening"))
            if finish:
                chapters.append(Chapter(timestamp=int(finish)))
        elif marker_type == "credits":
            chapters.append(Chapter(timestamp=int(begin), name="Credits"))

    return chapters
def get_widevine_service_certificate(self, **_: object) -> str:
    """
    Return the Widevine service certificate from the service config.

    All keyword arguments are accepted and ignored.
    Returns the ``certificate`` config value, or "" when it is not set.
    """
    # The ignored kwargs were annotated with the builtin function ``any``,
    # which is not a type; ``object`` states "any value" without new imports.
    return self.config.get("certificate", "")
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """
    Request a Widevine license for ``challenge``.

    The license URL comes from ``self.license_data`` (filled by get_tracks)
    and falls back to ``endpoints.widevine_license`` from the config.
    Authorization uses the manifest's JWT when present, otherwise the
    session's auth token.

    Returns:
        The ``license`` or ``data`` field of a JSON object response, or the
        raw response body for every other response.

    Raises:
        ValueError: no license URL is available from either source.
    """
    # .get() keeps a missing config entry from raising KeyError, so the
    # explicit "not configured" error below is actually reachable.
    configured_url = (self.config.get("endpoints") or {}).get("widevine_license")
    license_url = self.license_data.get("url") or configured_url
    if not license_url:
        raise ValueError("Widevine license endpoint not configured")

    # Build license request headers
    headers = {
        "User-Agent": self.config["client"][self.device]["user_agent"],
        "Accept": "*/*",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Origin": "https://watch.shudder.com",
        "Referer": "https://watch.shudder.com/",
        "Connection": "keep-alive",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "cross-site",
    }
    # NOTE(review): authenticate() sets a session-wide
    # "Content-Type: application/json" which is not overridden here even
    # though the body is the binary challenge — confirm the server accepts it.

    # Add DRM info header for Widevine
    drm_info = {"system": "com.widevine.alpha"}
    headers["X-DRM-INFO"] = base64.b64encode(json.dumps(drm_info).encode()).decode()

    # Prefer the manifest's JWT; otherwise use the session token if one is
    # set (an unset or empty token is skipped instead of sending "Bearer None").
    bearer = self.license_data.get("jwtToken") or getattr(self, "auth_token", None)
    if bearer:
        headers["Authorization"] = f"Bearer {bearer}"

    # Send license request
    response = self.session.post(
        url=license_url,
        data=challenge,
        headers=headers,
    )
    response.raise_for_status()

    # Handle JSON or binary license response
    try:
        payload = response.json()
    except ValueError:
        # Not JSON: the body itself is the license.
        return response.content

    # Only a JSON object can carry the license in a named field; any other
    # JSON value falls back to the raw body instead of failing on .get().
    if isinstance(payload, dict):
        return payload.get("license") or payload.get("data") or response.content
    return response.content