diff --git a/services/ITV/__init__.py b/services/ITV/__init__.py new file mode 100644 index 0000000..3d0dc3e --- /dev/null +++ b/services/ITV/__init__.py @@ -0,0 +1,305 @@ +from __future__ import annotations + +import hashlib +import json +import re +import time +from collections.abc import Generator +from http.cookiejar import MozillaCookieJar +from typing import Any, Optional, Union + +import click +from bs4 import BeautifulSoup +from click import Context +from devine.core.credential import Credential +from devine.core.manifests.dash import DASH +from devine.core.search_result import SearchResult +from devine.core.service import Service +from devine.core.titles import Episode, Movie, Movies, Series +from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks + + +class ITV(Service): + """ + Service code for ITVx streaming service (https://www.itv.com/). + + \b + Author: stabbedbybrick + Authorization: Credentials (Optional for free content | Required for premium content) + Robustness: + L1: 1080p + L3: 720p + + \b + Tips: + - Use complete title URL as input (pay attention to the URL format): + SERIES: https://www.itv.com/watch/bay-of-fires/10a5270 + EPISODE: https://www.itv.com/watch/bay-of-fires/10a5270/10a5270a0001 + FILM: https://www.itv.com/watch/mad-max-beyond-thunderdome/2a7095 + + \b + Examples: + - SERIES: devine dl -w s01e01 itv https://www.itv.com/watch/bay-of-fires/10a5270 + - EPISODE: devine dl itv https://www.itv.com/watch/bay-of-fires/10a5270/10a5270a0001 + - FILM: devine dl itv https://www.itv.com/watch/mad-max-beyond-thunderdome/2a7095 + + \b + Notes: + ITV seem to detect and throttle multiple connections against the server. + It's recommended to use requests as downloader, with few workers. + + """ + + GEOFENCE = ("gb",) + ALIASES = ("itvx",) + + @staticmethod + @click.command(name="ITV", short_help="https://www.itv.com/", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> ITV: + return ITV(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + self.session.headers.update(self.config["headers"]) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + self.authorization = None + + if credential is not None: + cache = self.cache.get(f"tokens_{credential.sha1}") + + headers = { + "Host": "auth.prd.user.itv.com", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0", + "Accept": "application/vnd.user.auth.v2+json", + "Accept-Language": "en-US,en;q=0.8", + "Origin": "https://www.itv.com", + "Connection": "keep-alive", + "Referer": "https://www.itv.com/", + } + + if cache: + self.log.info(" + Using cached Tokens...") + r = self.session.get( + self.config["endpoints"]["refresh"], + headers=headers, + params={"refresh": cache.data["refresh_token"]}, + ) + if r.status_code != 200: + raise ConnectionError(f"Failed to refresh tokens: {r.text}") + + tokens = r.json() + else: + r = self.session.post( + self.config["endpoints"]["login"], + headers=headers, + data=json.dumps( + { + "username": credential.username, + "password": credential.password, + "scope": "content", + "grant_type": "password", + "nonce": f"cerberus-auth-request-{int(time.time())}", + } + ), + ) + if r.status_code != 200: + raise ConnectionError(f"Failed to authenticate: {r.text}") + + tokens = r.json() + self.log.info(" + Acquired Tokens...") + + cache.set(tokens) + + self.authorization = tokens["access_token"] + + def search(self) -> Generator[SearchResult, None, None]: + params = { + "broadcaster": "itv", + "featureSet": "clearkey,outband-webvtt,hls,aes,playready,widevine,fairplay,bbts,progressive,hd,rtmpe", + "onlyFree": "false", + "platform": "dotcom", + "query": self.title, + } + + r = self.session.get(self.config["endpoints"]["search"], params=params) + r.raise_for_status() + + results = r.json()["results"] + if isinstance(results, list): + for result in results: + special = result["data"].get("specialTitle") + standard = result["data"].get("programmeTitle") + film = result["data"].get("filmTitle") + title = special if special else standard if standard else film + tier = result["data"].get("tier") + + slug = self._sanitize(title) + + _id = result["data"]["legacyId"]["apiEncoded"] + _id = "_".join(_id.split("_")[:2]).replace("_", "a") + _id = re.sub(r"a000\d+", "", _id) + + yield SearchResult( + id_=f"https://www.itv.com/watch/{slug}/{_id}", + title=title, + description=result["data"].get("synopsis"), + label=result.get("entityType") + f" {tier}", + url=f"https://www.itv.com/watch/{slug}/{_id}", + ) + + def get_titles(self) -> Union[Movies, Series]: + data = self.get_data(self.title) + kind = data["seriesList"][0]["seriesType"] + + if kind == "SERIES" and data.get("episode"): + episode = data.get("episode") + return Series( + [ + Episode( + id_=episode["episodeId"], + service=self.__class__, + title=data["programme"]["title"], + season=episode.get("series") if isinstance(episode.get("series"), int) else 0, + number=episode.get("episode") if isinstance(episode.get("episode"), int) else 0, + name=episode["episodeTitle"], + language="en", # TODO: language detection + data=episode, + ) + ] + ) + + elif kind == "SERIES": + return Series( + [ + Episode( + id_=episode["episodeId"], + service=self.__class__, + title=data["programme"]["title"], + season=episode.get("series") if isinstance(episode.get("series"), int) else 0, + number=episode.get("episode") if isinstance(episode.get("episode"), int) else 0, + name=episode["episodeTitle"], + language="en", # TODO: language detection + data=episode, + ) + for series in data["seriesList"] + if "Latest episodes" not in series["seriesLabel"] + for episode in series["titles"] + ] + ) + + elif kind == "FILM": + return Movies( + [ + Movie( + id_=movie["episodeId"], + service=self.__class__, + name=data["programme"]["title"], + year=movie.get("productionYear"), + language="en", # TODO: language detection + data=movie, + ) + for movies in data["seriesList"] + for movie in movies["titles"] + ] + ) + + def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: + playlist = title.data.get("playlistUrl") + + featureset = {k: ("mpeg-dash", "widevine", "outband-webvtt", "hd", "single-track") for k in ("min", "max")} + payload = { + "client": {"id": "browser"}, + "variantAvailability": {"featureset": featureset, "platformTag": "dotcom"}, + } + if self.authorization: + payload["user"] = {"token": self.authorization} + + r = self.session.post(playlist, json=payload) + if r.status_code != 200: + raise ConnectionError(r.text) + + data = r.json() + video = data["Playlist"]["Video"] + subtitles = video.get("Subtitles") + self.manifest = video.get("Base") + video["MediaFiles"][0].get("Href") + self.license = video["MediaFiles"][0].get("KeyServiceUrl") + + tracks = DASH.from_url(self.manifest, self.session).to_tracks(title.language) + tracks.videos[0].data = data + + if subtitles is not None: + for subtitle in subtitles: + tracks.add( + Subtitle( + id_=hashlib.md5(subtitle.get("Href", "").encode()).hexdigest()[0:6], + url=subtitle.get("Href", ""), + codec=Subtitle.Codec.from_mime(subtitle.get("Href", "")[-3:]), + language=title.language, + forced=False, + ) + ) + + for track in tracks.audio: + role = track.data["dash"]["representation"].find("Role") + if role is not None and role.get("value") in ["description", "alternative", "alternate"]: + track.descriptive = True + + return tracks + + def get_chapters(self, title: Union[Movie, Episode]) -> Chapters: + track = title.tracks.videos[0] + if not track.data["Playlist"].get("ContentBreaks"): + return Chapters() + + breaks = track.data["Playlist"]["ContentBreaks"] + timecodes = [".".join(x.get("TimeCode").rsplit(":", 1)) for x in breaks if x.get("TimeCode") != "00:00:00:000"] + + # End credits are sometimes listed before the last chapter, so we skip those for now + return Chapters([Chapter(timecode) for timecode in timecodes]) + + def get_widevine_service_certificate(self, **_: Any) -> str: + return None + + def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes: + r = self.session.post(url=self.license, data=challenge) + if r.status_code != 200: + raise ConnectionError(r.text) + return r.content + + # Service specific functions + + def get_data(self, url: str) -> dict: + # TODO: Find a proper endpoint for this + + r = self.session.get(url) + if r.status_code != 200: + raise ConnectionError(r.text) + + soup = BeautifulSoup(r.text, "html.parser") + props = soup.select_one("#__NEXT_DATA__").text + + try: + data = json.loads(props) + except Exception as e: + raise ValueError(f"Failed to parse JSON: {e}") + + return data["props"]["pageProps"] + + @staticmethod + def _sanitize(title: str) -> str: + title = title.lower() + title = title.replace("&", "and") + title = re.sub(r"[:;/()]", "", title) + title = re.sub(r"[ ]", "-", title) + title = re.sub(r"[\\*!?¿,'\"<>|$#`’]", "", title) + title = re.sub(rf"[{'.'}]{{2,}}", ".", title) + title = re.sub(rf"[{'_'}]{{2,}}", "_", title) + title = re.sub(rf"[{'-'}]{{2,}}", "-", title) + title = re.sub(rf"[{' '}]{{2,}}", " ", title) + return title diff --git a/services/ITV/config.yaml b/services/ITV/config.yaml new file mode 100644 index 0000000..5ab82ce --- /dev/null +++ b/services/ITV/config.yaml @@ -0,0 +1,8 @@ +headers: + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0 + accept-language: en-US,en;q=0.8 + +endpoints: + login: https://auth.prd.user.itv.com/v2/auth + refresh: https://auth.prd.user.itv.com/token + search: https://textsearch.prd.oasvc.itv.com/search \ No newline at end of file