From 1e58c9359f97dae67c76506e058d689941a9f2c3 Mon Sep 17 00:00:00 2001 From: TPD94 <> Date: Mon, 22 Apr 2024 13:52:16 -0400 Subject: [PATCH] Updated @stabbedbybrick service files --- services/ALL4/__init__.py | 381 -------------------------------------- services/ALL4/config.yaml | 27 --- services/CTV/__init__.py | 364 ------------------------------------ services/CTV/config.yaml | 6 - services/ROKU/__init__.py | 249 ------------------------- services/ROKU/config.yaml | 5 - services/iP/__init__.py | 358 ----------------------------------- services/iP/config.yaml | 11 -- 8 files changed, 1401 deletions(-) delete mode 100644 services/ALL4/__init__.py delete mode 100644 services/ALL4/config.yaml delete mode 100644 services/CTV/__init__.py delete mode 100644 services/CTV/config.yaml delete mode 100644 services/ROKU/__init__.py delete mode 100644 services/ROKU/config.yaml delete mode 100644 services/iP/__init__.py delete mode 100644 services/iP/config.yaml diff --git a/services/ALL4/__init__.py b/services/ALL4/__init__.py deleted file mode 100644 index 3972726..0000000 --- a/services/ALL4/__init__.py +++ /dev/null @@ -1,381 +0,0 @@ -from __future__ import annotations - -import base64 -import hashlib -import json -import sys -import re -from collections.abc import Generator -from datetime import datetime, timezone -from http.cookiejar import MozillaCookieJar -from typing import Any, Optional, Union - -import click -from click import Context -from Crypto.Util.Padding import unpad -from Cryptodome.Cipher import AES -from pywidevine.cdm import Cdm as WidevineCdm - -from devine.core.credential import Credential -from devine.core.manifests.dash import DASH -from devine.core.search_result import SearchResult -from devine.core.service import Service -from devine.core.titles import Episode, Movie, Movies, Series -from devine.core.tracks import Chapter, Subtitle, Tracks - - -class ALL4(Service): - """ - Service code for Channel 4's All4 streaming service (https://channel4.com). - - \b - Author: stabbedbybrick - Authorization: Credentials - Robustness: - L3: 1080p, AAC2.0 - - \b - Tips: - - Use complete title URL or slug as input: - https://www.channel4.com/programmes/taskmaster OR taskmaster - - Use on demand URL for directly downloading episodes: - https://www.channel4.com/programmes/taskmaster/on-demand/75588-002 - - Both android and web/pc endpoints are checked for quality profiles. - If android is missing 1080p, it automatically falls back to web. - """ - - GEOFENCE = ("gb", "ie") - TITLE_RE = r"^(?:https?://(?:www\.)?channel4\.com/programmes/)?(?P[a-z0-9-]+)(?:/on-demand/(?P[0-9-]+))?" - - @staticmethod - @click.command(name="ALL4", short_help="https://channel4.com", help=__doc__) - @click.argument("title", type=str) - @click.pass_context - def cli(ctx: Context, **kwargs: Any) -> ALL4: - return ALL4(ctx, **kwargs) - - def __init__(self, ctx: Context, title: str): - self.title = title - super().__init__(ctx) - - self.authorization: str - self.asset_id: int - self.license_token: str - self.manifest: str - - self.session.headers.update( - { - "X-C4-Platform-Name": self.config["device"]["platform_name"], - "X-C4-Device-Type": self.config["device"]["device_type"], - "X-C4-Device-Name": self.config["device"]["device_name"], - "X-C4-App-Version": self.config["device"]["app_version"], - "X-C4-Optimizely-Datafile": self.config["device"]["optimizely_datafile"], - } - ) - - def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: - super().authenticate(cookies, credential) - if not credential: - raise EnvironmentError("Service requires Credentials for Authentication.") - - cache = self.cache.get(f"tokens_{credential.sha1}") - - if cache and not cache.expired: - # cached - self.log.info(" + Using cached Tokens...") - tokens = cache.data - elif cache and cache.expired: - # expired, refresh - self.log.info("Refreshing cached Tokens") - r = self.session.post( - self.config["endpoints"]["login"], - headers={"authorization": f"Basic {self.config['android']['auth']}"}, - data={ - "grant_type": "refresh_token", - "username": credential.username, - "password": credential.password, - "refresh_token": cache.data["refreshToken"], - }, - ) - try: - res = r.json() - except json.JSONDecodeError: - raise ValueError(f"Failed to refresh tokens: {r.text}") - - if "error" in res: - self.log.error(f"Failed to refresh tokens: {res['errorMessage']}") - sys.exit(1) - - tokens = res - self.log.info(" + Refreshed") - else: - # new - headers = {"authorization": f"Basic {self.config['android']['auth']}"} - data = { - "grant_type": "password", - "username": credential.username, - "password": credential.password, - } - r = self.session.post(self.config["endpoints"]["login"], headers=headers, data=data) - try: - res = r.json() - except json.JSONDecodeError: - raise ValueError(f"Failed to log in: {r.text}") - - if "error" in res: - self.log.error(f"Failed to log in: {res['errorMessage']}") - sys.exit(1) - - tokens = res - self.log.info(" + Acquired tokens...") - - cache.set(tokens, expiration=tokens["expiresIn"]) - - self.authorization = f"Bearer {tokens['accessToken']}" - - def search(self) -> Generator[SearchResult, None, None]: - params = { - "expand": "default", - "q": self.title, - "limit": "100", - "offset": "0", - } - - r = self.session.get(self.config["endpoints"]["search"], params=params) - r.raise_for_status() - - results = r.json() - if isinstance(results["results"], list): - for result in results["results"]: - yield SearchResult( - id_=result["brand"].get("websafeTitle"), - title=result["brand"].get("title"), - description=result["brand"].get("description"), - label=result.get("label"), - url=result["brand"].get("href"), - ) - - def get_titles(self) -> Union[Movies, Series]: - title, on_demand = (re.match(self.TITLE_RE, self.title).group(i) for i in ("id", "vid")) - - r = self.session.get( - self.config["endpoints"]["title"].format(title=title), - params={"client": "android-mod", "deviceGroup": "mobile", "include": "extended-restart"}, - headers={"Authorization": self.authorization}, - ) - if not r.ok: - self.log.error(r.text) - sys.exit(1) - - data = r.json() - - if on_demand is not None: - return Series( - [ - Episode( - id_=episode["programmeId"], - service=self.__class__, - title=data["brand"]["title"], - season=episode["seriesNumber"], - number=episode["episodeNumber"], - name=episode["originalTitle"], - language="en", - data=episode["assetInfo"].get("streaming"), - ) - for episode in data["brand"]["episodes"] - if episode.get("assetInfo") and episode["programmeId"] == on_demand - ] - ) - - elif data["brand"]["programmeType"] == "FM": - return Movies( - [ - Movie( - id_=movie["programmeId"], - service=self.__class__, - name=data["brand"]["title"], - year=int(data["brand"]["summary"].split(" ")[0].strip().strip("()")), - language="en", - data=movie["assetInfo"].get("streaming"), - ) - for movie in data["brand"]["episodes"] - ] - ) - else: - return Series( - [ - Episode( - id_=episode["programmeId"], - service=self.__class__, - title=data["brand"]["title"], - season=episode["seriesNumber"], - number=episode["episodeNumber"], - name=episode["originalTitle"], - language="en", - data=episode["assetInfo"].get("streaming"), - ) - for episode in data["brand"]["episodes"] - if episode.get("assetInfo") - ] - ) - - def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: - android_assets: tuple = self.android_playlist(title.id) - web_assets: tuple = self.web_playlist(title.id) - self.manifest, self.license_token, subtitle, data = self.sort_assets(android_assets, web_assets) - self.asset_id = int(title.data["assetId"]) - - tracks = DASH.from_url(self.manifest, self.session).to_tracks(title.language) - tracks.videos[0].data = data - - if subtitle is not None: - tracks.add( - Subtitle( - id_=hashlib.md5(subtitle.encode()).hexdigest()[0:6], - url=subtitle, - codec=Subtitle.Codec.from_mime(subtitle[-3:]), - language=title.language, - is_original_lang=True, - forced=False, - sdh=True, - ) - ) - - for track in tracks.audio: - role = track.data["dash"]["representation"].find("Role") - if role is not None and role.get("value") in ["description", "alternative", "alternate"]: - track.descriptive = True - - return tracks - - def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]: - track = title.tracks.videos[0] - - chapters = [ - Chapter( - name=f"Chapter {i + 1:02}", - timestamp=datetime.fromtimestamp((ms / 1000), tz=timezone.utc).strftime("%H:%M:%S.%f")[:-3], - ) - for i, ms in enumerate(x["breakOffset"] for x in track.data["adverts"]["breaks"]) - ] - - if track.data.get("endCredits", {}).get("squeezeIn"): - chapters.append( - Chapter( - name="Credits", - timestamp=datetime.fromtimestamp( - (track.data["endCredits"]["squeezeIn"] / 1000), tz=timezone.utc - ).strftime("%H:%M:%S.%f")[:-3], - ) - ) - - return chapters - - def get_widevine_service_certificate(self, **_: Any) -> str: - return WidevineCdm.common_privacy_cert - - def get_widevine_license(self, challenge: bytes, **_: Any) -> str: - payload = { - "message": base64.b64encode(challenge).decode("utf8"), - "token": self.license_token, - "request_id": self.asset_id, - "video": {"type": "ondemand", "url": self.manifest}, - } - - r = self.session.post(self.config["endpoints"]["license"], json=payload) - if not r.ok: - raise ConnectionError(f"License request failed: {r.json()['status']['type']}") - - return r.json()["license"] - - # Service specific functions - - def sort_assets(self, android_assets: tuple, web_assets: tuple) -> tuple: - if android_assets is not None: - try: - a_manifest, a_token, a_subtitle, data = android_assets - android_tracks = DASH.from_url(a_manifest, self.session).to_tracks("en") - android_heights = sorted([int(track.height) for track in android_tracks.videos], reverse=True) - except Exception: - android_heights = None - - if web_assets is not None: - try: - b_manifest, b_token, b_subtitle, data = web_assets - session = self.session - session.headers.update(self.config["headers"]) - web_tracks = DASH.from_url(b_manifest, session).to_tracks("en") - web_heights = sorted([int(track.height) for track in web_tracks.videos], reverse=True) - except Exception: - web_heights = None - - if not android_heights and not web_heights: - self.log.error("Failed to request manifest data. If you're behind a VPN/proxy, you might be blocked") - sys.exit(1) - - if not android_heights or android_heights[0] < 1080: - lic_token = self.decrypt_token(b_token, client="WEB") - return b_manifest, lic_token, b_subtitle, data - else: - lic_token = self.decrypt_token(a_token, client="ANDROID") - return a_manifest, lic_token, a_subtitle, data - - def android_playlist(self, video_id: str) -> tuple: - url = self.config["android"]["vod"].format(video_id=video_id) - headers = {"authorization": self.authorization} - - r = self.session.get(url=url, headers=headers) - if not r.ok: - self.log.warning("Request for Android endpoint returned %s", r) - return - - data = json.loads(r.content) - manifest = data["videoProfiles"][0]["streams"][0]["uri"] - token = data["videoProfiles"][0]["streams"][0]["token"] - subtitle = next( - (x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")), - None, - ) - - return manifest, token, subtitle, data - - def web_playlist(self, video_id: str) -> tuple: - url = self.config["web"]["vod"].format(programmeId=video_id) - r = self.session.get(url, headers=self.config["headers"]) - if not r.ok: - self.log.warning("Request for WEB endpoint returned %s", r) - return - - data = json.loads(r.content) - - for item in data["videoProfiles"]: - if item["name"] == "dashwv-dyn-stream-1": - token = item["streams"][0]["token"] - manifest = item["streams"][0]["uri"] - - subtitle = next( - (x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")), - None, - ) - - return manifest, token, subtitle, data - - def decrypt_token(self, token: str, client: str) -> tuple: - if client == "ANDROID": - key = self.config["android"]["key"] - iv = self.config["android"]["iv"] - - if client == "WEB": - key = self.config["web"]["key"] - iv = self.config["web"]["iv"] - - if isinstance(token, str): - token = base64.b64decode(token) - cipher = AES.new( - key=base64.b64decode(key), - iv=base64.b64decode(iv), - mode=AES.MODE_CBC, - ) - data = unpad(cipher.decrypt(token), AES.block_size) - dec_token = data.decode().split("|")[1] - return dec_token.strip() diff --git a/services/ALL4/config.yaml b/services/ALL4/config.yaml deleted file mode 100644 index 1a4cb73..0000000 --- a/services/ALL4/config.yaml +++ /dev/null @@ -1,27 +0,0 @@ -headers: - Accept-Language: en-US,en;q=0.8 - User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36 - -endpoints: - login: https://api.channel4.com/online/v2/auth/token - title: https://api.channel4.com/online/v1/views/content-hubs/{title}.json - license: https://c4.eme.lp.aws.redbeemedia.com/wvlicenceproxy-service/widevine/acquire - search: https://all4nav.channel4.com/v1/api/search - -android: - key: QVlESUQ4U0RGQlA0TThESA==" - iv: MURDRDAzODNES0RGU0w4Mg==" - auth: MzZVVUN0OThWTVF2QkFnUTI3QXU4ekdIbDMxTjlMUTE6Sllzd3lIdkdlNjJWbGlrVw== - vod: https://api.channel4.com/online/v1/vod/stream/{video_id}?client=android-mod - -web: - key: bjljTGllWWtxd3pOQ3F2aQ== - iv: b2R6Y1UzV2RVaVhMdWNWZA== - vod: https://www.channel4.com/vod/stream/{programmeId} - -device: - platform_name: android - device_type: mobile - device_name: "Sony C6903 (C6903)" - app_version: "android_app:9.4.2" - optimizely_datafile: "2908" diff --git a/services/CTV/__init__.py b/services/CTV/__init__.py deleted file mode 100644 index 7f3d5ac..0000000 --- a/services/CTV/__init__.py +++ /dev/null @@ -1,364 +0,0 @@ -from __future__ import annotations - -import hashlib -import json -import re -import sys -from collections.abc import Generator -from concurrent.futures import ThreadPoolExecutor, as_completed -from http.cookiejar import CookieJar -from typing import Any, Optional - -import click -from pywidevine.cdm import Cdm as WidevineCdm - -from devine.core.credential import Credential -from devine.core.manifests import DASH -from devine.core.search_result import SearchResult -from devine.core.service import Service -from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T -from devine.core.tracks import Chapter, Subtitle, Tracks - - -class CTV(Service): - """ - Service code for CTV.ca (https://www.ctv.ca) - - \b - Author: stabbedbybrick - Authorization: Credentials for subscription, none for freely available titles - Robustness: - Widevine: - L3: 1080p, DD5.1 - - \b - Tips: - - Input can be either complete title/episode URL or just the path: - /shows/young-sheldon - /shows/young-sheldon/baptists-catholics-and-an-attempted-drowning-s7e6 - /movies/war-for-the-planet-of-the-apes - """ - - TITLE_RE = r"^(?:https?://(?:www\.)?ctv\.ca(?:/[a-z]{2})?)?/(?Pmovies|shows)/(?P[a-z0-9-]+)(?:/(?P[a-z0-9-]+))?$" - GEOFENCE = ("ca",) - - @staticmethod - @click.command(name="CTV", short_help="https://www.ctv.ca", help=__doc__) - @click.argument("title", type=str) - @click.pass_context - def cli(ctx, **kwargs): - return CTV(ctx, **kwargs) - - def __init__(self, ctx, title): - self.title = title - super().__init__(ctx) - - self.authorization: str = None - - self.api = self.config["endpoints"]["api"] - self.license_url = self.config["endpoints"]["license"] - - def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: - super().authenticate(cookies, credential) - if credential: - cache = self.cache.get(f"tokens_{credential.sha1}") - - if cache and not cache.expired: - # cached - self.log.info(" + Using cached Tokens...") - tokens = cache.data - elif cache and cache.expired: - # expired, refresh - self.log.info("Refreshing cached Tokens") - r = self.session.post( - self.config["endpoints"]["login"], - headers={"authorization": f"Basic {self.config['endpoints']['auth']}"}, - data={ - "grant_type": "refresh_token", - "username": credential.username, - "password": credential.password, - "refresh_token": cache.data["refresh_token"], - }, - ) - try: - res = r.json() - except json.JSONDecodeError: - raise ValueError(f"Failed to refresh tokens: {r.text}") - - tokens = res - self.log.info(" + Refreshed") - else: - # new - r = self.session.post( - self.config["endpoints"]["login"], - headers={"authorization": f"Basic {self.config['endpoints']['auth']}"}, - data={ - "grant_type": "password", - "username": credential.username, - "password": credential.password, - }, - ) - try: - res = r.json() - except json.JSONDecodeError: - raise ValueError(f"Failed to log in: {r.text}") - - tokens = res - self.log.info(" + Acquired tokens...") - - cache.set(tokens, expiration=tokens["expires_in"]) - - self.authorization = f"Bearer {tokens['access_token']}" - - def search(self) -> Generator[SearchResult, None, None]: - payload = { - "operationName": "searchMedia", - "variables": {"title": f"{self.title}"}, - "query": """ - query searchMedia($title: String!) {searchMedia(titleMatches: $title) { - ... on Medias {page {items {title\npath}}}}}, """, - } - - r = self.session.post(self.config["endpoints"]["search"], json=payload) - if r.status_code != 200: - self.log.error(r.text) - return - - for result in r.json()["data"]["searchMedia"]["page"]["items"]: - yield SearchResult( - id_=result.get("path"), - title=result.get("title"), - description=result.get("description"), - label=result["path"].split("/")[1], - url="https://www.ctv.ca" + result.get("path"), - ) - - def get_titles(self) -> Titles_T: - title, kind, episode = (re.match(self.TITLE_RE, self.title).group(i) for i in ("id", "type", "episode")) - title_path = self.get_title_id(kind, title, episode) - - if episode is not None: - data = self.get_episode_data(title_path) - return Series( - [ - Episode( - id_=data["axisId"], - service=self.__class__, - title=data["axisMedia"]["title"], - season=int(data["seasonNumber"]), - number=int(data["episodeNumber"]), - name=data["title"], - year=data.get("firstAirYear"), - language=data["axisPlaybackLanguages"][0].get("language", "en"), - data=data["axisPlaybackLanguages"][0]["destinationCode"], - ) - ] - ) - - if kind == "shows": - data = self.get_series_data(title_path) - titles = self.fetch_episodes(data["contentData"]["seasons"]) - return Series( - [ - Episode( - id_=episode["axisId"], - service=self.__class__, - title=data["contentData"]["title"], - season=int(episode["seasonNumber"]), - number=int(episode["episodeNumber"]), - name=episode["title"], - year=data["contentData"]["firstAirYear"], - language=episode["axisPlaybackLanguages"][0].get("language", "en"), - data=episode["axisPlaybackLanguages"][0]["destinationCode"], - ) - for episode in titles - ] - ) - - if kind == "movies": - data = self.get_movie_data(title_path) - return Movies( - [ - Movie( - id_=data["contentData"]["firstPlayableContent"]["axisId"], - service=self.__class__, - name=data["contentData"]["title"], - year=data["contentData"]["firstAirYear"], - language=data["contentData"]["firstPlayableContent"]["axisPlaybackLanguages"][0].get( - "language", "en" - ), - data=data["contentData"]["firstPlayableContent"]["axisPlaybackLanguages"][0]["destinationCode"], - ) - ] - ) - - def get_tracks(self, title: Title_T) -> Tracks: - base = f"https://capi.9c9media.com/destinations/{title.data}/platforms/desktop" - - r = self.session.get(f"{base}/contents/{title.id}/contentPackages") - r.raise_for_status() - - pkg_id = r.json()["Items"][0]["Id"] - base += "/playback/contents" - - manifest = f"{base}/{title.id}/contentPackages/{pkg_id}/manifest.mpd?filter=25" - subtitle = f"{base}/{title.id}/contentPackages/{pkg_id}/manifest.vtt" - - if self.authorization: - self.session.headers.update({"authorization": self.authorization}) - - tracks = DASH.from_url(url=manifest, session=self.session).to_tracks(language=title.language) - tracks.add( - Subtitle( - id_=hashlib.md5(subtitle.encode()).hexdigest()[0:6], - url=subtitle, - codec=Subtitle.Codec.from_mime(subtitle[-3:]), - language=title.language, - is_original_lang=True, - forced=False, - sdh=True, - ) - ) - return tracks - - def get_chapters(self, title: Title_T) -> list[Chapter]: - return [] # Chapters not available - - def get_widevine_service_certificate(self, **_: Any) -> str: - return WidevineCdm.common_privacy_cert - - def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes: - r = self.session.post(url=self.license_url, data=challenge) - if r.status_code != 200: - self.log.error(r.text) - sys.exit(1) - return r.content - - # service specific functions - - def get_title_id(self, kind: str, title: tuple, episode: str) -> str: - if episode is not None: - title += f"/{episode}" - payload = { - "operationName": "resolvePath", - "variables": {"path": f"{kind}/{title}"}, - "query": """ - query resolvePath($path: String!) { - resolvedPath(path: $path) { - lastSegment { - content { - id - } - } - } - } - """, - } - r = self.session.post(self.api, json=payload).json() - return r["data"]["resolvedPath"]["lastSegment"]["content"]["id"] - - def get_series_data(self, title_id: str) -> json: - payload = { - "operationName": "axisMedia", - "variables": {"axisMediaId": f"{title_id}"}, - "query": """ - query axisMedia($axisMediaId: ID!) { - contentData: axisMedia(id: $axisMediaId) { - title - description - originalSpokenLanguage - mediaType - firstAirYear - seasons { - title - id - seasonNumber - } - } - } - """, - } - - return self.session.post(self.api, json=payload).json()["data"] - - def get_movie_data(self, title_id: str) -> json: - payload = { - "operationName": "axisMedia", - "variables": {"axisMediaId": f"{title_id}"}, - "query": """ - query axisMedia($axisMediaId: ID!) { - contentData: axisMedia(id: $axisMediaId) { - title - description - firstAirYear - firstPlayableContent { - axisId - axisPlaybackLanguages { - destinationCode - } - } - } - } - """, - } - - return self.session.post(self.api, json=payload).json()["data"] - - def get_episode_data(self, title_path: str) -> json: - payload = { - "operationName": "axisContent", - "variables": {"id": f"{title_path}"}, - "query": """ - query axisContent($id: ID!) { - axisContent(id: $id) { - axisId - title - description - contentType - seasonNumber - episodeNumber - axisMedia { - title - } - axisPlaybackLanguages { - language - destinationCode - } - } - } - """, - } - return self.session.post(self.api, json=payload).json()["data"]["axisContent"] - - def fetch_episode(self, episode: str) -> json: - payload = { - "operationName": "season", - "variables": {"seasonId": f"{episode}"}, - "query": """ - query season($seasonId: ID!) { - axisSeason(id: $seasonId) { - episodes { - axisId - title - description - contentType - seasonNumber - episodeNumber - axisPlaybackLanguages { - language - destinationCode - } - } - } - } - """, - } - response = self.session.post(self.api, json=payload) - return response.json()["data"]["axisSeason"]["episodes"] - - def fetch_episodes(self, data: dict) -> list: - """TODO: Switch to async once https proxies are fully supported""" - with ThreadPoolExecutor(max_workers=10) as executor: - tasks = [executor.submit(self.fetch_episode, x["id"]) for x in data] - titles = [future.result() for future in as_completed(tasks)] - return [episode for episodes in titles for episode in episodes] diff --git a/services/CTV/config.yaml b/services/CTV/config.yaml deleted file mode 100644 index 6f8f89e..0000000 --- a/services/CTV/config.yaml +++ /dev/null @@ -1,6 +0,0 @@ -endpoints: - login: https://account.bellmedia.ca/api/login/v2.1 - auth: Y3R2LXdlYjpkZWZhdWx0 - api: https://api.ctv.ca/space-graphql/graphql - license: https://license.9c9media.ca/widevine - search: https://www.ctv.ca/space-graphql/apq/graphql \ No newline at end of file diff --git a/services/ROKU/__init__.py b/services/ROKU/__init__.py deleted file mode 100644 index cc51f87..0000000 --- a/services/ROKU/__init__.py +++ /dev/null @@ -1,249 +0,0 @@ -import json -import re -import sys -from collections.abc import Generator -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime, timezone -from http.cookiejar import CookieJar -from typing import Any, Optional -from urllib.parse import unquote, urlparse - -import click -import requests - -from devine.core.credential import Credential -from devine.core.manifests import DASH -from devine.core.search_result import SearchResult -from devine.core.service import Service -from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T -from devine.core.tracks import Chapter, Tracks - - -class ROKU(Service): - """ - Service code for The Roku Channel (https://therokuchannel.roku.com) - - \b - Author: stabbedbybrick - Authorization: Cookies (optional) - Robustness: - Widevine: - L3: 1080p, DD5.1 - - \b - Tips: - - Use complete title/episode URL or id as input: - https://therokuchannel.roku.com/details/e05fc677ab9c5d5e8332f123770697b9/paddington - OR - e05fc677ab9c5d5e8332f123770697b9 - - Supports movies, series, and single episodes - - Search is geofenced - """ - - GEOFENCE = ("us",) - TITLE_RE = r"^(?:https?://(?:www.)?therokuchannel.roku.com/(?:details|watch)/)?(?P[a-z0-9-]+)" - - @staticmethod - @click.command(name="ROKU", short_help="https://therokuchannel.roku.com", help=__doc__) - @click.argument("title", type=str) - @click.pass_context - def cli(ctx, **kwargs): - return ROKU(ctx, **kwargs) - - def __init__(self, ctx, title): - self.title = re.match(self.TITLE_RE, title).group("id") - super().__init__(ctx) - - self.license: str - - def authenticate( - self, - cookies: Optional[CookieJar] = None, - credential: Optional[Credential] = None, - ) -> None: - super().authenticate(cookies, credential) - if cookies is not None: - self.session.cookies.update(cookies) - - def search(self) -> Generator[SearchResult, None, None]: - token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"] - - headers = {"csrf-token": token} - payload = {"query": self.title} - - r = self.session.post(self.config["endpoints"]["search"], headers=headers, json=payload) - r.raise_for_status() - - results = r.json() - for result in results["view"]: - if result["content"]["type"] not in ["zone", "provider"]: - _id = result["content"].get("meta", {}).get("id") - _desc = result["content"].get("descriptions", {}) - - label = f'{result["content"].get("type")} ({result["content"].get("releaseYear")})' - if result["content"].get("viewOptions"): - label += f' ({result["content"]["viewOptions"][0].get("priceDisplay")})' - - title = re.sub(r"^-|-$", "", re.sub(r"\W+", "-", result["content"].get("title").lower())) - - yield SearchResult( - id_=_id, - title=title, - description=_desc["250"]["text"] if _desc.get("250") else None, - label=label, - url=f"https://therokuchannel.roku.com/details/{_id}/{title}", - ) - - def get_titles(self) -> Titles_T: - data = self.session.get(self.config["endpoints"]["content"] + self.title).json() - if not data["isAvailable"]: - self.log.error("This title is temporarily unavailable or expired") - sys.exit(1) - - if data["type"] == "movie": - return Movies( - [ - Movie( - id_=data["meta"]["id"], - service=self.__class__, - name=data["title"], - year=data["releaseYear"], - language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"), - data=None, - ) - ] - ) - - elif data["type"] == "series": - episodes = self.fetch_episodes(data) - return Series( - [ - Episode( - id_=episode["meta"]["id"], - service=self.__class__, - title=data["title"], - season=int(episode["seasonNumber"]), - number=int(episode["episodeNumber"]), - name=episode["title"], - year=data["releaseYear"], - language=episode["viewOptions"][0]["media"].get("originalAudioLanguage", "en"), - data=None, - ) - for episode in episodes - ] - ) - - elif data["type"] == "episode": - return Series( - [ - Episode( - id_=data["meta"]["id"], - service=self.__class__, - title=data["title"], - season=int(data["seasonNumber"]), - number=int(data["episodeNumber"]), - name=data["title"], - year=data["releaseYear"], - language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"), - data=None, - ) - ] - ) - - def get_tracks(self, title: Title_T) -> Tracks: - token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"] - - headers = { - "csrf-token": token, - } - payload = { - "rokuId": title.id, - "mediaFormat": "mpeg-dash", - "drmType": "widevine", - "quality": "fhd", - "providerId": "rokuavod", - } - - r = self.session.post( - self.config["endpoints"]["vod"], - headers=headers, - json=payload, - ) - r.raise_for_status() - - videos = r.json()["playbackMedia"]["videos"] - self.license = next( - ( - x["drmParams"]["licenseServerURL"] - for x in videos - if x.get("drmParams") and x["drmParams"]["keySystem"] == "Widevine" - ), - None, - ) - - url = next((x["url"] for x in videos if x["streamFormat"] == "dash"), None) - if url and "origin" in urlparse(url).query: - url = unquote(urlparse(url).query.split("=")[1]).split("?")[0] - - tracks = DASH.from_url(url=url).to_tracks(language=title.language) - tracks.videos[0].data["playbackMedia"] = r.json()["playbackMedia"] - - for track in tracks.audio: - label = track.data["dash"]["adaptation_set"].find("Label") - if label is not None and "description" in label.text: - track.descriptive = True - - for track in tracks.subtitles: - label = track.data["dash"]["adaptation_set"].find("Label") - if label is not None and "caption" in label.text: - track.cc = True - - return tracks - - def get_chapters(self, title: Title_T) -> list[Chapter]: - track = title.tracks.videos[0] - - chapters = [] - if track.data.get("playbackMedia", {}).get("adBreaks"): - timestamps = sorted(track.data["playbackMedia"]["adBreaks"]) - chapters = [Chapter(name=f"Chapter {i + 1:02}", timestamp=ad.split(".")[0]) for i, ad in enumerate(timestamps)] - - if track.data.get("playbackMedia", {}).get("creditCuePoints"): - start = next(( - x.get("start") for x in track.data["playbackMedia"]["creditCuePoints"] if x.get("start") != 0), None) - if start: - chapters.append( - Chapter( - name="Credits", - timestamp=datetime.fromtimestamp((start / 1000), tz=timezone.utc).strftime("%H:%M:%S.%f")[:-3], - ) - ) - - return chapters - - def get_widevine_service_certificate(self, **_: Any) -> str: - return # WidevineCdm.common_privacy_cert - - def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes: - r = self.session.post(url=self.license, data=challenge) - if r.status_code != 200: - self.log.error(r.text) - sys.exit(1) - return r.content - - # service specific functions - - def fetch_episode(self, episode: dict) -> json: - try: - r = self.session.get(self.config["endpoints"]["content"] + episode["meta"]["id"]) - r.raise_for_status() - return r.json() - except requests.exceptions.RequestException as e: - self.log.error(f"An error occurred while fetching episode {episode['meta']['id']}: {e}") - return None - - def fetch_episodes(self, data: dict) -> list: - """TODO: Switch to async once https proxies are fully supported""" - with ThreadPoolExecutor(max_workers=10) as executor: - tasks = list(executor.map(self.fetch_episode, data["episodes"])) - return [task for task in tasks if task is not None] diff --git a/services/ROKU/config.yaml b/services/ROKU/config.yaml deleted file mode 100644 index 4de6fa1..0000000 --- a/services/ROKU/config.yaml +++ /dev/null @@ -1,5 +0,0 @@ -endpoints: - content: https://therokuchannel.roku.com/api/v2/homescreen/content/https%3A%2F%2Fcontent.sr.roku.com%2Fcontent%2Fv1%2Froku-trc%2F - vod: https://therokuchannel.roku.com/api/v3/playback - token: https://therokuchannel.roku.com/api/v1/csrf - search: https://therokuchannel.roku.com/api/v1/search \ No newline at end of file diff --git a/services/iP/__init__.py b/services/iP/__init__.py deleted file mode 100644 index f376470..0000000 --- a/services/iP/__init__.py +++ /dev/null @@ -1,358 +0,0 @@ -from __future__ import annotations - -import hashlib -import json -import re -import sys -import warnings -from collections.abc import Generator -from typing import Any, Union - -import click -from bs4 import XMLParsedAsHTMLWarning -from click import Context -from devine.core.manifests import DASH, HLS -from devine.core.search_result import SearchResult -from devine.core.service import Service -from devine.core.titles import Episode, Movie, Movies, Series -from devine.core.tracks import Audio, Chapter, Subtitle, Track, Tracks, Video -from devine.core.utils.collections import as_list -from devine.core.utils.sslciphers import SSLCiphers - -warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) - - -class iP(Service): - """ - \b - Service code for the BBC iPlayer streaming service (https://www.bbc.co.uk/iplayer). - Base code from VT, credit to original author - - \b - Author: stabbedbybrick - Authorization: None - Security: None - - \b - Tips: - - Use full title URL as input for best results. - - Use --list-titles before anything, iPlayer's listings are often messed up. - \b - - An SSL certificate (PEM) is required for accessing the UHD endpoint. - Specify its path using the service configuration data in the root config: - \b - services: - iP: - cert: path/to/cert - \b - - Use -v H.265 to request UHD tracks - - See which titles are available in UHD: - https://www.bbc.co.uk/iplayer/help/questions/programme-availability/uhd-content - """ - - ALIASES = ("bbciplayer", "bbc", "iplayer") - GEOFENCE = ("gb",) - TITLE_RE = r"^(?:https?://(?:www\.)?bbc\.co\.uk/(?:iplayer/(?Pepisode|episodes)/|programmes/))?(?P[a-z0-9]+)(?:/.*)?$" - - @staticmethod - @click.command(name="iP", short_help="https://www.bbc.co.uk/iplayer", help=__doc__) - @click.argument("title", type=str) - @click.pass_context - def cli(ctx: Context, **kwargs: Any) -> iP: - return iP(ctx, **kwargs) - - def __init__(self, ctx: Context, title: str): - self.title = title - self.vcodec = ctx.parent.params.get("vcodec") - super().__init__(ctx) - - if self.vcodec == "H.265" and not self.config.get("cert"): - self.log.error("H.265 cannot be selected without a certificate") - sys.exit(1) - - quality = ctx.parent.params.get("quality") - if quality and quality[0] > 1080 and self.vcodec != "H.265" and self.config.get("cert"): - self.log.info(" + Switched video codec to H.265 to be able to get 2160p video track") - self.vcodec = "H.265" - - def search(self) -> Generator[SearchResult, None, None]: - params = { - "q": self.title, - "apikey": self.config["api_key"], - } - - r = self.session.get(self.config["endpoints"]["search"], params=params) - r.raise_for_status() - - results = r.json() - for result in results["results"]: - yield SearchResult( - id_=result.get("uri").split(":")[-1], - title=result.get("title"), - description=result.get("synopsis"), - label="series" if result.get("type", "") == "brand" else result.get("type"), - url=result.get("url"), - ) - - def get_titles(self) -> Union[Movies, Series]: - kind, pid = (re.match(self.TITLE_RE, self.title).group(i) for i in ("kind", "id")) - if not pid: - self.log.error("Unable to parse title ID - is the URL or id correct?") - sys.exit(1) - - data = self.get_data(pid, slice_id=None) - if data is None and kind == "episode": - return self.get_single_episode(self.title) - elif data is None: - self.log.error("Metadata was not found - if %s is an episode, use full URL as input", pid) - sys.exit(1) - - if "Film" in data["labels"]["category"]: - return Movies( - [ - Movie( - id_=data["id"], - name=data["title"]["default"], - year=None, # TODO - service=self.__class__, - language="en", - data=data, - ) - ] - ) - else: - seasons = [self.get_data(pid, x["id"]) for x in data["slices"] or [{"id": None}]] - episodes = [self.create_episode(episode, data) for season in seasons for episode in season["entities"]["results"]] - return Series(episodes) - - def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: - r = self.session.get(url=self.config["endpoints"]["playlist"].format(pid=title.id)) - if not r.ok: - self.log.error(r.text) - sys.exit(1) - - versions = r.json().get("allAvailableVersions") - if not versions: - r = self.session.get(self.config["base_url"].format(type="episode", pid=title.id)) - redux = re.search("window.__IPLAYER_REDUX_STATE__ = (.*?);", r.text).group(1) - data = json.loads(redux) - versions = [ - {"pid": x.get("id") for x in data["versions"] if not x.get("kind") == "audio-described"} - ] - - quality = [ - connection.get("height") - for i in ( - self.check_all_versions(version) - for version in (x.get("pid") for x in versions) - ) - for connection in i - if connection.get("height") - ] - max_quality = max((h for h in quality if h < "1080"), default=None) - - media = next((i for i in (self.check_all_versions(version) - for version in (x.get("pid") for x in versions)) - if any(connection.get("height") == max_quality for connection in i)), None) - - connection = {} - for video in [x for x in media if x["kind"] == "video"]: - connections = sorted(video["connection"], key=lambda x: x["priority"]) - if self.vcodec == "H.265": - connection = connections[0] - else: - connection = next( - x for x in connections if x["supplier"] == "mf_akamai" and x["transferFormat"] == "dash" - ) - - break - - if not self.vcodec == "H.265": - if connection["transferFormat"] == "dash": - connection["href"] = "/".join( - connection["href"].replace("dash", "hls").split("?")[0].split("/")[0:-1] + ["hls", "master.m3u8"] - ) - connection["transferFormat"] = "hls" - elif connection["transferFormat"] == "hls": - connection["href"] = "/".join( - connection["href"].replace(".hlsv2.ism", "").split("?")[0].split("/")[0:-1] + ["hls", "master.m3u8"] - ) - - if connection["transferFormat"] != "hls": - raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}") - - if connection["transferFormat"] == "dash": - tracks = DASH.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language) - elif connection["transferFormat"] == "hls": - tracks = HLS.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language) - else: - raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}") - - for video in tracks.videos: - # TODO: add HLG to UHD tracks - - if any(re.search(r"-audio_\w+=\d+", x) for x in as_list(video.url)): - # create audio stream from the video stream - audio_url = re.sub(r"-video=\d+", "", as_list(video.url)[0]) - audio = Audio( - # use audio_url not video url, as to ignore video bitrate in ID - id_=hashlib.md5(audio_url.encode()).hexdigest()[0:7], - url=audio_url, - codec=Audio.Codec.from_codecs("mp4a"), - language=[v.language for v in video.data["hls"]["playlist"].media][0], - bitrate=int(self.find(r"-audio_\w+=(\d+)", as_list(video.url)[0]) or 0), - channels=[v.channels for v in video.data["hls"]["playlist"].media][0], - descriptive=False, # Not available - descriptor=Track.Descriptor.HLS, - ) - if not tracks.exists(by_id=audio.id): - # some video streams use the same audio, so natural dupes exist - tracks.add(audio) - # remove audio from the video stream - video.url = [re.sub(r"-audio_\w+=\d+", "", x) for x in as_list(video.url)][0] - video.codec = Video.Codec.from_codecs(video.data["hls"]["playlist"].stream_info.codecs) - video.bitrate = int(self.find(r"-video=(\d+)", as_list(video.url)[0]) or 0) - - for caption in [x for x in media if x["kind"] == "captions"]: - connection = sorted(caption["connection"], key=lambda x: x["priority"])[0] - tracks.add( - Subtitle( - id_=hashlib.md5(connection["href"].encode()).hexdigest()[0:6], - url=connection["href"], - codec=Subtitle.Codec.from_codecs("ttml"), - language=title.language, - is_original_lang=True, - forced=False, - sdh=True, - ) - ) - break - - return tracks - - def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]: - return [] - - def get_widevine_service_certificate(self, **_: Any) -> str: - return None - - def get_widevine_license(self, challenge: bytes, **_: Any) -> str: - return None - - # service specific functions - - def get_data(self, pid: str, slice_id: str) -> dict: - json_data = { - "id": "9fd1636abe711717c2baf00cebb668de", - "variables": { - "id": pid, - "perPage": 200, - "page": 1, - "sliceId": slice_id if slice_id else None, - }, - } - - r = self.session.post(self.config["endpoints"]["metadata"], json=json_data) - r.raise_for_status() - - return r.json()["data"]["programme"] - - def check_all_versions(self, vpid: str) -> list: - if self.config.get("cert"): - url = self.config["endpoints"]["manifest_"].format( - vpid=vpid, - mediaset="iptv-uhd" if self.vcodec == "H.265" else "iptv-all", - ) - - session = self.session - session.mount("https://", SSLCiphers()) - session.mount("http://", SSLCiphers()) - manifest = session.get( - url, headers={"user-agent": self.config["user_agent"]}, cert=self.config["cert"] - ).json() - - if "result" in manifest: - return {} - - else: - url = self.config["endpoints"]["manifest"].format( - vpid=vpid, - mediaset="iptv-all", - ) - manifest = self.session.get(url).json() - - if "result" in manifest: - return {} - - return manifest["media"] - - def create_episode(self, episode: dict, data: dict) -> Episode: - title = episode["episode"]["title"]["default"].strip() - subtitle = episode["episode"]["subtitle"] - series = re.finditer(r"Series (\d+):|Season (\d+):|(\d{4}/\d{2}): Episode \d+", subtitle.get("default") or "") - season_num = int(next((m.group(1) or m.group(2) or m.group(3).replace("/", "") for m in series), 0)) - if season_num == 0 and not data.get("slices"): - season_num = 1 - - number = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle.get("slice") or subtitle.get("default") or "") - ep_num = int(next((m.group(1) or m.group(2) for m in number), 0)) - - name = re.search(r"\d+\. (.+)", subtitle.get("slice") or "") - ep_name = name.group(1) if name else subtitle.get("slice") or "" - if not subtitle.get("slice"): - ep_name = subtitle.get("default") or "" - - return Episode( - id_=episode["episode"].get("id"), - service=self.__class__, - title=title, - season=season_num, - number=ep_num, - name=ep_name, - language="en", - data=episode, - ) - - def get_single_episode(self, url: str) -> Series: - r = self.session.get(url) - r.raise_for_status() - - redux = re.search("window.__IPLAYER_REDUX_STATE__ = (.*?);", r.text).group(1) - data = json.loads(redux) - subtitle = data["episode"].get("subtitle") - - if subtitle is not None: - season_match = re.search(r"Series (\d+):", subtitle) - season = int(season_match.group(1)) if season_match else 0 - number_match = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle) - number = int(next((m.group(1) or m.group(2) for m in number_match), 0)) - name_match = re.search(r"\d+\. (.+)", subtitle) - name = ( - name_match.group(1) - if name_match - else subtitle - if not re.search(r"Series (\d+): Episode (\d+)", subtitle) - else "" - ) - - return Series( - [ - Episode( - id_=data["episode"]["id"], - service=self.__class__, - title=data["episode"]["title"], - season=season if subtitle else 0, - number=number if subtitle else 0, - name=name if subtitle else "", - language="en", - ) - ] - ) - - def find(self, pattern, string, group=None): - if group: - m = re.search(pattern, string) - if m: - return m.group(group) - else: - return next(iter(re.findall(pattern, string)), None) diff --git a/services/iP/config.yaml b/services/iP/config.yaml deleted file mode 100644 index 7675834..0000000 --- a/services/iP/config.yaml +++ /dev/null @@ -1,11 +0,0 @@ -base_url: https://www.bbc.co.uk/iplayer/{type}/{pid} -user_agent: 'smarttv_AFTMM_Build_0003255372676_Chromium_41.0.2250.2' -api_key: 'D2FgtcTxGqqIgLsfBWTJdrQh2tVdeaAp' - -endpoints: - metadata: 'https://graph.ibl.api.bbc.co.uk/' - playlist: 'https://www.bbc.co.uk/programmes/{pid}/playlist.json' - manifest: "https://open.live.bbc.co.uk/mediaselector/6/select/version/2.0/mediaset/{mediaset}/vpid/{vpid}/" - manifest_: 'https://securegate.iplayer.bbc.co.uk/mediaselector/6/select/version/2.0/vpid/{vpid}/format/json/mediaset/{mediaset}/proto/https' - search: "https://search.api.bbci.co.uk/formula/iplayer-ibl-root" -