devine-services/services/DSCP/__init__.py
2024-08-01 17:05:49 +02:00

295 lines
11 KiB
Python

from __future__ import annotations
import json
import re
import sys
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Any, Optional, Union
from urllib.parse import urljoin
import click
from click import Context
from devine.core.credential import Credential
from devine.core.manifests.dash import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Tracks
from requests import Request
class DSCP(Service):
"""
\b
Service code for Discovery Plus (https://discoveryplus.com).
\b
Author: stabbedbybrick
Authorization: Cookies
Robustness:
L3: 2160p, AAC2.0
\b
Tips:
- Input can be either complete title URL or just the path:
SHOW: /show/richard-hammonds-workshop
EPISODE: /video/richard-hammonds-workshop/new-beginnings
SPORT: /video/sport/tnt-sports-1/uefa-champions-league
- Use the --lang LANG_RANGE option to request non-english tracks
- use -v H.265 to request H.265 tracks
\b
Known issues:
- Devine can't properly parse certain manifests, causing an error in download workers.
- Sport streams specifically seem to be the most affected by this.
"""
ALIASES = ("dplus", "discoveryplus", "discovery+")
TITLE_RE = r"^(?:https?://(?:www\.)?discoveryplus\.com(?:/[a-z]{2})?)?/(?P<type>show|video)/(?P<id>[a-z0-9-/]+)"
@staticmethod
@click.command(name="DSCP", short_help="https://discoveryplus.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> DSCP:
return DSCP(ctx, **kwargs)
def __init__(self, ctx: Context, title: str):
self.title = title
self.vcodec = ctx.parent.params.get("vcodec")
super().__init__(ctx)
def authenticate(
self,
cookies: Optional[CookieJar] = None,
credential: Optional[Credential] = None,
) -> None:
super().authenticate(cookies, credential)
if not cookies:
raise EnvironmentError("Service requires Cookies for Authentication.")
self.session.cookies.update(cookies)
self.base_url = None
info = self._request("GET", "https://global-prod.disco-api.com/bootstrapInfo")
self.base_url = info["data"]["attributes"].get("baseApiUrl")
user = self._request("GET", "/users/me")
self.territory = user["data"]["attributes"]["currentLocationTerritory"]
self.user_language = user["data"]["attributes"]["clientTranslationLanguageTags"][0]
self.site_id = user["meta"]["site"]["id"]
def search(self) -> Generator[SearchResult, None, None]:
params = {
"include": "default",
"decorators": "viewingHistory,isFavorite,playbackAllowed,contentAction,badges",
"contentFilter[query]": self.title,
"page[items.number]": "1",
"page[items.size]": "8",
}
data = self._request("GET", "/cms/routes/search/result", params=params)
results = [x.get("attributes") for x in data["included"] if x.get("type") == "show"]
for result in results:
yield SearchResult(
id_=f"/show/{result.get('alternateId')}",
title=result.get("name"),
description=result.get("description"),
label="show",
url=f"/show/{result.get('alternateId')}",
)
def get_titles(self) -> Union[Movies, Series]:
try:
kind, content_id = (re.match(self.TITLE_RE, self.title).group(i) for i in ("type", "id"))
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
if kind == "video":
episodes = self._episode(content_id)
if kind == "show":
episodes = self._show(content_id)
return Series(episodes)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
platform = "firetv" if self.vcodec == "H.265" else "desktop"
res = self._request(
"POST",
"/playback/v3/videoPlaybackInfo",
payload={
"videoId": title.id,
"deviceInfo": {
"adBlocker": "false",
"drmSupported": "true",
"hwDecodingCapabilities": ["H264", "H265"],
"screen": {"width": 3840, "height": 2160},
"player": {"width": 3840, "height": 2160},
},
"wisteriaProperties": {
"advertiser": {"firstPlay": 0, "fwIsLat": 0},
"device": {"browser": {"name": "chrome", "version": "96.0.4664.55"}, "type": platform},
"platform": platform,
"product": "dplus_emea",
"sessionId": str(uuid.uuid1()),
"streamProvider": {"suspendBeaconing": 0, "hlsVersion": 7, "pingConfig": 1},
},
},
)
self.license = None
streaming = res["data"]["attributes"]["streaming"][0]
manifest = streaming["url"]
if streaming["protection"]["drmEnabled"]:
self.token = streaming["protection"]["drmToken"]
self.license = streaming["protection"]["schemes"]["widevine"]["licenseUrl"]
tracks = DASH.from_url(url=manifest, session=self.session).to_tracks(language=title.language)
return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:
return []
def get_widevine_service_certificate(self, **_: Any) -> str:
return None
def get_widevine_license(self, challenge: bytes, **_: Any) -> str:
if not self.license:
return None
r = self.session.post(self.license, headers={"Preauthorization": self.token}, data=challenge)
if not r.ok:
raise ConnectionError(r.text)
return r.content
# Service specific functions
def _show(self, title: str) -> Episode:
params = {
"include": "default",
"decorators": "playbackAllowed,contentAction,badges",
}
data = self._request("GET", "/cms/routes/show/{}".format(title), params=params)
content = next(x for x in data["included"] if x["attributes"].get("alias") == "generic-show-episodes")
content_id = content["id"]
show_id = content["attributes"]["component"]["mandatoryParams"]
season_params = [x.get("parameter") for x in content["attributes"]["component"]["filters"][0]["options"]]
page = next(x for x in data["included"] if x.get("type", "") == "page")
seasons = [
self._request(
"GET",
"/cms/collections/{}?{}&{}".format(content_id, season, show_id),
params={
"include": "default",
"decorators": "playbackAllowed,contentAction,badges",
},
)
for season in season_params
]
videos = [[x for x in season["included"] if x["type"] == "video"] for season in seasons]
return [
Episode(
id_=ep["id"],
service=self.__class__,
title=page["attributes"]["title"],
year=ep["attributes"]["airDate"][:4],
season=ep["attributes"].get("seasonNumber"),
number=ep["attributes"].get("episodeNumber"),
name=ep["attributes"]["name"],
language=ep["attributes"]["audioTracks"][0]
if ep["attributes"].get("audioTracks")
else self.user_language,
data=ep,
)
for episodes in videos
for ep in episodes
if ep["attributes"]["videoType"] == "EPISODE"
]
def _episode(self, title: str) -> Episode:
params = {
"include": "default",
"decorators": "playbackAllowed,contentAction,badges",
}
data = self._request("GET", "/cms/routes/video/{}".format(title), params=params)
page = next((x for x in data["included"] if x.get("type", "") == "page"), None)
if not page:
raise IndexError("Episode page not found")
video_id = page["relationships"].get("primaryContent", {}).get("data", {}).get("id")
if not video_id:
raise IndexError("Episode id not found")
params = {
"decorators": "isFavorite",
"include": "primaryChannel",
}
content = self._request("GET", "/content/videos/{}".format(video_id), params=params)
episode = content["data"]["attributes"]
name = episode.get("name")
if episode.get("secondaryTitle"):
name += " - " + episode.get("secondaryTitle")
return [
Episode(
id_=content["data"].get("id"),
service=self.__class__,
title=page["attributes"]["title"],
year=int(episode.get("airDate")[:4]) if episode.get("airDate") else None,
season=episode.get("seasonNumber") or 0,
number=episode.get("episodeNumber") or 0,
name=name,
language=episode["audioTracks"][0] if episode.get("audioTracks") else self.user_language,
data=episode,
)
]
def _request(
self,
method: str,
api: str,
params: dict = None,
headers: dict = None,
payload: dict = None,
) -> Any[dict | str]:
url = urljoin(self.base_url, api)
self.session.headers.update(self.config["headers"])
if params:
self.session.params.update(params)
if headers:
self.session.headers.update(headers)
prep = self.session.prepare_request(Request(method, url, json=payload))
response = self.session.send(prep)
try:
data = json.loads(response.content)
if data.get("errors"):
if "invalid.token" in data["errors"][0]["code"]:
self.log.error("- Invalid Token. Cookies are invalid or may have expired.")
sys.exit(1)
if "missingpackage" in data["errors"][0]["code"]:
self.log.error("- Access Denied. Title is not available for this subscription.")
sys.exit(1)
raise ConnectionError(data["errors"])
return data
except Exception as e:
raise ConnectionError("Request failed: {}".format(e))