2024-05-30 15:07:54 +00:00
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
|
import json
|
|
|
|
|
import re
|
2024-06-29 12:58:48 +00:00
|
|
|
|
import sys
|
2024-05-30 15:07:54 +00:00
|
|
|
|
from collections.abc import Generator
|
|
|
|
|
from http.cookiejar import MozillaCookieJar
|
|
|
|
|
from typing import Any, Optional, Union
|
|
|
|
|
|
|
|
|
|
import click
|
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
from click import Context
|
|
|
|
|
from devine.core.credential import Credential
|
|
|
|
|
from devine.core.manifests.dash import DASH
|
|
|
|
|
from devine.core.search_result import SearchResult
|
|
|
|
|
from devine.core.service import Service
|
|
|
|
|
from devine.core.titles import Episode, Movie, Movies, Series
|
|
|
|
|
from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ITV(Service):
|
|
|
|
|
"""
|
|
|
|
|
Service code for ITVx streaming service (https://www.itv.com/).
|
|
|
|
|
|
|
|
|
|
\b
|
|
|
|
|
Author: stabbedbybrick
|
2024-06-29 12:58:48 +00:00
|
|
|
|
Authorization: Cookies (Optional for free content | Required for premium content)
|
2024-05-30 15:07:54 +00:00
|
|
|
|
Robustness:
|
|
|
|
|
L1: 1080p
|
|
|
|
|
L3: 720p
|
|
|
|
|
|
|
|
|
|
\b
|
|
|
|
|
Tips:
|
|
|
|
|
- Use complete title URL as input (pay attention to the URL format):
|
|
|
|
|
SERIES: https://www.itv.com/watch/bay-of-fires/10a5270
|
|
|
|
|
EPISODE: https://www.itv.com/watch/bay-of-fires/10a5270/10a5270a0001
|
|
|
|
|
FILM: https://www.itv.com/watch/mad-max-beyond-thunderdome/2a7095
|
2024-06-02 19:39:54 +00:00
|
|
|
|
- Some shows aren't listed as series, only as "Latest episodes"
|
|
|
|
|
Download by SERIES URL for those titles, not by EPISODE URL
|
2024-05-30 15:07:54 +00:00
|
|
|
|
|
|
|
|
|
\b
|
|
|
|
|
Examples:
|
|
|
|
|
- SERIES: devine dl -w s01e01 itv https://www.itv.com/watch/bay-of-fires/10a5270
|
|
|
|
|
- EPISODE: devine dl itv https://www.itv.com/watch/bay-of-fires/10a5270/10a5270a0001
|
|
|
|
|
- FILM: devine dl itv https://www.itv.com/watch/mad-max-beyond-thunderdome/2a7095
|
|
|
|
|
|
|
|
|
|
\b
|
|
|
|
|
Notes:
|
|
|
|
|
ITV seems to detect and throttle multiple concurrent connections to its servers.
|
|
|
|
|
It's recommended to use requests as the downloader, with only a few workers.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
GEOFENCE = ("gb",)
|
|
|
|
|
ALIASES = ("itvx",)
|
|
|
|
|
|
|
|
|
|
@staticmethod
@click.command(name="ITV", short_help="https://www.itv.com/", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> ITV:
    # Click entry point: construct the service with the parsed CLI arguments
    # (the "title" argument is forwarded via **kwargs).
    return ITV(ctx, **kwargs)
|
|
|
|
|
|
|
|
|
|
def __init__(self, ctx: Context, title: str):
    """Store the requested title URL and prepare the HTTP session."""
    self.title = title
    super().__init__(ctx)

    # Fall back to the default profile when none was chosen on the CLI.
    self.profile = ctx.parent.params.get("profile") or "default"

    self.session.headers.update(self.config["headers"])
|
|
|
|
|
|
|
|
|
|
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
    """
    Authenticate against ITVX using cookies.

    Cookies are required: the token-refresh flow needs the "Itv.Session"
    cookie, so a plain username/password credential is rejected. On success
    self.authorization holds a fresh access token; otherwise it stays None
    (free content can still be played unauthenticated).
    """
    super().authenticate(cookies, credential)
    self.authorization = None

    # Credentials alone cannot be used; bail out early with a clear message.
    if credential and not cookies:
        self.log.error(" - Error: This service requires cookies for authentication.")
        sys.exit(1)

    if cookies is not None:
        self.log.info(f"\n + Cookies for '{self.profile}' profile found, authenticating...")
        # The "Itv.Session" cookie value is a JSON document holding the account's tokens.
        itv_session = next((cookie.value for cookie in cookies if cookie.name == "Itv.Session"), None)
        if not itv_session:
            self.log.error(" - Error: Session cookie not found. Cookies may be invalid.")
            sys.exit(1)

        itv_session = json.loads(itv_session)
        refresh_token = itv_session["tokens"]["content"].get("refresh_token")
        if not refresh_token:
            self.log.error(" - Error: Access tokens not found. Try refreshing your cookies.")
            sys.exit(1)

        # Tokens from a previous refresh are cached per profile.
        cache = self.cache.get(f"tokens_{self.profile}")

        headers = {
            "Host": "auth.prd.user.itv.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
            "Accept": "application/vnd.user.auth.v2+json",
            "Accept-Language": "en-US,en;q=0.8",
            "Origin": "https://www.itv.com",
            "Connection": "keep-alive",
            "Referer": "https://www.itv.com/",
        }

        # Prefer the cached refresh token over the (possibly older) one from the cookie.
        params = {"refresh": cache.data["refresh_token"]} if cache else {"refresh": refresh_token}

        r = self.session.get(
            self.config["endpoints"]["refresh"],
            headers=headers,
            params=params,
        )
        if r.status_code != 200:
            raise ConnectionError(f"Failed to refresh tokens: {r.text}")

        tokens = r.json()
        # Persist the new token set so the next run can refresh from it.
        cache.set(tokens)
        self.log.info(" + Tokens refreshed and placed in cache\n")

        self.authorization = tokens["access_token"]
|
|
|
|
|
|
|
|
|
|
def search(self) -> Generator[SearchResult, None, None]:
    """
    Search ITVX for the requested title and yield SearchResults.

    Fixes over the previous revision:
    - results without any title field are skipped instead of crashing
      in _sanitize(None)
    - the label is built safely even when "entityType" is missing
      (previously `None + str` raised TypeError)
    """
    params = {
        "broadcaster": "itv",
        "featureSet": "clearkey,outband-webvtt,hls,aes,playready,widevine,fairplay,bbts,progressive,hd,rtmpe",
        "onlyFree": "false",
        "platform": "dotcom",
        "query": self.title,
    }

    r = self.session.get(self.config["endpoints"]["search"], params=params)
    r.raise_for_status()

    results = r.json()["results"]
    if isinstance(results, list):
        for result in results:
            data = result["data"]
            # A result carries exactly one of these fields depending on its type.
            title = data.get("specialTitle") or data.get("programmeTitle") or data.get("filmTitle")
            if not title:
                # Without a title we can build neither a slug nor a useful result.
                continue
            tier = data.get("tier")

            slug = self._sanitize(title)

            # legacyId's apiEncoded form looks like "2_7095_0001"; the watch URL
            # wants the first two segments joined with "a", e.g. "2a7095".
            _id = data["legacyId"]["apiEncoded"]
            _id = "_".join(_id.split("_")[:2]).replace("_", "a")
            _id = re.sub(r"a000\d+", "", _id)

            yield SearchResult(
                id_=f"https://www.itv.com/watch/{slug}/{_id}",
                title=title,
                description=data.get("synopsis"),
                label=f"{result.get('entityType') or ''} {tier}".strip(),
                url=f"https://www.itv.com/watch/{slug}/{_id}",
            )
|
|
|
|
|
|
|
|
|
|
def get_titles(self) -> Union[Movies, Series]:
    """
    Parse the title's watch page data into a Series or Movies container.

    Fixes over the previous revision:
    - the Episode construction that was repeated verbatim three times is
      extracted into a single nested helper
    - a missing "seriesList" or a None "seriesLabel" no longer crashes
    """
    data = self.get_data(self.title)
    series_list = data.get("seriesList") or []
    kind = next((x.get("seriesType") for x in series_list if x.get("seriesType") in ["SERIES", "FILM"]), None)

    def make_episode(episode: dict) -> Episode:
        # Episode entries share the same field layout in every listing type.
        return Episode(
            id_=episode["episodeId"],
            service=self.__class__,
            title=data["programme"]["title"],
            season=episode.get("series") if isinstance(episode.get("series"), int) else 0,
            number=episode.get("episode") if isinstance(episode.get("episode"), int) else 0,
            name=episode["episodeTitle"],
            language="en",  # TODO: language detection
            data=episode,
        )

    # Some shows are not listed as "SERIES" or "FILM", only as "Latest episodes"
    if not kind and any((x.get("seriesLabel") or "").lower() == "latest episodes" for x in series_list):
        return Series([make_episode(episode) for episode in series_list[0]["titles"]])

    # An episode URL was used: the page carries a single pre-selected episode.
    if kind == "SERIES" and data.get("episode"):
        return Series([make_episode(data["episode"])])

    if kind == "SERIES":
        return Series(
            [
                make_episode(episode)
                for series in series_list
                if "Latest episodes" not in series["seriesLabel"]
                for episode in series["titles"]
            ]
        )

    if kind == "FILM":
        return Movies(
            [
                Movie(
                    id_=movie["episodeId"],
                    service=self.__class__,
                    name=data["programme"]["title"],
                    year=movie.get("productionYear"),
                    language="en",  # TODO: language detection
                    data=movie,
                )
                for movies in series_list
                for movie in movies["titles"]
            ]
        )
|
|
|
|
|
|
|
|
|
|
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
    """Request the title's playlist and build its video, audio and subtitle tracks."""
    playlist_url = title.data.get("playlistUrl")

    featureset = {k: ("mpeg-dash", "widevine", "outband-webvtt", "hd", "single-track") for k in ("min", "max")}
    payload = {
        "client": {"id": "browser"},
        "variantAvailability": {"featureset": featureset, "platformTag": "dotcom"},
    }
    if self.authorization:
        payload["user"] = {"token": self.authorization}

    res = self.session.post(playlist_url, json=payload)
    if res.status_code != 200:
        raise ConnectionError(res.text)

    playlist_data = res.json()
    video = playlist_data["Playlist"]["Video"]
    subtitles = video.get("Subtitles")
    media = video["MediaFiles"][0]
    # Keep the manifest and license URLs for get_widevine_license().
    self.manifest = video.get("Base") + media.get("Href")
    self.license = media.get("KeyServiceUrl")

    tracks = DASH.from_url(self.manifest, self.session).to_tracks(title.language)
    # Attach the raw playlist so get_chapters() can read ContentBreaks later.
    tracks.videos[0].data = playlist_data

    for subtitle in (subtitles if subtitles is not None else []):
        href = subtitle.get("Href", "")
        tracks.add(
            Subtitle(
                id_=hashlib.md5(href.encode()).hexdigest()[0:6],
                url=href,
                codec=Subtitle.Codec.from_mime(href[-3:]),
                language=title.language,
                forced=False,
            )
        )

    # Mark audio-description / alternate audio renditions as descriptive.
    for audio_track in tracks.audio:
        role = audio_track.data["dash"]["representation"].find("Role")
        if role is not None and role.get("value") in ("description", "alternative", "alternate"):
            audio_track.descriptive = True

    return tracks
|
|
|
|
|
|
|
|
|
|
def get_chapters(self, title: Union[Movie, Episode]) -> Chapters:
    """
    Build chapters from the playlist's content-break timecodes.

    Breaks without a "TimeCode" value are now skipped (previously they
    raised AttributeError on None.rsplit), as is the "00:00:00:000"
    marker at the start of playback.
    """
    track = title.tracks.videos[0]
    if not track.data["Playlist"].get("ContentBreaks"):
        return Chapters()

    breaks = track.data["Playlist"]["ContentBreaks"]
    # Timecodes arrive as HH:MM:SS:mmm; Chapter expects HH:MM:SS.mmm.
    timecodes = [
        ".".join(timecode.rsplit(":", 1))
        for x in breaks
        if (timecode := x.get("TimeCode")) and timecode != "00:00:00:000"
    ]

    # End credits are sometimes listed before the last chapter, so we skip those for now
    return Chapters([Chapter(timecode) for timecode in timecodes])
|
|
|
|
|
|
|
|
|
|
def get_widevine_service_certificate(self, **_: Any) -> Optional[str]:
    # No service certificate is provided for this service; the previous
    # "-> str" annotation was wrong since this always returns None.
    return None
|
|
|
|
|
|
|
|
|
|
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
    """POST the Widevine challenge to ITV's key service and return the raw license."""
    response = self.session.post(url=self.license, data=challenge)
    if response.status_code != 200:
        raise ConnectionError(response.text)
    return response.content
|
|
|
|
|
|
|
|
|
|
# Service specific functions
|
|
|
|
|
|
|
|
|
|
def get_data(self, url: str) -> dict:
    """
    Scrape the Next.js page props from a title's watch page.

    Raises ConnectionError on a non-200 response and ValueError when the
    page layout or embedded JSON is not what we expect.

    TODO: Find a proper endpoint for this
    """
    r = self.session.get(url)
    if r.status_code != 200:
        raise ConnectionError(r.text)

    soup = BeautifulSoup(r.text, "html.parser")
    props = soup.select_one("#__NEXT_DATA__")
    # Guard against a missing script tag; previously this was an opaque
    # AttributeError on None.text.
    if props is None:
        raise ValueError("Failed to find __NEXT_DATA__ on the page. The page layout may have changed.")

    try:
        data = json.loads(props.text)
    except json.JSONDecodeError as e:
        # Chain the original decode error for easier debugging.
        raise ValueError(f"Failed to parse JSON: {e}") from e

    return data["props"]["pageProps"]
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _sanitize(title: str) -> str:
|
|
|
|
|
title = title.lower()
|
|
|
|
|
title = title.replace("&", "and")
|
|
|
|
|
title = re.sub(r"[:;/()]", "", title)
|
|
|
|
|
title = re.sub(r"[ ]", "-", title)
|
|
|
|
|
title = re.sub(r"[\\*!?¿,'\"<>|$#`’]", "", title)
|
|
|
|
|
title = re.sub(rf"[{'.'}]{{2,}}", ".", title)
|
|
|
|
|
title = re.sub(rf"[{'_'}]{{2,}}", "_", title)
|
|
|
|
|
title = re.sub(rf"[{'-'}]{{2,}}", "-", title)
|
|
|
|
|
title = re.sub(rf"[{' '}]{{2,}}", " ", title)
|
|
|
|
|
return title
|