feat(services): Add ITV service

This commit is contained in:
stabbedbybrick 2024-05-30 17:07:54 +02:00
parent 79ad7516d1
commit 013c5f028c
2 changed files with 313 additions and 0 deletions

305
services/ITV/__init__.py Normal file
View File

@ -0,0 +1,305 @@
from __future__ import annotations
import hashlib
import json
import re
import time
from collections.abc import Generator
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
from bs4 import BeautifulSoup
from click import Context
from devine.core.credential import Credential
from devine.core.manifests.dash import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks
class ITV(Service):
"""
Service code for ITVx streaming service (https://www.itv.com/).
\b
Author: stabbedbybrick
Authorization: Credentials (Optional for free content | Required for premium content)
Robustness:
L1: 1080p
L3: 720p
\b
Tips:
- Use complete title URL as input (pay attention to the URL format):
SERIES: https://www.itv.com/watch/bay-of-fires/10a5270
EPISODE: https://www.itv.com/watch/bay-of-fires/10a5270/10a5270a0001
FILM: https://www.itv.com/watch/mad-max-beyond-thunderdome/2a7095
\b
Examples:
- SERIES: devine dl -w s01e01 itv https://www.itv.com/watch/bay-of-fires/10a5270
- EPISODE: devine dl itv https://www.itv.com/watch/bay-of-fires/10a5270/10a5270a0001
- FILM: devine dl itv https://www.itv.com/watch/mad-max-beyond-thunderdome/2a7095
\b
Notes:
ITV seem to detect and throttle multiple connections against the server.
It's recommended to use requests as downloader, with few workers.
"""
GEOFENCE = ("gb",)
ALIASES = ("itvx",)
@staticmethod
@click.command(name="ITV", short_help="https://www.itv.com/", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> ITV:
return ITV(ctx, **kwargs)
def __init__(self, ctx: Context, title: str):
self.title = title
super().__init__(ctx)
self.session.headers.update(self.config["headers"])
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
self.authorization = None
if credential is not None:
cache = self.cache.get(f"tokens_{credential.sha1}")
headers = {
"Host": "auth.prd.user.itv.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
"Accept": "application/vnd.user.auth.v2+json",
"Accept-Language": "en-US,en;q=0.8",
"Origin": "https://www.itv.com",
"Connection": "keep-alive",
"Referer": "https://www.itv.com/",
}
if cache:
self.log.info(" + Using cached Tokens...")
r = self.session.get(
self.config["endpoints"]["refresh"],
headers=headers,
params={"refresh": cache.data["refresh_token"]},
)
if r.status_code != 200:
raise ConnectionError(f"Failed to refresh tokens: {r.text}")
tokens = r.json()
else:
r = self.session.post(
self.config["endpoints"]["login"],
headers=headers,
data=json.dumps(
{
"username": credential.username,
"password": credential.password,
"scope": "content",
"grant_type": "password",
"nonce": f"cerberus-auth-request-{int(time.time())}",
}
),
)
if r.status_code != 200:
raise ConnectionError(f"Failed to authenticate: {r.text}")
tokens = r.json()
self.log.info(" + Acquired Tokens...")
cache.set(tokens)
self.authorization = tokens["access_token"]
def search(self) -> Generator[SearchResult, None, None]:
params = {
"broadcaster": "itv",
"featureSet": "clearkey,outband-webvtt,hls,aes,playready,widevine,fairplay,bbts,progressive,hd,rtmpe",
"onlyFree": "false",
"platform": "dotcom",
"query": self.title,
}
r = self.session.get(self.config["endpoints"]["search"], params=params)
r.raise_for_status()
results = r.json()["results"]
if isinstance(results, list):
for result in results:
special = result["data"].get("specialTitle")
standard = result["data"].get("programmeTitle")
film = result["data"].get("filmTitle")
title = special if special else standard if standard else film
tier = result["data"].get("tier")
slug = self._sanitize(title)
_id = result["data"]["legacyId"]["apiEncoded"]
_id = "_".join(_id.split("_")[:2]).replace("_", "a")
_id = re.sub(r"a000\d+", "", _id)
yield SearchResult(
id_=f"https://www.itv.com/watch/{slug}/{_id}",
title=title,
description=result["data"].get("synopsis"),
label=result.get("entityType") + f" {tier}",
url=f"https://www.itv.com/watch/{slug}/{_id}",
)
def get_titles(self) -> Union[Movies, Series]:
data = self.get_data(self.title)
kind = data["seriesList"][0]["seriesType"]
if kind == "SERIES" and data.get("episode"):
episode = data.get("episode")
return Series(
[
Episode(
id_=episode["episodeId"],
service=self.__class__,
title=data["programme"]["title"],
season=episode.get("series") if isinstance(episode.get("series"), int) else 0,
number=episode.get("episode") if isinstance(episode.get("episode"), int) else 0,
name=episode["episodeTitle"],
language="en", # TODO: language detection
data=episode,
)
]
)
elif kind == "SERIES":
return Series(
[
Episode(
id_=episode["episodeId"],
service=self.__class__,
title=data["programme"]["title"],
season=episode.get("series") if isinstance(episode.get("series"), int) else 0,
number=episode.get("episode") if isinstance(episode.get("episode"), int) else 0,
name=episode["episodeTitle"],
language="en", # TODO: language detection
data=episode,
)
for series in data["seriesList"]
if "Latest episodes" not in series["seriesLabel"]
for episode in series["titles"]
]
)
elif kind == "FILM":
return Movies(
[
Movie(
id_=movie["episodeId"],
service=self.__class__,
name=data["programme"]["title"],
year=movie.get("productionYear"),
language="en", # TODO: language detection
data=movie,
)
for movies in data["seriesList"]
for movie in movies["titles"]
]
)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
playlist = title.data.get("playlistUrl")
featureset = {k: ("mpeg-dash", "widevine", "outband-webvtt", "hd", "single-track") for k in ("min", "max")}
payload = {
"client": {"id": "browser"},
"variantAvailability": {"featureset": featureset, "platformTag": "dotcom"},
}
if self.authorization:
payload["user"] = {"token": self.authorization}
r = self.session.post(playlist, json=payload)
if r.status_code != 200:
raise ConnectionError(r.text)
data = r.json()
video = data["Playlist"]["Video"]
subtitles = video.get("Subtitles")
self.manifest = video.get("Base") + video["MediaFiles"][0].get("Href")
self.license = video["MediaFiles"][0].get("KeyServiceUrl")
tracks = DASH.from_url(self.manifest, self.session).to_tracks(title.language)
tracks.videos[0].data = data
if subtitles is not None:
for subtitle in subtitles:
tracks.add(
Subtitle(
id_=hashlib.md5(subtitle.get("Href", "").encode()).hexdigest()[0:6],
url=subtitle.get("Href", ""),
codec=Subtitle.Codec.from_mime(subtitle.get("Href", "")[-3:]),
language=title.language,
forced=False,
)
)
for track in tracks.audio:
role = track.data["dash"]["representation"].find("Role")
if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
track.descriptive = True
return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> Chapters:
track = title.tracks.videos[0]
if not track.data["Playlist"].get("ContentBreaks"):
return Chapters()
breaks = track.data["Playlist"]["ContentBreaks"]
timecodes = [".".join(x.get("TimeCode").rsplit(":", 1)) for x in breaks if x.get("TimeCode") != "00:00:00:000"]
# End credits are sometimes listed before the last chapter, so we skip those for now
return Chapters([Chapter(timecode) for timecode in timecodes])
def get_widevine_service_certificate(self, **_: Any) -> str:
return None
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
r = self.session.post(url=self.license, data=challenge)
if r.status_code != 200:
raise ConnectionError(r.text)
return r.content
# Service specific functions
def get_data(self, url: str) -> dict:
# TODO: Find a proper endpoint for this
r = self.session.get(url)
if r.status_code != 200:
raise ConnectionError(r.text)
soup = BeautifulSoup(r.text, "html.parser")
props = soup.select_one("#__NEXT_DATA__").text
try:
data = json.loads(props)
except Exception as e:
raise ValueError(f"Failed to parse JSON: {e}")
return data["props"]["pageProps"]
@staticmethod
def _sanitize(title: str) -> str:
title = title.lower()
title = title.replace("&", "and")
title = re.sub(r"[:;/()]", "", title)
title = re.sub(r"[ ]", "-", title)
title = re.sub(r"[\\*!?¿,'\"<>|$#`]", "", title)
title = re.sub(rf"[{'.'}]{{2,}}", ".", title)
title = re.sub(rf"[{'_'}]{{2,}}", "_", title)
title = re.sub(rf"[{'-'}]{{2,}}", "-", title)
title = re.sub(rf"[{' '}]{{2,}}", " ", title)
return title

8
services/ITV/config.yaml Normal file
View File

@ -0,0 +1,8 @@
headers:
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0
accept-language: en-US,en;q=0.8
endpoints:
login: https://auth.prd.user.itv.com/v2/auth
refresh: https://auth.prd.user.itv.com/token
search: https://textsearch.prd.oasvc.itv.com/search