231 lines
8.1 KiB
Python
231 lines
8.1 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from collections.abc import Generator
|
|
from datetime import timedelta
|
|
from typing import Any, Union
|
|
from urllib.parse import urlparse
|
|
|
|
import click
|
|
from click import Context
|
|
from devine.core.manifests.dash import DASH
|
|
from devine.core.search_result import SearchResult
|
|
from devine.core.service import Service
|
|
from devine.core.titles import Episode, Movie, Movies, Series
|
|
from devine.core.tracks import Chapter, Chapters, Tracks
|
|
from lxml import etree
|
|
|
|
|
|
class STV(Service):
|
|
"""
|
|
Service code for STV Player streaming service (https://player.stv.tv/).
|
|
|
|
\b
|
|
Author: stabbedbybrick
|
|
Authorization: None
|
|
Robustness:
|
|
L3: 1080p
|
|
|
|
\b
|
|
Tips:
|
|
- Use complete title URL as input:
|
|
SERIES: https://player.stv.tv/summary/rebus
|
|
EPISODE: https://player.stv.tv/episode/2ro8/rebus
|
|
- Use the episode URL for movies:
|
|
MOVIE: https://player.stv.tv/episode/4lw7/wonder-woman-1984
|
|
|
|
"""
|
|
|
|
GEOFENCE = ("gb",)
|
|
ALIASES = ("stvplayer",)
|
|
|
|
@staticmethod
|
|
@click.command(name="STV", short_help="https://player.stv.tv/", help=__doc__)
|
|
@click.argument("title", type=str)
|
|
@click.pass_context
|
|
def cli(ctx: Context, **kwargs: Any) -> STV:
|
|
return STV(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx: Context, title: str):
|
|
self.title = title
|
|
super().__init__(ctx)
|
|
|
|
self.session.headers.update({"user-agent": "okhttp/4.11.0"})
|
|
self.base = self.config["endpoints"]["base"]
|
|
|
|
def search(self) -> Generator[SearchResult, None, None]:
|
|
data = {
|
|
"engine_key": "S1jgssBHdk8ZtMWngK_y",
|
|
"q": self.title,
|
|
}
|
|
r = self.session.post(self.config["endpoints"]["search"], data=data)
|
|
r.raise_for_status()
|
|
results = r.json()["records"]["page"]
|
|
|
|
for result in results:
|
|
label = result.get("category")
|
|
if label and isinstance(label, list):
|
|
label = result["category"][0]
|
|
|
|
yield SearchResult(
|
|
id_=result.get("url"),
|
|
title=result.get("title"),
|
|
description=result.get("body"),
|
|
label=label,
|
|
url=result.get("url"),
|
|
)
|
|
|
|
def get_titles(self) -> Union[Movies, Series]:
|
|
kind, slug = self.parse_title(self.title)
|
|
self.session.headers.update({"stv-drm": "true"})
|
|
|
|
if kind == "episode":
|
|
r = self.session.get(self.base + f"episodes/{slug}")
|
|
r.raise_for_status()
|
|
episode = r.json()["results"]
|
|
|
|
if episode.get("genre").lower() == "movie":
|
|
return Movies(
|
|
[
|
|
Movie(
|
|
id_=episode["video"].get("id"),
|
|
service=self.__class__,
|
|
year=None,
|
|
name=episode.get("title"),
|
|
language="en",
|
|
data=episode,
|
|
)
|
|
]
|
|
)
|
|
|
|
episodes = [
|
|
Episode(
|
|
id_=episode["video"].get("id"),
|
|
service=self.__class__,
|
|
title=episode["programme"].get("name"),
|
|
season=int(episode["playerSeries"]["name"].split(" ")[1])
|
|
if episode.get("playerSeries") and re.match(r"Series \d+", episode["playerSeries"]["name"])
|
|
else 0,
|
|
number=int(episode.get("number", 0)),
|
|
name=episode.get("title"),
|
|
language="en",
|
|
data=episode,
|
|
)
|
|
]
|
|
|
|
elif kind == "summary":
|
|
r = self.session.get(self.base + f"programmes/{slug}")
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
|
|
series = [series.get("guid") for series in data["results"]["series"]]
|
|
seasons = [self.session.get(self.base + f"episodes?series.guid={i}").json() for i in series]
|
|
|
|
episodes = [
|
|
Episode(
|
|
id_=episode["video"].get("id"),
|
|
service=self.__class__,
|
|
title=data["results"].get("name"),
|
|
season=int(episode["playerSeries"]["name"].split(" ")[1])
|
|
if episode.get("playerSeries") and re.match(r"Series \d+", episode["playerSeries"]["name"])
|
|
else 0,
|
|
number=int(episode.get("number", 0)),
|
|
name=episode.get("title"),
|
|
language="en",
|
|
data=episode,
|
|
)
|
|
for season in seasons
|
|
for episode in season["results"]
|
|
]
|
|
|
|
self.session.headers.pop("stv-drm")
|
|
return Series(episodes)
|
|
|
|
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
|
|
self.drm = title.data["programme"].get("drmEnabled")
|
|
headers = self.config["headers"]["drm"] if self.drm else self.config["headers"]["clear"]
|
|
accounts = self.config["accounts"]["drm"] if self.drm else self.config["accounts"]["clear"]
|
|
|
|
r = self.session.get(
|
|
self.config["endpoints"]["playback"].format(accounts=accounts, id=title.id),
|
|
headers=headers,
|
|
)
|
|
if not r.ok:
|
|
raise ConnectionError(r.text)
|
|
data = r.json()
|
|
|
|
source_manifest = next(
|
|
(source["src"] for source in data["sources"] if source.get("type") == "application/dash+xml"),
|
|
None,
|
|
)
|
|
|
|
self.license = None
|
|
if self.drm:
|
|
key_systems = next((
|
|
source
|
|
for source in data["sources"]
|
|
if source.get("type") == "application/dash+xml"
|
|
and source.get("key_systems").get("com.widevine.alpha")),
|
|
None,
|
|
)
|
|
|
|
self.license = key_systems["key_systems"]["com.widevine.alpha"]["license_url"] if key_systems else None
|
|
|
|
manifest = self.trim_duration(source_manifest)
|
|
tracks = DASH.from_text(manifest, source_manifest).to_tracks(title.language)
|
|
|
|
for track in tracks.audio:
|
|
role = track.data["dash"]["representation"].find("Role")
|
|
if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
|
|
track.descriptive = True
|
|
|
|
return tracks
|
|
|
|
def get_chapters(self, title: Union[Movie, Episode]) -> Chapters:
|
|
cue_points = title.data.get("_cuePoints")
|
|
if not cue_points:
|
|
return Chapters()
|
|
|
|
return Chapters([Chapter(timestamp=int(cue)) for cue in cue_points])
|
|
|
|
def get_widevine_service_certificate(self, **_: Any) -> str:
|
|
return None
|
|
|
|
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
|
|
if not self.license:
|
|
return None
|
|
|
|
r = self.session.post(url=self.license, data=challenge)
|
|
if r.status_code != 200:
|
|
raise ConnectionError(r.text)
|
|
return r.content
|
|
|
|
# Service specific functions
|
|
|
|
@staticmethod
|
|
def parse_title(title: str) -> tuple[str, str]:
|
|
parsed_url = urlparse(title).path.split("/")
|
|
kind, slug = parsed_url[1], parsed_url[2]
|
|
if kind not in ["episode", "summary"]:
|
|
raise ValueError("Failed to parse title - is the URL correct?")
|
|
|
|
return kind, slug
|
|
|
|
@staticmethod
|
|
def trim_duration(source_manifest: str) -> str:
|
|
"""
|
|
The last segment on all tracks return a 404 for some reason, causing a failed download.
|
|
So we trim the duration by exactly one segment to account for that.
|
|
|
|
TODO: Calculate the segment duration instead of assuming length.
|
|
"""
|
|
manifest = DASH.from_url(source_manifest).manifest
|
|
period_duration = manifest.get("mediaPresentationDuration")
|
|
period_duration = DASH.pt_to_sec(period_duration)
|
|
|
|
hours, minutes, seconds = str(timedelta(seconds=period_duration - 6)).split(":")
|
|
new_duration = f"PT{hours}H{minutes}M{seconds}S"
|
|
manifest.set("mediaPresentationDuration", new_duration)
|
|
|
|
return etree.tostring(manifest, encoding="unicode")
|