devine-services/services/iP/__init__.py
stabbedbybrick c1d517f18c fix(iP): Fix failing audio track for Devine v.3.3.3, fix HLG range
- Missing data for the audio track added in Devine v3.3.3 has been fixed; the track will now download properly.
- The range for UHD tracks was previously incorrectly listed as SDR but is now listed as HLG. You'll need to use `--range HLG` to request UHD tracks.
- Single episodes are now fetched from API rather than site source code.
2024-05-09 21:10:53 +02:00

366 lines
14 KiB
Python

from __future__ import annotations
import hashlib
import json
import re
import sys
import warnings
from collections.abc import Generator
from typing import Any, Union
import click
from bs4 import XMLParsedAsHTMLWarning
from click import Context
from devine.core.manifests import DASH, HLS
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Audio, Chapter, Subtitle, Tracks, Video
from devine.core.utils.collections import as_list
from devine.core.utils.sslciphers import SSLCiphers
# Install a global "ignore" filter for bs4's XMLParsedAsHTMLWarning. This runs at import time.
# NOTE(review): presumably triggered when XML manifests are parsed with an HTML parser — confirm.
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
class iP(Service):
    """
    \b
    Service code for the BBC iPlayer streaming service (https://www.bbc.co.uk/iplayer).
    Base code from VT, credit to original author

    \b
    Author: stabbedbybrick
    Authorization: None
    Security: None

    \b
    Tips:
        - Use full title URL as input for best results.
        - Use --list-titles before anything, iPlayer's listings are often messed up.

    \b
        - An SSL certificate (PEM) is required for accessing the UHD endpoint.
        Specify its path using the service configuration data in the root config:

    \b
            services:
                iP:
                    cert: path/to/cert

    \b
        - Use --range HLG to request H.265 UHD tracks
        - See which titles are available in UHD:
            https://www.bbc.co.uk/iplayer/help/questions/programme-availability/uhd-content
    """

    # NOTE: the class docstring above is also the CLI help text (``help=__doc__`` in ``cli``),
    # so developer-facing notes live in comments and method docstrings instead.

    ALIASES = ("bbciplayer", "bbc", "iplayer")
    GEOFENCE = ("gb",)
    # Accepts a full iPlayer/programmes URL or a bare id; captures ``kind`` (iPlayer URLs only) and ``id``.
    TITLE_RE = r"^(?:https?://(?:www\.)?bbc\.co\.uk/(?:iplayer/(?P<kind>episode|episodes)/|programmes/))?(?P<id>[a-z0-9]+)(?:/.*)?$"

    @staticmethod
    @click.command(name="iP", short_help="https://www.bbc.co.uk/iplayer", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx: Context, **kwargs: Any) -> iP:
        """Click entry point: build the service from the parsed CLI arguments."""
        return iP(ctx, **kwargs)

    def __init__(self, ctx: Context, title: str):
        """
        Store the requested title and the codec/range options of the parent command.

        Requesting the HLG range forces the H.265 codec and requires ``cert`` in the
        service config; without it an error is logged and the process exits.
        """
        self.title = title
        self.vcodec = ctx.parent.params.get("vcodec")
        self.range = ctx.parent.params.get("range_")
        super().__init__(ctx)

        # ``range_`` may be absent or empty; only the first requested range is inspected.
        wants_hlg = bool(self.range) and self.range[0].name == "HLG"
        if wants_hlg:
            if not self.config.get("cert"):
                self.log.error("UHD tracks cannot be selected without an SSL certificate")
                sys.exit(1)
            self.vcodec = "H.265"

    def search(self) -> Generator[SearchResult, None, None]:
        """Query the configured search endpoint with ``self.title`` and yield one result per hit."""
        params = {
            "q": self.title,
            "apikey": self.config["api_key"],
        }
        r = self.session.get(self.config["endpoints"]["search"], params=params)
        r.raise_for_status()

        for result in r.json()["results"]:
            result_type = result.get("type")
            yield SearchResult(
                id_=result.get("uri").split(":")[-1],
                title=result.get("title"),
                description=result.get("synopsis"),
                # "brand" entries are presented as "series"; other types are passed through
                label="series" if result_type == "brand" else result_type,
                url=result.get("url"),
            )

    def get_titles(self) -> Union[Movies, Series]:
        """
        Resolve ``self.title`` (URL or id) into a ``Movies`` or ``Series`` collection.

        Raises:
            ValueError: if no id can be parsed from the title, or if no metadata is
                returned and the input was not an ``/episode/`` URL.
        """
        match = re.match(self.TITLE_RE, self.title)
        pid = match.group("id") if match else None
        if not pid:
            raise ValueError("Unable to parse title ID - is the URL or id correct?")
        kind = match.group("kind")

        data = self.get_data(pid, slice_id=None)
        if data is None:
            # No programme metadata: only an explicit /episode/ URL falls back to the episodes endpoint
            if kind == "episode":
                return self.get_single_episode(pid)
            raise ValueError(f"Metadata was not found - if {pid} is an episode, use full URL as input")

        if "Film" in data["labels"]["category"]:
            return Movies(
                [
                    Movie(
                        id_=data["id"],
                        name=data["title"]["default"],
                        year=None,  # TODO
                        service=self.__class__,
                        language="en",
                        data=data,
                    )
                ]
            )

        slices = data.get("slices")
        if slices:
            seasons = [self.get_data(pid, x["id"]) for x in slices]
        else:
            # Without slices the un-sliced response fetched above already holds the episode list
            seasons = [data]

        episodes = [
            self.create_episode(episode, data)
            for season in seasons
            for episode in season["entities"]["results"]
        ]
        return Series(episodes)

    def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
        """
        Build the track list (video, audio split out of muxed HLS variants, first caption) for a title.

        Logs an error and exits when the playlist request fails, when version data cannot
        be found, or when no media is returned.

        Raises:
            ValueError: if no usable video connection exists or the transfer format is unsupported.
        """
        r = self.session.get(url=self.config["endpoints"]["playlist"].format(pid=title.id))
        if not r.ok:
            self.log.error(r.text)
            sys.exit(1)

        versions = r.json().get("allAvailableVersions")
        if not versions:
            # If API returns no versions, try to fetch from site source code
            r = self.session.get(self.config["base_url"].format(type="episode", pid=title.id))
            redux = re.search(r"window\.__IPLAYER_REDUX_STATE__ = (.*?);</script>", r.text)
            if not redux:
                self.log.error("Unable to find version data in the site source code")
                sys.exit(1)
            data = json.loads(redux.group(1))
            # One entry per version, skipping audio-described ones
            versions = [{"pid": x.get("id")} for x in data["versions"] if x.get("kind") != "audio-described"]

        # Ask the media selector once per version and reuse the answers below
        media_sets = [self.check_all_versions(version.get("pid")) for version in versions]

        heights = [item.get("height") for media_set in media_sets for item in media_set if item.get("height")]
        # NOTE(review): heights are compared against the *string* "1080", i.e. lexicographically
        # (e.g. "720" < "1080" is False). Kept as-is; confirm the intended selection rule.
        max_quality = max((h for h in heights if h < "1080"), default=None)

        media = next(
            (
                media_set
                for media_set in media_sets
                if any(item.get("height") == max_quality for item in media_set)
            ),
            None,
        )
        if not media:
            self.log.error("No media found. If you're behind a VPN/proxy, you might be blocked")
            sys.exit(1)

        connection = self._select_video_connection(media)

        if self.vcodec != "H.265":
            # Rewrite the selected connection so it points at the HLS master playlist
            if connection["transferFormat"] == "dash":
                connection["href"] = "/".join(
                    connection["href"].replace("dash", "hls").split("?")[0].split("/")[0:-1] + ["hls", "master.m3u8"]
                )
                connection["transferFormat"] = "hls"
            elif connection["transferFormat"] == "hls":
                connection["href"] = "/".join(
                    connection["href"].replace(".hlsv2.ism", "").split("?")[0].split("/")[0:-1]
                    + ["hls", "master.m3u8"]
                )

            if connection["transferFormat"] != "hls":
                raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}")

        if connection["transferFormat"] == "dash":
            tracks = DASH.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language)
        elif connection["transferFormat"] == "hls":
            tracks = HLS.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language)
        else:
            raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}")

        for video in tracks.videos:
            # UHD DASH manifest has no range information, so we add it manually
            if video.codec == Video.Codec.HEVC:
                video.range = Video.Range.HLG

            if any(re.search(r"-audio_\w+=\d+", x) for x in as_list(video.url)):
                playlist = video.data["hls"]["playlist"]
                muxed_url = as_list(video.url)[0]

                # create audio stream from the video stream
                audio_url = re.sub(r"-video=\d+", "", muxed_url)
                audio = Audio(
                    # use audio_url not video url, as to ignore video bitrate in ID
                    id_=hashlib.md5(audio_url.encode()).hexdigest()[0:7],
                    url=audio_url,
                    codec=Audio.Codec.from_codecs(playlist.stream_info.codecs),
                    language=playlist.media[0].language,
                    bitrate=int(self.find(r"-audio_\w+=(\d+)", muxed_url) or 0),
                    channels=playlist.media[0].channels,
                    descriptive=False,  # Not available
                    descriptor=Audio.Descriptor.HLS,
                    drm=video.drm,
                    data=video.data,
                )
                if not tracks.exists(by_id=audio.id):
                    # some video streams use the same audio, so natural dupes exist
                    tracks.add(audio)

                # remove audio from the video stream
                video.url = [re.sub(r"-audio_\w+=\d+", "", x) for x in as_list(video.url)][0]
                video.codec = Video.Codec.from_codecs(playlist.stream_info.codecs)
                video.bitrate = int(self.find(r"-video=(\d+)", as_list(video.url)[0]) or 0)

        # Only the first captions entry is used
        caption = next((x for x in media if x["kind"] == "captions"), None)
        if caption is not None:
            caption_connection = sorted(caption["connection"], key=lambda x: x["priority"])[0]
            tracks.add(
                Subtitle(
                    id_=hashlib.md5(caption_connection["href"].encode()).hexdigest()[0:6],
                    url=caption_connection["href"],
                    codec=Subtitle.Codec.from_codecs("ttml"),
                    language=title.language,
                    is_original_lang=True,
                    forced=False,
                    sdh=True,
                )
            )

        return tracks

    def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:
        """Return no chapters; this service does not provide any."""
        return []

    def get_widevine_service_certificate(self, **_: Any) -> None:
        """Return ``None``: no Widevine service certificate is provided by this service."""
        return None

    def get_widevine_license(self, challenge: bytes, **_: Any) -> None:
        """Return ``None``: no Widevine license is requested by this service."""
        return None

    # service specific functions

    def get_data(self, pid: str, slice_id: str | None) -> dict | None:
        """
        POST the metadata query for ``pid`` (optionally one slice) and return its ``programme`` node.

        The value is returned as-is; ``get_titles`` treats ``None`` as "metadata not found".
        """
        json_data = {
            "id": "9fd1636abe711717c2baf00cebb668de",
            "variables": {
                "id": pid,
                "perPage": 200,
                "page": 1,
                # any falsy slice id is sent as null
                "sliceId": slice_id if slice_id else None,
            },
        }

        r = self.session.post(self.config["endpoints"]["metadata"], json=json_data)
        r.raise_for_status()

        return r.json()["data"]["programme"]

    def check_all_versions(self, vpid: str) -> list:
        """
        Fetch the media list for one version id.

        With ``cert`` configured, the ``manifest_`` endpoint is used with the client certificate
        (``iptv-uhd`` mediaset for H.265, otherwise ``iptv-all``); without it the plain
        ``manifest`` endpoint is used. A response containing a top-level ``"result"`` key is
        treated as "no media" and yields an empty list.
        """
        cert = self.config.get("cert")
        if cert:
            url = self.config["endpoints"]["manifest_"].format(
                vpid=vpid,
                mediaset="iptv-uhd" if self.vcodec == "H.265" else "iptv-all",
            )
            # The adapters are mounted on the shared session, so they stay in effect afterwards
            self.session.mount("https://", SSLCiphers())
            self.session.mount("http://", SSLCiphers())
            manifest = self.session.get(
                url, headers={"user-agent": self.config["user_agent"]}, cert=cert
            ).json()
        else:
            url = self.config["endpoints"]["manifest"].format(
                vpid=vpid,
                mediaset="iptv-all",
            )
            manifest = self.session.get(url).json()

        if "result" in manifest:
            return []

        return manifest["media"]

    def create_episode(self, episode: dict, data: dict) -> Episode:
        """
        Build an ``Episode`` from one entry of a programme's episode list.

        Season and episode numbers and the episode name are parsed from the entry's
        ``subtitle`` texts (``default`` / ``slice``); anything not found defaults to 0 / "".
        """
        title = episode["episode"]["title"]["default"].strip()
        subtitle = episode["episode"]["subtitle"] or {}
        default_text = subtitle.get("default") or ""
        slice_text = subtitle.get("slice") or ""

        series = re.finditer(r"Series (\d+):|Season (\d+):|(\d{4}/\d{2}): Episode \d+", default_text)
        # the third alternative ("2023/24: Episode 1") becomes the season number with the slash removed
        season_num = int(next((m.group(1) or m.group(2) or m.group(3).replace("/", "") for m in series), 0))
        if season_num == 0 and not data.get("slices"):
            # programmes without slices are treated as a single season
            season_num = 1

        number = re.finditer(r"(\d+)\.|Episode (\d+)", slice_text or default_text)
        ep_num = int(next((m.group(1) or m.group(2) for m in number), 0))

        if slice_text:
            name = re.search(r"\d+\. (.+)", slice_text)
            ep_name = name.group(1) if name else slice_text
        else:
            ep_name = default_text

        return Episode(
            id_=episode["episode"].get("id"),
            service=self.__class__,
            title=title,
            season=season_num,
            number=ep_num,
            name=ep_name,
            language="en",
            data=episode,
        )

    def get_single_episode(self, pid: str) -> Series:
        """
        Fetch a single episode from the ``episodes`` endpoint and wrap it in a ``Series``.

        Raises:
            ValueError: if the response contains no episodes.
        """
        r = self.session.get(self.config["endpoints"]["episodes"].format(pid=pid))
        r.raise_for_status()

        episodes = json.loads(r.content).get("episodes")
        if not episodes:
            raise ValueError(f"No episode metadata was found for {pid}")
        episode = episodes[0]

        season, number, name = 0, 0, ""
        subtitle = episode.get("subtitle")
        if subtitle:
            season_match = re.search(r"Series (\d+):", subtitle)
            if season_match:
                season = int(season_match.group(1))

            number_match = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle)
            number = int(next((m.group(1) or m.group(2) for m in number_match), 0))

            name_match = re.search(r"\d+\. (.+)", subtitle)
            if name_match:
                name = name_match.group(1)
            elif not re.search(r"Series (\d+): Episode (\d+)", subtitle):
                # a subtitle that is not just "Series X: Episode Y" is used as the name
                name = subtitle

        return Series(
            [
                Episode(
                    id_=episode["id"],
                    service=self.__class__,
                    title=episode["title"],
                    season=season,
                    number=number,
                    name=name,
                    language="en",
                )
            ]
        )

    def find(self, pattern: str, string: str, group: int | str | None = None):
        """
        Return the first match of ``pattern`` in ``string``, or ``None`` if there is none.

        With ``group`` given (including ``0``), that group of the first ``re.search`` match is
        returned; otherwise the first element of ``re.findall`` is returned.
        """
        if group is not None:
            m = re.search(pattern, string)
            return m.group(group) if m else None
        return next(iter(re.findall(pattern, string)), None)

    def _select_video_connection(self, media: list) -> dict:
        """Pick the connection of the first video entry in ``media``, ordered by ``priority``."""
        video = next((x for x in media if x["kind"] == "video"), None)
        if video is None:
            raise ValueError("No video media was found for this title")

        connections = sorted(video["connection"], key=lambda x: x["priority"])
        if self.vcodec == "H.265":
            if not connections:
                raise ValueError("No video connections were found for this title")
            return connections[0]

        connection = next(
            (x for x in connections if x["supplier"] == "mf_akamai" and x["transferFormat"] == "dash"),
            None,
        )
        if connection is None:
            raise ValueError("No 'mf_akamai' DASH video connection was found for this title")
        return connection