devine-services/services/STV/__init__.py

231 lines
8.1 KiB
Python
Raw Normal View History

2024-07-08 09:40:01 +00:00
from __future__ import annotations
import re
from collections.abc import Generator
from datetime import timedelta
from typing import Any, Union
from urllib.parse import urlparse
import click
from click import Context
from devine.core.manifests.dash import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Chapters, Tracks
from lxml import etree
class STV(Service):
"""
Service code for STV Player streaming service (https://player.stv.tv/).
\b
Author: stabbedbybrick
Authorization: None
Robustness:
L3: 1080p
\b
Tips:
- Use complete title URL as input:
SERIES: https://player.stv.tv/summary/rebus
EPISODE: https://player.stv.tv/episode/2ro8/rebus
- Use the episode URL for movies:
MOVIE: https://player.stv.tv/episode/4lw7/wonder-woman-1984
"""
GEOFENCE = ("gb",)
ALIASES = ("stvplayer",)
@staticmethod
@click.command(name="STV", short_help="https://player.stv.tv/", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> STV:
return STV(ctx, **kwargs)
def __init__(self, ctx: Context, title: str):
self.title = title
super().__init__(ctx)
self.session.headers.update({"user-agent": "okhttp/4.11.0"})
self.base = self.config["endpoints"]["base"]
def search(self) -> Generator[SearchResult, None, None]:
data = {
"engine_key": "S1jgssBHdk8ZtMWngK_y",
"q": self.title,
}
r = self.session.post(self.config["endpoints"]["search"], data=data)
r.raise_for_status()
results = r.json()["records"]["page"]
for result in results:
label = result.get("category")
if label and isinstance(label, list):
label = result["category"][0]
yield SearchResult(
id_=result.get("url"),
title=result.get("title"),
description=result.get("body"),
label=label,
url=result.get("url"),
)
def get_titles(self) -> Union[Movies, Series]:
kind, slug = self.parse_title(self.title)
self.session.headers.update({"stv-drm": "true"})
if kind == "episode":
r = self.session.get(self.base + f"episodes/{slug}")
r.raise_for_status()
episode = r.json()["results"]
if episode.get("genre").lower() == "movie":
return Movies(
[
Movie(
id_=episode["video"].get("id"),
service=self.__class__,
year=None,
name=episode.get("title"),
language="en",
data=episode,
)
]
)
episodes = [
Episode(
id_=episode["video"].get("id"),
service=self.__class__,
title=episode["programme"].get("name"),
season=int(episode["playerSeries"]["name"].split(" ")[1])
if episode.get("playerSeries") and re.match(r"Series \d+", episode["playerSeries"]["name"])
else 0,
number=int(episode.get("number", 0)),
name=episode.get("title"),
language="en",
data=episode,
)
]
elif kind == "summary":
r = self.session.get(self.base + f"programmes/{slug}")
r.raise_for_status()
data = r.json()
series = [series.get("guid") for series in data["results"]["series"]]
seasons = [self.session.get(self.base + f"episodes?series.guid={i}").json() for i in series]
episodes = [
Episode(
id_=episode["video"].get("id"),
service=self.__class__,
title=data["results"].get("name"),
season=int(episode["playerSeries"]["name"].split(" ")[1])
if episode.get("playerSeries") and re.match(r"Series \d+", episode["playerSeries"]["name"])
else 0,
number=int(episode.get("number", 0)),
name=episode.get("title"),
language="en",
data=episode,
)
for season in seasons
for episode in season["results"]
]
self.session.headers.pop("stv-drm")
return Series(episodes)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
self.drm = title.data["programme"].get("drmEnabled")
headers = self.config["headers"]["drm"] if self.drm else self.config["headers"]["clear"]
accounts = self.config["accounts"]["drm"] if self.drm else self.config["accounts"]["clear"]
r = self.session.get(
self.config["endpoints"]["playback"].format(accounts=accounts, id=title.id),
headers=headers,
)
if not r.ok:
raise ConnectionError(r.text)
data = r.json()
source_manifest = next(
(source["src"] for source in data["sources"] if source.get("type") == "application/dash+xml"),
None,
)
self.license = None
if self.drm:
key_systems = next((
source
for source in data["sources"]
if source.get("type") == "application/dash+xml"
and source.get("key_systems").get("com.widevine.alpha")),
None,
)
self.license = key_systems["key_systems"]["com.widevine.alpha"]["license_url"] if key_systems else None
manifest = self.trim_duration(source_manifest)
tracks = DASH.from_text(manifest, source_manifest).to_tracks(title.language)
for track in tracks.audio:
role = track.data["dash"]["representation"].find("Role")
if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
track.descriptive = True
return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> Chapters:
cue_points = title.data.get("_cuePoints")
if not cue_points:
return Chapters()
return Chapters([Chapter(timestamp=int(cue)) for cue in cue_points])
def get_widevine_service_certificate(self, **_: Any) -> str:
return None
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
if not self.license:
return None
r = self.session.post(url=self.license, data=challenge)
if r.status_code != 200:
raise ConnectionError(r.text)
return r.content
# Service specific functions
@staticmethod
def parse_title(title: str) -> tuple[str, str]:
parsed_url = urlparse(title).path.split("/")
kind, slug = parsed_url[1], parsed_url[2]
if kind not in ["episode", "summary"]:
raise ValueError("Failed to parse title - is the URL correct?")
return kind, slug
@staticmethod
def trim_duration(source_manifest: str) -> str:
"""
The last segment on all tracks return a 404 for some reason, causing a failed download.
So we trim the duration by exactly one segment to account for that.
TODO: Calculate the segment duration instead of assuming length.
"""
manifest = DASH.from_url(source_manifest).manifest
period_duration = manifest.get("mediaPresentationDuration")
period_duration = DASH.pt_to_sec(period_duration)
hours, minutes, seconds = str(timedelta(seconds=period_duration - 6)).split(":")
new_duration = f"PT{hours}H{minutes}M{seconds}S"
manifest.set("mediaPresentationDuration", new_duration)
return etree.tostring(manifest, encoding="unicode")