Updated @stabbedbybrick service files

This commit is contained in:
TPD94 2024-04-22 13:52:16 -04:00
parent aee2998d66
commit 1e58c9359f
8 changed files with 0 additions and 1401 deletions

View File

@ -1,381 +0,0 @@
from __future__ import annotations
import base64
import hashlib
import json
import sys
import re
from collections.abc import Generator
from datetime import datetime, timezone
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
from click import Context
from Crypto.Util.Padding import unpad
from Cryptodome.Cipher import AES
from pywidevine.cdm import Cdm as WidevineCdm
from devine.core.credential import Credential
from devine.core.manifests.dash import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Subtitle, Tracks
class ALL4(Service):
"""
Service code for Channel 4's All4 streaming service (https://channel4.com).
\b
Author: stabbedbybrick
Authorization: Credentials
Robustness:
L3: 1080p, AAC2.0
\b
Tips:
- Use complete title URL or slug as input:
https://www.channel4.com/programmes/taskmaster OR taskmaster
- Use on demand URL for directly downloading episodes:
https://www.channel4.com/programmes/taskmaster/on-demand/75588-002
- Both android and web/pc endpoints are checked for quality profiles.
If android is missing 1080p, it automatically falls back to web.
"""
GEOFENCE = ("gb", "ie")
TITLE_RE = r"^(?:https?://(?:www\.)?channel4\.com/programmes/)?(?P<id>[a-z0-9-]+)(?:/on-demand/(?P<vid>[0-9-]+))?"
@staticmethod
@click.command(name="ALL4", short_help="https://channel4.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> ALL4:
return ALL4(ctx, **kwargs)
def __init__(self, ctx: Context, title: str):
self.title = title
super().__init__(ctx)
self.authorization: str
self.asset_id: int
self.license_token: str
self.manifest: str
self.session.headers.update(
{
"X-C4-Platform-Name": self.config["device"]["platform_name"],
"X-C4-Device-Type": self.config["device"]["device_type"],
"X-C4-Device-Name": self.config["device"]["device_name"],
"X-C4-App-Version": self.config["device"]["app_version"],
"X-C4-Optimizely-Datafile": self.config["device"]["optimizely_datafile"],
}
)
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not credential:
raise EnvironmentError("Service requires Credentials for Authentication.")
cache = self.cache.get(f"tokens_{credential.sha1}")
if cache and not cache.expired:
# cached
self.log.info(" + Using cached Tokens...")
tokens = cache.data
elif cache and cache.expired:
# expired, refresh
self.log.info("Refreshing cached Tokens")
r = self.session.post(
self.config["endpoints"]["login"],
headers={"authorization": f"Basic {self.config['android']['auth']}"},
data={
"grant_type": "refresh_token",
"username": credential.username,
"password": credential.password,
"refresh_token": cache.data["refreshToken"],
},
)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to refresh tokens: {r.text}")
if "error" in res:
self.log.error(f"Failed to refresh tokens: {res['errorMessage']}")
sys.exit(1)
tokens = res
self.log.info(" + Refreshed")
else:
# new
headers = {"authorization": f"Basic {self.config['android']['auth']}"}
data = {
"grant_type": "password",
"username": credential.username,
"password": credential.password,
}
r = self.session.post(self.config["endpoints"]["login"], headers=headers, data=data)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to log in: {r.text}")
if "error" in res:
self.log.error(f"Failed to log in: {res['errorMessage']}")
sys.exit(1)
tokens = res
self.log.info(" + Acquired tokens...")
cache.set(tokens, expiration=tokens["expiresIn"])
self.authorization = f"Bearer {tokens['accessToken']}"
def search(self) -> Generator[SearchResult, None, None]:
params = {
"expand": "default",
"q": self.title,
"limit": "100",
"offset": "0",
}
r = self.session.get(self.config["endpoints"]["search"], params=params)
r.raise_for_status()
results = r.json()
if isinstance(results["results"], list):
for result in results["results"]:
yield SearchResult(
id_=result["brand"].get("websafeTitle"),
title=result["brand"].get("title"),
description=result["brand"].get("description"),
label=result.get("label"),
url=result["brand"].get("href"),
)
def get_titles(self) -> Union[Movies, Series]:
title, on_demand = (re.match(self.TITLE_RE, self.title).group(i) for i in ("id", "vid"))
r = self.session.get(
self.config["endpoints"]["title"].format(title=title),
params={"client": "android-mod", "deviceGroup": "mobile", "include": "extended-restart"},
headers={"Authorization": self.authorization},
)
if not r.ok:
self.log.error(r.text)
sys.exit(1)
data = r.json()
if on_demand is not None:
return Series(
[
Episode(
id_=episode["programmeId"],
service=self.__class__,
title=data["brand"]["title"],
season=episode["seriesNumber"],
number=episode["episodeNumber"],
name=episode["originalTitle"],
language="en",
data=episode["assetInfo"].get("streaming"),
)
for episode in data["brand"]["episodes"]
if episode.get("assetInfo") and episode["programmeId"] == on_demand
]
)
elif data["brand"]["programmeType"] == "FM":
return Movies(
[
Movie(
id_=movie["programmeId"],
service=self.__class__,
name=data["brand"]["title"],
year=int(data["brand"]["summary"].split(" ")[0].strip().strip("()")),
language="en",
data=movie["assetInfo"].get("streaming"),
)
for movie in data["brand"]["episodes"]
]
)
else:
return Series(
[
Episode(
id_=episode["programmeId"],
service=self.__class__,
title=data["brand"]["title"],
season=episode["seriesNumber"],
number=episode["episodeNumber"],
name=episode["originalTitle"],
language="en",
data=episode["assetInfo"].get("streaming"),
)
for episode in data["brand"]["episodes"]
if episode.get("assetInfo")
]
)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
android_assets: tuple = self.android_playlist(title.id)
web_assets: tuple = self.web_playlist(title.id)
self.manifest, self.license_token, subtitle, data = self.sort_assets(android_assets, web_assets)
self.asset_id = int(title.data["assetId"])
tracks = DASH.from_url(self.manifest, self.session).to_tracks(title.language)
tracks.videos[0].data = data
if subtitle is not None:
tracks.add(
Subtitle(
id_=hashlib.md5(subtitle.encode()).hexdigest()[0:6],
url=subtitle,
codec=Subtitle.Codec.from_mime(subtitle[-3:]),
language=title.language,
is_original_lang=True,
forced=False,
sdh=True,
)
)
for track in tracks.audio:
role = track.data["dash"]["representation"].find("Role")
if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
track.descriptive = True
return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:
track = title.tracks.videos[0]
chapters = [
Chapter(
name=f"Chapter {i + 1:02}",
timestamp=datetime.fromtimestamp((ms / 1000), tz=timezone.utc).strftime("%H:%M:%S.%f")[:-3],
)
for i, ms in enumerate(x["breakOffset"] for x in track.data["adverts"]["breaks"])
]
if track.data.get("endCredits", {}).get("squeezeIn"):
chapters.append(
Chapter(
name="Credits",
timestamp=datetime.fromtimestamp(
(track.data["endCredits"]["squeezeIn"] / 1000), tz=timezone.utc
).strftime("%H:%M:%S.%f")[:-3],
)
)
return chapters
def get_widevine_service_certificate(self, **_: Any) -> str:
return WidevineCdm.common_privacy_cert
def get_widevine_license(self, challenge: bytes, **_: Any) -> str:
payload = {
"message": base64.b64encode(challenge).decode("utf8"),
"token": self.license_token,
"request_id": self.asset_id,
"video": {"type": "ondemand", "url": self.manifest},
}
r = self.session.post(self.config["endpoints"]["license"], json=payload)
if not r.ok:
raise ConnectionError(f"License request failed: {r.json()['status']['type']}")
return r.json()["license"]
# Service specific functions
def sort_assets(self, android_assets: tuple, web_assets: tuple) -> tuple:
if android_assets is not None:
try:
a_manifest, a_token, a_subtitle, data = android_assets
android_tracks = DASH.from_url(a_manifest, self.session).to_tracks("en")
android_heights = sorted([int(track.height) for track in android_tracks.videos], reverse=True)
except Exception:
android_heights = None
if web_assets is not None:
try:
b_manifest, b_token, b_subtitle, data = web_assets
session = self.session
session.headers.update(self.config["headers"])
web_tracks = DASH.from_url(b_manifest, session).to_tracks("en")
web_heights = sorted([int(track.height) for track in web_tracks.videos], reverse=True)
except Exception:
web_heights = None
if not android_heights and not web_heights:
self.log.error("Failed to request manifest data. If you're behind a VPN/proxy, you might be blocked")
sys.exit(1)
if not android_heights or android_heights[0] < 1080:
lic_token = self.decrypt_token(b_token, client="WEB")
return b_manifest, lic_token, b_subtitle, data
else:
lic_token = self.decrypt_token(a_token, client="ANDROID")
return a_manifest, lic_token, a_subtitle, data
def android_playlist(self, video_id: str) -> tuple:
url = self.config["android"]["vod"].format(video_id=video_id)
headers = {"authorization": self.authorization}
r = self.session.get(url=url, headers=headers)
if not r.ok:
self.log.warning("Request for Android endpoint returned %s", r)
return
data = json.loads(r.content)
manifest = data["videoProfiles"][0]["streams"][0]["uri"]
token = data["videoProfiles"][0]["streams"][0]["token"]
subtitle = next(
(x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")),
None,
)
return manifest, token, subtitle, data
def web_playlist(self, video_id: str) -> tuple:
url = self.config["web"]["vod"].format(programmeId=video_id)
r = self.session.get(url, headers=self.config["headers"])
if not r.ok:
self.log.warning("Request for WEB endpoint returned %s", r)
return
data = json.loads(r.content)
for item in data["videoProfiles"]:
if item["name"] == "dashwv-dyn-stream-1":
token = item["streams"][0]["token"]
manifest = item["streams"][0]["uri"]
subtitle = next(
(x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")),
None,
)
return manifest, token, subtitle, data
def decrypt_token(self, token: str, client: str) -> tuple:
if client == "ANDROID":
key = self.config["android"]["key"]
iv = self.config["android"]["iv"]
if client == "WEB":
key = self.config["web"]["key"]
iv = self.config["web"]["iv"]
if isinstance(token, str):
token = base64.b64decode(token)
cipher = AES.new(
key=base64.b64decode(key),
iv=base64.b64decode(iv),
mode=AES.MODE_CBC,
)
data = unpad(cipher.decrypt(token), AES.block_size)
dec_token = data.decode().split("|")[1]
return dec_token.strip()

View File

@ -1,27 +0,0 @@
headers:
Accept-Language: en-US,en;q=0.8
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36
endpoints:
login: https://api.channel4.com/online/v2/auth/token
title: https://api.channel4.com/online/v1/views/content-hubs/{title}.json
license: https://c4.eme.lp.aws.redbeemedia.com/wvlicenceproxy-service/widevine/acquire
search: https://all4nav.channel4.com/v1/api/search
android:
key: QVlESUQ4U0RGQlA0TThESA=="
iv: MURDRDAzODNES0RGU0w4Mg=="
auth: MzZVVUN0OThWTVF2QkFnUTI3QXU4ekdIbDMxTjlMUTE6Sllzd3lIdkdlNjJWbGlrVw==
vod: https://api.channel4.com/online/v1/vod/stream/{video_id}?client=android-mod
web:
key: bjljTGllWWtxd3pOQ3F2aQ==
iv: b2R6Y1UzV2RVaVhMdWNWZA==
vod: https://www.channel4.com/vod/stream/{programmeId}
device:
platform_name: android
device_type: mobile
device_name: "Sony C6903 (C6903)"
app_version: "android_app:9.4.2"
optimizely_datafile: "2908"

View File

@ -1,364 +0,0 @@
from __future__ import annotations
import hashlib
import json
import re
import sys
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor, as_completed
from http.cookiejar import CookieJar
from typing import Any, Optional
import click
from pywidevine.cdm import Cdm as WidevineCdm
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Subtitle, Tracks
class CTV(Service):
"""
Service code for CTV.ca (https://www.ctv.ca)
\b
Author: stabbedbybrick
Authorization: Credentials for subscription, none for freely available titles
Robustness:
Widevine:
L3: 1080p, DD5.1
\b
Tips:
- Input can be either complete title/episode URL or just the path:
/shows/young-sheldon
/shows/young-sheldon/baptists-catholics-and-an-attempted-drowning-s7e6
/movies/war-for-the-planet-of-the-apes
"""
TITLE_RE = r"^(?:https?://(?:www\.)?ctv\.ca(?:/[a-z]{2})?)?/(?P<type>movies|shows)/(?P<id>[a-z0-9-]+)(?:/(?P<episode>[a-z0-9-]+))?$"
GEOFENCE = ("ca",)
@staticmethod
@click.command(name="CTV", short_help="https://www.ctv.ca", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return CTV(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = title
super().__init__(ctx)
self.authorization: str = None
self.api = self.config["endpoints"]["api"]
self.license_url = self.config["endpoints"]["license"]
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if credential:
cache = self.cache.get(f"tokens_{credential.sha1}")
if cache and not cache.expired:
# cached
self.log.info(" + Using cached Tokens...")
tokens = cache.data
elif cache and cache.expired:
# expired, refresh
self.log.info("Refreshing cached Tokens")
r = self.session.post(
self.config["endpoints"]["login"],
headers={"authorization": f"Basic {self.config['endpoints']['auth']}"},
data={
"grant_type": "refresh_token",
"username": credential.username,
"password": credential.password,
"refresh_token": cache.data["refresh_token"],
},
)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to refresh tokens: {r.text}")
tokens = res
self.log.info(" + Refreshed")
else:
# new
r = self.session.post(
self.config["endpoints"]["login"],
headers={"authorization": f"Basic {self.config['endpoints']['auth']}"},
data={
"grant_type": "password",
"username": credential.username,
"password": credential.password,
},
)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to log in: {r.text}")
tokens = res
self.log.info(" + Acquired tokens...")
cache.set(tokens, expiration=tokens["expires_in"])
self.authorization = f"Bearer {tokens['access_token']}"
def search(self) -> Generator[SearchResult, None, None]:
payload = {
"operationName": "searchMedia",
"variables": {"title": f"{self.title}"},
"query": """
query searchMedia($title: String!) {searchMedia(titleMatches: $title) {
... on Medias {page {items {title\npath}}}}}, """,
}
r = self.session.post(self.config["endpoints"]["search"], json=payload)
if r.status_code != 200:
self.log.error(r.text)
return
for result in r.json()["data"]["searchMedia"]["page"]["items"]:
yield SearchResult(
id_=result.get("path"),
title=result.get("title"),
description=result.get("description"),
label=result["path"].split("/")[1],
url="https://www.ctv.ca" + result.get("path"),
)
def get_titles(self) -> Titles_T:
title, kind, episode = (re.match(self.TITLE_RE, self.title).group(i) for i in ("id", "type", "episode"))
title_path = self.get_title_id(kind, title, episode)
if episode is not None:
data = self.get_episode_data(title_path)
return Series(
[
Episode(
id_=data["axisId"],
service=self.__class__,
title=data["axisMedia"]["title"],
season=int(data["seasonNumber"]),
number=int(data["episodeNumber"]),
name=data["title"],
year=data.get("firstAirYear"),
language=data["axisPlaybackLanguages"][0].get("language", "en"),
data=data["axisPlaybackLanguages"][0]["destinationCode"],
)
]
)
if kind == "shows":
data = self.get_series_data(title_path)
titles = self.fetch_episodes(data["contentData"]["seasons"])
return Series(
[
Episode(
id_=episode["axisId"],
service=self.__class__,
title=data["contentData"]["title"],
season=int(episode["seasonNumber"]),
number=int(episode["episodeNumber"]),
name=episode["title"],
year=data["contentData"]["firstAirYear"],
language=episode["axisPlaybackLanguages"][0].get("language", "en"),
data=episode["axisPlaybackLanguages"][0]["destinationCode"],
)
for episode in titles
]
)
if kind == "movies":
data = self.get_movie_data(title_path)
return Movies(
[
Movie(
id_=data["contentData"]["firstPlayableContent"]["axisId"],
service=self.__class__,
name=data["contentData"]["title"],
year=data["contentData"]["firstAirYear"],
language=data["contentData"]["firstPlayableContent"]["axisPlaybackLanguages"][0].get(
"language", "en"
),
data=data["contentData"]["firstPlayableContent"]["axisPlaybackLanguages"][0]["destinationCode"],
)
]
)
def get_tracks(self, title: Title_T) -> Tracks:
base = f"https://capi.9c9media.com/destinations/{title.data}/platforms/desktop"
r = self.session.get(f"{base}/contents/{title.id}/contentPackages")
r.raise_for_status()
pkg_id = r.json()["Items"][0]["Id"]
base += "/playback/contents"
manifest = f"{base}/{title.id}/contentPackages/{pkg_id}/manifest.mpd?filter=25"
subtitle = f"{base}/{title.id}/contentPackages/{pkg_id}/manifest.vtt"
if self.authorization:
self.session.headers.update({"authorization": self.authorization})
tracks = DASH.from_url(url=manifest, session=self.session).to_tracks(language=title.language)
tracks.add(
Subtitle(
id_=hashlib.md5(subtitle.encode()).hexdigest()[0:6],
url=subtitle,
codec=Subtitle.Codec.from_mime(subtitle[-3:]),
language=title.language,
is_original_lang=True,
forced=False,
sdh=True,
)
)
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
return [] # Chapters not available
def get_widevine_service_certificate(self, **_: Any) -> str:
return WidevineCdm.common_privacy_cert
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
r = self.session.post(url=self.license_url, data=challenge)
if r.status_code != 200:
self.log.error(r.text)
sys.exit(1)
return r.content
# service specific functions
def get_title_id(self, kind: str, title: tuple, episode: str) -> str:
if episode is not None:
title += f"/{episode}"
payload = {
"operationName": "resolvePath",
"variables": {"path": f"{kind}/{title}"},
"query": """
query resolvePath($path: String!) {
resolvedPath(path: $path) {
lastSegment {
content {
id
}
}
}
}
""",
}
r = self.session.post(self.api, json=payload).json()
return r["data"]["resolvedPath"]["lastSegment"]["content"]["id"]
def get_series_data(self, title_id: str) -> json:
payload = {
"operationName": "axisMedia",
"variables": {"axisMediaId": f"{title_id}"},
"query": """
query axisMedia($axisMediaId: ID!) {
contentData: axisMedia(id: $axisMediaId) {
title
description
originalSpokenLanguage
mediaType
firstAirYear
seasons {
title
id
seasonNumber
}
}
}
""",
}
return self.session.post(self.api, json=payload).json()["data"]
def get_movie_data(self, title_id: str) -> json:
payload = {
"operationName": "axisMedia",
"variables": {"axisMediaId": f"{title_id}"},
"query": """
query axisMedia($axisMediaId: ID!) {
contentData: axisMedia(id: $axisMediaId) {
title
description
firstAirYear
firstPlayableContent {
axisId
axisPlaybackLanguages {
destinationCode
}
}
}
}
""",
}
return self.session.post(self.api, json=payload).json()["data"]
def get_episode_data(self, title_path: str) -> json:
payload = {
"operationName": "axisContent",
"variables": {"id": f"{title_path}"},
"query": """
query axisContent($id: ID!) {
axisContent(id: $id) {
axisId
title
description
contentType
seasonNumber
episodeNumber
axisMedia {
title
}
axisPlaybackLanguages {
language
destinationCode
}
}
}
""",
}
return self.session.post(self.api, json=payload).json()["data"]["axisContent"]
def fetch_episode(self, episode: str) -> json:
payload = {
"operationName": "season",
"variables": {"seasonId": f"{episode}"},
"query": """
query season($seasonId: ID!) {
axisSeason(id: $seasonId) {
episodes {
axisId
title
description
contentType
seasonNumber
episodeNumber
axisPlaybackLanguages {
language
destinationCode
}
}
}
}
""",
}
response = self.session.post(self.api, json=payload)
return response.json()["data"]["axisSeason"]["episodes"]
def fetch_episodes(self, data: dict) -> list:
"""TODO: Switch to async once https proxies are fully supported"""
with ThreadPoolExecutor(max_workers=10) as executor:
tasks = [executor.submit(self.fetch_episode, x["id"]) for x in data]
titles = [future.result() for future in as_completed(tasks)]
return [episode for episodes in titles for episode in episodes]

View File

@ -1,6 +0,0 @@
endpoints:
login: https://account.bellmedia.ca/api/login/v2.1
auth: Y3R2LXdlYjpkZWZhdWx0
api: https://api.ctv.ca/space-graphql/graphql
license: https://license.9c9media.ca/widevine
search: https://www.ctv.ca/space-graphql/apq/graphql

View File

@ -1,249 +0,0 @@
import json
import re
import sys
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from http.cookiejar import CookieJar
from typing import Any, Optional
from urllib.parse import unquote, urlparse
import click
import requests
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Tracks
class ROKU(Service):
"""
Service code for The Roku Channel (https://therokuchannel.roku.com)
\b
Author: stabbedbybrick
Authorization: Cookies (optional)
Robustness:
Widevine:
L3: 1080p, DD5.1
\b
Tips:
- Use complete title/episode URL or id as input:
https://therokuchannel.roku.com/details/e05fc677ab9c5d5e8332f123770697b9/paddington
OR
e05fc677ab9c5d5e8332f123770697b9
- Supports movies, series, and single episodes
- Search is geofenced
"""
GEOFENCE = ("us",)
TITLE_RE = r"^(?:https?://(?:www.)?therokuchannel.roku.com/(?:details|watch)/)?(?P<id>[a-z0-9-]+)"
@staticmethod
@click.command(name="ROKU", short_help="https://therokuchannel.roku.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return ROKU(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = re.match(self.TITLE_RE, title).group("id")
super().__init__(ctx)
self.license: str
def authenticate(
self,
cookies: Optional[CookieJar] = None,
credential: Optional[Credential] = None,
) -> None:
super().authenticate(cookies, credential)
if cookies is not None:
self.session.cookies.update(cookies)
def search(self) -> Generator[SearchResult, None, None]:
token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"]
headers = {"csrf-token": token}
payload = {"query": self.title}
r = self.session.post(self.config["endpoints"]["search"], headers=headers, json=payload)
r.raise_for_status()
results = r.json()
for result in results["view"]:
if result["content"]["type"] not in ["zone", "provider"]:
_id = result["content"].get("meta", {}).get("id")
_desc = result["content"].get("descriptions", {})
label = f'{result["content"].get("type")} ({result["content"].get("releaseYear")})'
if result["content"].get("viewOptions"):
label += f' ({result["content"]["viewOptions"][0].get("priceDisplay")})'
title = re.sub(r"^-|-$", "", re.sub(r"\W+", "-", result["content"].get("title").lower()))
yield SearchResult(
id_=_id,
title=title,
description=_desc["250"]["text"] if _desc.get("250") else None,
label=label,
url=f"https://therokuchannel.roku.com/details/{_id}/{title}",
)
def get_titles(self) -> Titles_T:
data = self.session.get(self.config["endpoints"]["content"] + self.title).json()
if not data["isAvailable"]:
self.log.error("This title is temporarily unavailable or expired")
sys.exit(1)
if data["type"] == "movie":
return Movies(
[
Movie(
id_=data["meta"]["id"],
service=self.__class__,
name=data["title"],
year=data["releaseYear"],
language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
]
)
elif data["type"] == "series":
episodes = self.fetch_episodes(data)
return Series(
[
Episode(
id_=episode["meta"]["id"],
service=self.__class__,
title=data["title"],
season=int(episode["seasonNumber"]),
number=int(episode["episodeNumber"]),
name=episode["title"],
year=data["releaseYear"],
language=episode["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
for episode in episodes
]
)
elif data["type"] == "episode":
return Series(
[
Episode(
id_=data["meta"]["id"],
service=self.__class__,
title=data["title"],
season=int(data["seasonNumber"]),
number=int(data["episodeNumber"]),
name=data["title"],
year=data["releaseYear"],
language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
]
)
def get_tracks(self, title: Title_T) -> Tracks:
token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"]
headers = {
"csrf-token": token,
}
payload = {
"rokuId": title.id,
"mediaFormat": "mpeg-dash",
"drmType": "widevine",
"quality": "fhd",
"providerId": "rokuavod",
}
r = self.session.post(
self.config["endpoints"]["vod"],
headers=headers,
json=payload,
)
r.raise_for_status()
videos = r.json()["playbackMedia"]["videos"]
self.license = next(
(
x["drmParams"]["licenseServerURL"]
for x in videos
if x.get("drmParams") and x["drmParams"]["keySystem"] == "Widevine"
),
None,
)
url = next((x["url"] for x in videos if x["streamFormat"] == "dash"), None)
if url and "origin" in urlparse(url).query:
url = unquote(urlparse(url).query.split("=")[1]).split("?")[0]
tracks = DASH.from_url(url=url).to_tracks(language=title.language)
tracks.videos[0].data["playbackMedia"] = r.json()["playbackMedia"]
for track in tracks.audio:
label = track.data["dash"]["adaptation_set"].find("Label")
if label is not None and "description" in label.text:
track.descriptive = True
for track in tracks.subtitles:
label = track.data["dash"]["adaptation_set"].find("Label")
if label is not None and "caption" in label.text:
track.cc = True
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
track = title.tracks.videos[0]
chapters = []
if track.data.get("playbackMedia", {}).get("adBreaks"):
timestamps = sorted(track.data["playbackMedia"]["adBreaks"])
chapters = [Chapter(name=f"Chapter {i + 1:02}", timestamp=ad.split(".")[0]) for i, ad in enumerate(timestamps)]
if track.data.get("playbackMedia", {}).get("creditCuePoints"):
start = next((
x.get("start") for x in track.data["playbackMedia"]["creditCuePoints"] if x.get("start") != 0), None)
if start:
chapters.append(
Chapter(
name="Credits",
timestamp=datetime.fromtimestamp((start / 1000), tz=timezone.utc).strftime("%H:%M:%S.%f")[:-3],
)
)
return chapters
def get_widevine_service_certificate(self, **_: Any) -> str:
return # WidevineCdm.common_privacy_cert
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
r = self.session.post(url=self.license, data=challenge)
if r.status_code != 200:
self.log.error(r.text)
sys.exit(1)
return r.content
# service specific functions
def fetch_episode(self, episode: dict) -> json:
try:
r = self.session.get(self.config["endpoints"]["content"] + episode["meta"]["id"])
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
self.log.error(f"An error occurred while fetching episode {episode['meta']['id']}: {e}")
return None
def fetch_episodes(self, data: dict) -> list:
"""TODO: Switch to async once https proxies are fully supported"""
with ThreadPoolExecutor(max_workers=10) as executor:
tasks = list(executor.map(self.fetch_episode, data["episodes"]))
return [task for task in tasks if task is not None]

View File

@ -1,5 +0,0 @@
endpoints:
content: https://therokuchannel.roku.com/api/v2/homescreen/content/https%3A%2F%2Fcontent.sr.roku.com%2Fcontent%2Fv1%2Froku-trc%2F
vod: https://therokuchannel.roku.com/api/v3/playback
token: https://therokuchannel.roku.com/api/v1/csrf
search: https://therokuchannel.roku.com/api/v1/search

View File

@ -1,358 +0,0 @@
from __future__ import annotations
import hashlib
import json
import re
import sys
import warnings
from collections.abc import Generator
from typing import Any, Union
import click
from bs4 import XMLParsedAsHTMLWarning
from click import Context
from devine.core.manifests import DASH, HLS
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Audio, Chapter, Subtitle, Track, Tracks, Video
from devine.core.utils.collections import as_list
from devine.core.utils.sslciphers import SSLCiphers
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
class iP(Service):
"""
\b
Service code for the BBC iPlayer streaming service (https://www.bbc.co.uk/iplayer).
Base code from VT, credit to original author
\b
Author: stabbedbybrick
Authorization: None
Security: None
\b
Tips:
- Use full title URL as input for best results.
- Use --list-titles before anything, iPlayer's listings are often messed up.
\b
- An SSL certificate (PEM) is required for accessing the UHD endpoint.
Specify its path using the service configuration data in the root config:
\b
services:
iP:
cert: path/to/cert
\b
- Use -v H.265 to request UHD tracks
- See which titles are available in UHD:
https://www.bbc.co.uk/iplayer/help/questions/programme-availability/uhd-content
"""
ALIASES = ("bbciplayer", "bbc", "iplayer")
GEOFENCE = ("gb",)
TITLE_RE = r"^(?:https?://(?:www\.)?bbc\.co\.uk/(?:iplayer/(?P<kind>episode|episodes)/|programmes/))?(?P<id>[a-z0-9]+)(?:/.*)?$"
@staticmethod
@click.command(name="iP", short_help="https://www.bbc.co.uk/iplayer", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> iP:
return iP(ctx, **kwargs)
def __init__(self, ctx: Context, title: str):
self.title = title
self.vcodec = ctx.parent.params.get("vcodec")
super().__init__(ctx)
if self.vcodec == "H.265" and not self.config.get("cert"):
self.log.error("H.265 cannot be selected without a certificate")
sys.exit(1)
quality = ctx.parent.params.get("quality")
if quality and quality[0] > 1080 and self.vcodec != "H.265" and self.config.get("cert"):
self.log.info(" + Switched video codec to H.265 to be able to get 2160p video track")
self.vcodec = "H.265"
def search(self) -> Generator[SearchResult, None, None]:
params = {
"q": self.title,
"apikey": self.config["api_key"],
}
r = self.session.get(self.config["endpoints"]["search"], params=params)
r.raise_for_status()
results = r.json()
for result in results["results"]:
yield SearchResult(
id_=result.get("uri").split(":")[-1],
title=result.get("title"),
description=result.get("synopsis"),
label="series" if result.get("type", "") == "brand" else result.get("type"),
url=result.get("url"),
)
def get_titles(self) -> Union[Movies, Series]:
kind, pid = (re.match(self.TITLE_RE, self.title).group(i) for i in ("kind", "id"))
if not pid:
self.log.error("Unable to parse title ID - is the URL or id correct?")
sys.exit(1)
data = self.get_data(pid, slice_id=None)
if data is None and kind == "episode":
return self.get_single_episode(self.title)
elif data is None:
self.log.error("Metadata was not found - if %s is an episode, use full URL as input", pid)
sys.exit(1)
if "Film" in data["labels"]["category"]:
return Movies(
[
Movie(
id_=data["id"],
name=data["title"]["default"],
year=None, # TODO
service=self.__class__,
language="en",
data=data,
)
]
)
else:
seasons = [self.get_data(pid, x["id"]) for x in data["slices"] or [{"id": None}]]
episodes = [self.create_episode(episode, data) for season in seasons for episode in season["entities"]["results"]]
return Series(episodes)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
r = self.session.get(url=self.config["endpoints"]["playlist"].format(pid=title.id))
if not r.ok:
self.log.error(r.text)
sys.exit(1)
versions = r.json().get("allAvailableVersions")
if not versions:
r = self.session.get(self.config["base_url"].format(type="episode", pid=title.id))
redux = re.search("window.__IPLAYER_REDUX_STATE__ = (.*?);</script>", r.text).group(1)
data = json.loads(redux)
versions = [
{"pid": x.get("id") for x in data["versions"] if not x.get("kind") == "audio-described"}
]
quality = [
connection.get("height")
for i in (
self.check_all_versions(version)
for version in (x.get("pid") for x in versions)
)
for connection in i
if connection.get("height")
]
max_quality = max((h for h in quality if h < "1080"), default=None)
media = next((i for i in (self.check_all_versions(version)
for version in (x.get("pid") for x in versions))
if any(connection.get("height") == max_quality for connection in i)), None)
connection = {}
for video in [x for x in media if x["kind"] == "video"]:
connections = sorted(video["connection"], key=lambda x: x["priority"])
if self.vcodec == "H.265":
connection = connections[0]
else:
connection = next(
x for x in connections if x["supplier"] == "mf_akamai" and x["transferFormat"] == "dash"
)
break
if not self.vcodec == "H.265":
if connection["transferFormat"] == "dash":
connection["href"] = "/".join(
connection["href"].replace("dash", "hls").split("?")[0].split("/")[0:-1] + ["hls", "master.m3u8"]
)
connection["transferFormat"] = "hls"
elif connection["transferFormat"] == "hls":
connection["href"] = "/".join(
connection["href"].replace(".hlsv2.ism", "").split("?")[0].split("/")[0:-1] + ["hls", "master.m3u8"]
)
if connection["transferFormat"] != "hls":
raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}")
if connection["transferFormat"] == "dash":
tracks = DASH.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language)
elif connection["transferFormat"] == "hls":
tracks = HLS.from_url(url=connection["href"], session=self.session).to_tracks(language=title.language)
else:
raise ValueError(f"Unsupported video media transfer format {connection['transferFormat']!r}")
for video in tracks.videos:
# TODO: add HLG to UHD tracks
if any(re.search(r"-audio_\w+=\d+", x) for x in as_list(video.url)):
# create audio stream from the video stream
audio_url = re.sub(r"-video=\d+", "", as_list(video.url)[0])
audio = Audio(
# use audio_url not video url, as to ignore video bitrate in ID
id_=hashlib.md5(audio_url.encode()).hexdigest()[0:7],
url=audio_url,
codec=Audio.Codec.from_codecs("mp4a"),
language=[v.language for v in video.data["hls"]["playlist"].media][0],
bitrate=int(self.find(r"-audio_\w+=(\d+)", as_list(video.url)[0]) or 0),
channels=[v.channels for v in video.data["hls"]["playlist"].media][0],
descriptive=False, # Not available
descriptor=Track.Descriptor.HLS,
)
if not tracks.exists(by_id=audio.id):
# some video streams use the same audio, so natural dupes exist
tracks.add(audio)
# remove audio from the video stream
video.url = [re.sub(r"-audio_\w+=\d+", "", x) for x in as_list(video.url)][0]
video.codec = Video.Codec.from_codecs(video.data["hls"]["playlist"].stream_info.codecs)
video.bitrate = int(self.find(r"-video=(\d+)", as_list(video.url)[0]) or 0)
for caption in [x for x in media if x["kind"] == "captions"]:
connection = sorted(caption["connection"], key=lambda x: x["priority"])[0]
tracks.add(
Subtitle(
id_=hashlib.md5(connection["href"].encode()).hexdigest()[0:6],
url=connection["href"],
codec=Subtitle.Codec.from_codecs("ttml"),
language=title.language,
is_original_lang=True,
forced=False,
sdh=True,
)
)
break
return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:
return []
def get_widevine_service_certificate(self, **_: Any) -> str:
return None
def get_widevine_license(self, challenge: bytes, **_: Any) -> str:
return None
# service specific functions
def get_data(self, pid: str, slice_id: str) -> dict:
json_data = {
"id": "9fd1636abe711717c2baf00cebb668de",
"variables": {
"id": pid,
"perPage": 200,
"page": 1,
"sliceId": slice_id if slice_id else None,
},
}
r = self.session.post(self.config["endpoints"]["metadata"], json=json_data)
r.raise_for_status()
return r.json()["data"]["programme"]
def check_all_versions(self, vpid: str) -> list:
if self.config.get("cert"):
url = self.config["endpoints"]["manifest_"].format(
vpid=vpid,
mediaset="iptv-uhd" if self.vcodec == "H.265" else "iptv-all",
)
session = self.session
session.mount("https://", SSLCiphers())
session.mount("http://", SSLCiphers())
manifest = session.get(
url, headers={"user-agent": self.config["user_agent"]}, cert=self.config["cert"]
).json()
if "result" in manifest:
return {}
else:
url = self.config["endpoints"]["manifest"].format(
vpid=vpid,
mediaset="iptv-all",
)
manifest = self.session.get(url).json()
if "result" in manifest:
return {}
return manifest["media"]
def create_episode(self, episode: dict, data: dict) -> Episode:
title = episode["episode"]["title"]["default"].strip()
subtitle = episode["episode"]["subtitle"]
series = re.finditer(r"Series (\d+):|Season (\d+):|(\d{4}/\d{2}): Episode \d+", subtitle.get("default") or "")
season_num = int(next((m.group(1) or m.group(2) or m.group(3).replace("/", "") for m in series), 0))
if season_num == 0 and not data.get("slices"):
season_num = 1
number = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle.get("slice") or subtitle.get("default") or "")
ep_num = int(next((m.group(1) or m.group(2) for m in number), 0))
name = re.search(r"\d+\. (.+)", subtitle.get("slice") or "")
ep_name = name.group(1) if name else subtitle.get("slice") or ""
if not subtitle.get("slice"):
ep_name = subtitle.get("default") or ""
return Episode(
id_=episode["episode"].get("id"),
service=self.__class__,
title=title,
season=season_num,
number=ep_num,
name=ep_name,
language="en",
data=episode,
)
def get_single_episode(self, url: str) -> Series:
r = self.session.get(url)
r.raise_for_status()
redux = re.search("window.__IPLAYER_REDUX_STATE__ = (.*?);</script>", r.text).group(1)
data = json.loads(redux)
subtitle = data["episode"].get("subtitle")
if subtitle is not None:
season_match = re.search(r"Series (\d+):", subtitle)
season = int(season_match.group(1)) if season_match else 0
number_match = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle)
number = int(next((m.group(1) or m.group(2) for m in number_match), 0))
name_match = re.search(r"\d+\. (.+)", subtitle)
name = (
name_match.group(1)
if name_match
else subtitle
if not re.search(r"Series (\d+): Episode (\d+)", subtitle)
else ""
)
return Series(
[
Episode(
id_=data["episode"]["id"],
service=self.__class__,
title=data["episode"]["title"],
season=season if subtitle else 0,
number=number if subtitle else 0,
name=name if subtitle else "",
language="en",
)
]
)
def find(self, pattern, string, group=None):
if group:
m = re.search(pattern, string)
if m:
return m.group(group)
else:
return next(iter(re.findall(pattern, string)), None)

View File

@ -1,11 +0,0 @@
base_url: https://www.bbc.co.uk/iplayer/{type}/{pid}
user_agent: 'smarttv_AFTMM_Build_0003255372676_Chromium_41.0.2250.2'
api_key: 'D2FgtcTxGqqIgLsfBWTJdrQh2tVdeaAp'
endpoints:
metadata: 'https://graph.ibl.api.bbc.co.uk/'
playlist: 'https://www.bbc.co.uk/programmes/{pid}/playlist.json'
manifest: "https://open.live.bbc.co.uk/mediaselector/6/select/version/2.0/mediaset/{mediaset}/vpid/{vpid}/"
manifest_: 'https://securegate.iplayer.bbc.co.uk/mediaselector/6/select/version/2.0/vpid/{vpid}/format/json/mediaset/{mediaset}/proto/https'
search: "https://search.api.bbci.co.uk/formula/iplayer-ibl-root"