diff --git a/services/iP/__init__.py b/services/iP/__init__.py new file mode 100644 index 0000000..1872801 --- /dev/null +++ b/services/iP/__init__.py @@ -0,0 +1,287 @@ +from __future__ import annotations + +import hashlib +import re +import sys +import warnings +from collections.abc import Generator +from typing import Any, Union + +import click +from bs4 import XMLParsedAsHTMLWarning +from click import Context +from devine.core.manifests import DASH, HLS +from devine.core.search_result import SearchResult +from devine.core.service import Service +from devine.core.titles import Episode, Movie, Movies, Series +from devine.core.tracks import Audio, Chapter, Subtitle, Track, Tracks, Video +from devine.core.utils.collections import as_list +from devine.core.utils.sslciphers import SSLCiphers + +warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) + + +class iP(Service): + """ + \b + Service code for the BBC iPlayer streaming service (https://www.bbc.co.uk/iplayer). + Base code from VT, credit to original author + + \b + Author: stabbedbybrick + Authorization: None + Security: None + + \b + Tips: + - Use title URL or id as input: + https://www.bbc.co.uk/iplayer/episodes/b007r3n8 OR b007r3n8 + - Downloading by episode URL is currently not supported + + \b + - An SSL certificate (PEM) is required for accessing the UHD endpoint. + Specify its path using the service configuration data in the root config: + \b + services: + iP: + cert: path/to/cert + \b + - Use -v H.265 to request UHD tracks + """ + + ALIASES = ("bbciplayer", "bbc", "iplayer") + GEOFENCE = ("gb",) + TITLE_RE = r"^(?:https?://(?:www\.)?bbc\.co\.uk/iplayer/episodes?/)?(?P[a-z0-9]+)" + + @staticmethod + @click.command(name="iP", short_help="https://www.bbc.co.uk/iplayer", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> iP: + return iP(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + self.vcodec = ctx.parent.params.get("vcodec") + super().__init__(ctx) + + if self.vcodec == "H.265" and not self.config.get("cert"): + self.log.error("H.265 cannot be selected without a certificate") + sys.exit(1) + + quality = ctx.parent.params.get("quality") + if quality and quality[0] > 1080 and self.vcodec != "H.265" and self.config.get("cert"): + self.log.info(" + Switched video codec to H.265 to be able to get 2160p video track") + self.vcodec = "H.265" + + def search(self) -> Generator[SearchResult, None, None]: + params = { + "q": self.title, + "apikey": self.config["api_key"], + } + + r = self.session.get(self.config["endpoints"]["search"], params=params) + r.raise_for_status() + + results = r.json() + for result in results["results"]: + yield SearchResult( + id_=result.get("uri").split(":")[-1], + title=result.get("title"), + description=result.get("synopsis"), + label="series" if result.get("type", "") == "brand" else result.get("type"), + url=result.get("url"), + ) + + def get_titles(self) -> Union[Movies, Series]: + pid = re.match(self.TITLE_RE, self.title).group("id") + if not pid: + self.log.error("Invalid input - is the URL or id correct?") + sys.exit(1) + + data = self.get_data(pid, slice_id=None) + if data is None: + self.log.error("Metadata was not found - is the URL or id correct?") + sys.exit(1) + + if "Film" in data["labels"]["category"]: + return Movies( + [ + Movie( + id_=data["id"], + name=data["title"]["default"], + year=None, # TODO + service=self.__class__, + language="en", + data=data, + ) + ] + ) + else: + seasons = [self.get_data(pid, x["id"]) for x in data["slices"] or [{"id": None}]] + episodes = [self.create_episode(episode) for season in seasons for episode in season["entities"]["results"]] + return Series(episodes) + + def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: + playlist = self.session.get(url=self.config["endpoints"]["playlist"].format(pid=title.id)).json() + if not playlist["defaultAvailableVersion"]: + self.log.error(" - Title is unavailable") + sys.exit(1) + + if self.config.get("cert"): + url = self.config["endpoints"]["manifest_"].format( + vpid=playlist["defaultAvailableVersion"]["smpConfig"]["items"][0]["vpid"], + mediaset="iptv-uhd" if self.vcodec == "H.265" else "iptv-all", + ) + + session = self.session + session.mount("https://", SSLCiphers()) + session.mount("http://", SSLCiphers()) + manifest = session.get( + url, headers={"user-agent": self.config["user_agent"]}, cert=self.config["cert"] + ).json() + + if "result" in manifest: + self.log.error(f" - Failed to get manifest [{manifest['result']}]") + sys.exit(1) + + else: + url = self.config["endpoints"]["manifest"].format( + vpid=playlist["defaultAvailableVersion"]["smpConfig"]["items"][0]["vpid"], + mediaset="iptv-all", + ) + manifest = self.session.get(url).json() + + connection = {} + for video in [x for x in manifest["media"] if x["kind"] == "video"]: + connections = sorted(video["connection"], key=lambda x: x["priority"]) + if self.vcodec == "H.265": + connection = connections[0] + else: + connection = next( + x for x in connections if x["supplier"] == "mf_akamai" and x["transferFormat"] == "dash" + ) + + break + + if not self.vcodec == "H.265": + if connection["transferFormat"] == "dash": + connection["href"] = "/".join( + connection["href"].replace("dash", "hls").split("?")[0].split("/")[0:-1] + ["hls", "master.m3u8"] + ) + connection["transferFormat"] = "hls" + elif connection["transferFormat"] == "hls": + connection["href"] = "/".join( + connection["href"].replace(".hlsv2.ism", "").split("?")[0].split("/")[0:-1] + ["hls", "master.m3u8"] + ) + + if connection["transferFormat"] != "hls": + raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}") + + if connection["transferFormat"] == "dash": + tracks = DASH.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language) + elif connection["transferFormat"] == "hls": + tracks = HLS.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language) + else: + raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}") + + for video in tracks.videos: + # TODO: add HLG to UHD tracks + + if any(re.search(r"-audio_\w+=\d+", x) for x in as_list(video.url)): + # create audio stream from the video stream + audio_url = re.sub(r"-video=\d+", "", as_list(video.url)[0]) + audio = Audio( + # use audio_url not video url, as to ignore video bitrate in ID + id_=hashlib.md5(audio_url.encode()).hexdigest()[0:7], + url=audio_url, + codec=Audio.Codec.from_codecs("mp4a"), + language=[v.language for v in video.data["hls"]["playlist"].media][0], + bitrate=int(self.find(r"-audio_\w+=(\d+)", as_list(video.url)[0]) or 0), + channels=[v.channels for v in video.data["hls"]["playlist"].media][0], + descriptive=False, # Not available + descriptor=Track.Descriptor.HLS, + ) + if not tracks.exists(by_id=audio.id): + # some video streams use the same audio, so natural dupes exist + tracks.add(audio) + # remove audio from the video stream + video.url = [re.sub(r"-audio_\w+=\d+", "", x) for x in as_list(video.url)][0] + video.codec = Video.Codec.from_codecs(video.data["hls"]["playlist"].stream_info.codecs) + video.bitrate = int(self.find(r"-video=(\d+)", as_list(video.url)[0]) or 0) + + for caption in [x for x in manifest["media"] if x["kind"] == "captions"]: + connection = sorted(caption["connection"], key=lambda x: x["priority"])[0] + tracks.add( + Subtitle( + id_=hashlib.md5(connection["href"].encode()).hexdigest()[0:6], + url=connection["href"], + codec=Subtitle.Codec.from_codecs("ttml"), + language=title.language, + is_original_lang=True, + forced=False, + sdh=True, + ) + ) + break + + return tracks + + def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]: + return [] + + def get_widevine_service_certificate(self, **_: Any) -> str: + return None + + def get_widevine_license(self, challenge: bytes, **_: Any) -> str: + return None + + # service specific functions + + def get_data(self, pid: str, slice_id: str) -> dict: + json_data = { + "id": "9fd1636abe711717c2baf00cebb668de", + "variables": { + "id": pid, + "perPage": 200, + "page": 1, + "sliceId": slice_id if slice_id else None, + }, + } + + r = self.session.post(self.config["endpoints"]["metadata"], json=json_data) + r.raise_for_status() + + return r.json()["data"]["programme"] + + def create_episode(self, episode): + title = episode["episode"]["title"]["default"].strip() + subtitle = episode["episode"]["subtitle"] + series = re.finditer(r"Series (\d+):|Season (\d+):|(\d{4}/\d{2}): Episode \d+", subtitle.get("default")) + season_num = int(next((m.group(1) or m.group(2) or m.group(3).replace("/", "") for m in series), 0)) + + number = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle.get("slice") or subtitle.get("default") or "") + ep_num = int(next((m.group(1) or m.group(2) for m in number), 0)) + + name = re.search(r"\d+\. (.+)", subtitle.get("slice") or "") + ep_name = name.group(1) if name else subtitle.get("slice") or "" + if not subtitle.get("slice"): + ep_name = subtitle.get("default") or "" + + return Episode( + id_=episode["episode"].get("id"), + service=self.__class__, + title=title, + season=season_num, + number=ep_num, + name=ep_name, + language="en", + ) + + def find(self, pattern, string, group=None): + if group: + m = re.search(pattern, string) + if m: + return m.group(group) + else: + return next(iter(re.findall(pattern, string)), None) diff --git a/services/iP/config.yaml b/services/iP/config.yaml new file mode 100644 index 0000000..8c60032 --- /dev/null +++ b/services/iP/config.yaml @@ -0,0 +1,9 @@ +user_agent: 'smarttv_AFTMM_Build_0003255372676_Chromium_41.0.2250.2' +api_key: 'D2FgtcTxGqqIgLsfBWTJdrQh2tVdeaAp' + +endpoints: + metadata: 'https://graph.ibl.api.bbc.co.uk/' + playlist: 'https://www.bbc.co.uk/programmes/{pid}/playlist.json' + manifest: "https://open.live.bbc.co.uk/mediaselector/6/select/version/2.0/mediaset/{mediaset}/vpid/{vpid}/" + manifest_: 'https://securegate.iplayer.bbc.co.uk/mediaselector/6/select/version/2.0/vpid/{vpid}/format/json/mediaset/{mediaset}/proto/https' + search: "https://search.api.bbci.co.uk/formula/iplayer-ibl-root"