initial commit

This commit is contained in:
stabbedbybrick 2024-04-07 05:14:51 +02:00
commit f91fbb62dc
7 changed files with 1209 additions and 0 deletions

178
.gitignore vendored Normal file
View File

@ -0,0 +1,178 @@
# devine
*.mkv
*.mp4
*.exe
*.dll
*.crt
*.wvd
*.der
*.pem
*.bin
*.db
*.ttf
*.otf
device_cert
device_client_id_blob
device_private_key
device_vmp_blob
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

384
ALL4/__init__.py Normal file
View File

@ -0,0 +1,384 @@
from __future__ import annotations
import base64
import hashlib
import json
import sys
import re
from collections.abc import Generator
from datetime import datetime, timezone
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
from click import Context
from Crypto.Util.Padding import unpad
from Cryptodome.Cipher import AES
from pywidevine.cdm import Cdm as WidevineCdm
from devine.core.credential import Credential
from devine.core.manifests.dash import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Subtitle, Tracks
class ALL4(Service):
"""
Service code for Channel 4's All4 streaming service (https://channel4.com).
\b
Author: stabbedbybrick
Authorization: Credentials
Robustness:
L3: 1080p, AAC2.0
\b
Tips:
- Use complete title URL or slug as input:
https://www.channel4.com/programmes/taskmaster OR taskmaster
- Use on demand URL for directly downloading episodes:
https://www.channel4.com/programmes/taskmaster/on-demand/75588-002
- Both android and web/pc endpoints are checked for quality profiles.
If android is missing 1080p, it automatically falls back to web.
"""
GEOFENCE = ("gb", "ie")
TITLE_RE = r"^(?:https?://(?:www\.)?channel4\.com/programmes/)?(?P<id>[a-z0-9-]+)(?:/on-demand/(?P<vid>[0-9-]+))?"
@staticmethod
@click.command(name="ALL4", short_help="https://channel4.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: Context, **kwargs: Any) -> ALL4:
return ALL4(ctx, **kwargs)
def __init__(self, ctx: Context, title: str):
self.title = title
super().__init__(ctx)
self.authorization: str
self.asset_id: int
self.license_token: str
self.manifest: str
self.session.headers.update(
{
"X-C4-Platform-Name": self.config["device"]["platform_name"],
"X-C4-Device-Type": self.config["device"]["device_type"],
"X-C4-Device-Name": self.config["device"]["device_name"],
"X-C4-App-Version": self.config["device"]["app_version"],
"X-C4-Optimizely-Datafile": self.config["device"]["optimizely_datafile"],
}
)
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not credential:
raise EnvironmentError("Service requires Credentials for Authentication.")
cache = self.cache.get(f"tokens_{credential.sha1}")
if cache and not cache.expired:
# cached
self.log.info(" + Using cached Tokens...")
tokens = cache.data
elif cache and cache.expired:
# expired, refresh
self.log.info("Refreshing cached Tokens")
r = self.session.post(
self.config["endpoints"]["login"],
headers={"authorization": f"Basic {self.config['android']['auth']}"},
data={
"grant_type": "refresh_token",
"username": credential.username,
"password": credential.password,
"refresh_token": cache.data["refreshToken"],
},
)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to refresh tokens: {r.text}")
if "error" in res:
self.log.error(f"Failed to refresh tokens: {res['errorMessage']}")
sys.exit(1)
tokens = res
self.log.info(" + Refreshed")
else:
# new
headers = {"authorization": f"Basic {self.config['android']['auth']}"}
data = {
"grant_type": "password",
"username": credential.username,
"password": credential.password,
}
r = self.session.post(self.config["endpoints"]["login"], headers=headers, data=data)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to log in: {r.text}")
if "error" in res:
self.log.error(f"Failed to log in: {res['errorMessage']}")
sys.exit(1)
tokens = res
self.log.info(" + Acquired tokens...")
cache.set(tokens, expiration=tokens["expiresIn"])
self.authorization = f"Bearer {tokens['accessToken']}"
def search(self) -> Generator[SearchResult, None, None]:
params = {
"expand": "default",
"q": self.title,
"limit": "100",
"offset": "0",
}
r = self.session.get(self.config["endpoints"]["search"], params=params)
r.raise_for_status()
results = r.json()
if isinstance(results["results"], list):
for result in results["results"]:
yield SearchResult(
id_=result["brand"].get("websafeTitle"),
title=result["brand"].get("title"),
description=result["brand"].get("description"),
label=result.get("label"),
url=result["brand"].get("href"),
)
def get_titles(self) -> Union[Movies, Series]:
title, on_demand = (re.match(self.TITLE_RE, self.title).group(i) for i in ("id", "vid"))
r = self.session.get(
self.config["endpoints"]["title"].format(title=title),
params={"client": "android-mod", "deviceGroup": "mobile", "include": "extended-restart"},
headers={"Authorization": self.authorization},
)
if not r.ok:
self.log.error(r.text)
sys.exit(1)
data = r.json()
if on_demand is not None:
return Series(
[
Episode(
id_=episode["programmeId"],
service=self.__class__,
title=data["brand"]["title"],
season=episode["seriesNumber"],
number=episode["episodeNumber"],
name=episode["originalTitle"],
language="en",
data=episode["assetInfo"].get("streaming"),
)
for episode in data["brand"]["episodes"]
if episode.get("assetInfo") and episode["programmeId"] == on_demand
]
)
elif data["brand"]["programmeType"] == "FM":
return Movies(
[
Movie(
id_=movie["programmeId"],
service=self.__class__,
name=data["brand"]["title"],
year=int(data["brand"]["summary"].split(" ")[0].strip().strip("()")),
language="en",
data=movie["assetInfo"].get("streaming"),
)
for movie in data["brand"]["episodes"]
]
)
else:
return Series(
[
Episode(
id_=episode["programmeId"],
service=self.__class__,
title=data["brand"]["title"],
season=episode["seriesNumber"],
number=episode["episodeNumber"],
name=episode["originalTitle"],
language="en",
data=episode["assetInfo"].get("streaming"),
)
for episode in data["brand"]["episodes"]
if episode.get("assetInfo")
]
)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
android_assets: tuple = self.android_playlist(title.id)
web_assets: tuple = self.web_playlist(title.id)
self.manifest, self.license_token, subtitle, data = self.sort_assets(android_assets, web_assets)
self.asset_id = int(title.data["assetId"])
tracks = DASH.from_url(self.manifest, self.session).to_tracks(title.language)
tracks.videos[0].data = data
if subtitle is not None:
tracks.add(
Subtitle(
id_=hashlib.md5(subtitle.encode()).hexdigest()[0:6],
url=subtitle,
codec=Subtitle.Codec.from_mime(subtitle[-3:]),
language=title.language,
is_original_lang=True,
forced=False,
sdh=True,
)
)
for track in tracks.audio:
role = track.data["dash"]["representation"].find("Role")
if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
track.descriptive = True
return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:
track = title.tracks.videos[0]
chapters = [
Chapter(
name=f"Chapter {i + 1:02}",
timestamp=datetime.fromtimestamp((ms / 1000), tz=timezone.utc).strftime("%H:%M:%S.%f")[:-3],
)
for i, ms in enumerate(x["breakOffset"] for x in track.data["adverts"]["breaks"])
]
if track.data.get("endCredits", {}).get("squeezeIn"):
chapters.append(
Chapter(
name="Credits",
timestamp=datetime.fromtimestamp(
(track.data["endCredits"]["squeezeIn"] / 1000), tz=timezone.utc
).strftime("%H:%M:%S.%f")[:-3],
)
)
return chapters
def get_widevine_service_certificate(self, **_: Any) -> str:
return WidevineCdm.common_privacy_cert
def get_widevine_license(self, challenge: bytes, **_: Any) -> str:
payload = {
"message": base64.b64encode(challenge).decode("utf8"),
"token": self.license_token,
"request_id": self.asset_id,
"video": {"type": "ondemand", "url": self.manifest},
}
r = self.session.post(self.config["endpoints"]["license"], json=payload)
if not r.ok:
raise ConnectionError(f"License request failed: {r.json()['status']['type']}")
return r.json()["license"]
# Service specific functions
def sort_assets(self, android_assets: tuple, web_assets: tuple) -> tuple:
if android_assets is not None:
try:
a_manifest, a_token, a_subtitle, data = android_assets
android_tracks = DASH.from_url(a_manifest, self.session).to_tracks("en")
android_heights = sorted([int(track.height) for track in android_tracks.videos], reverse=True)
except Exception:
android_heights = None
if web_assets is not None:
try:
b_manifest, b_token, b_subtitle, data = web_assets
web_tracks = DASH.from_url(b_manifest, self.session).to_tracks("en")
web_heights = sorted([int(track.height) for track in web_tracks.videos], reverse=True)
except Exception:
web_heights = None
if not android_heights and not web_heights:
self.log.error("Failed to request manifest data. If you're behind a VPN/proxy, you might be blocked")
sys.exit(1)
if not android_heights or android_heights[0] < 1080:
self.log.warning(
"ANDROID data returned None or is missing full quality profile, falling back to WEB data..."
)
lic_token = self.decrypt_token(b_token, client="WEB")
return b_manifest, lic_token, b_subtitle, data
else:
lic_token = self.decrypt_token(a_token, client="ANDROID")
return a_manifest, lic_token, a_subtitle, data
def android_playlist(self, video_id: str) -> tuple:
self.log.info("Requesting ANDROID assets...")
url = self.config["android"]["vod"].format(video_id=video_id)
headers = {"authorization": self.authorization}
r = self.session.get(url=url, headers=headers)
if not r.ok:
self.log.warning("Request for Android endpoint returned %s", r)
return
data = json.loads(r.content)
manifest = data["videoProfiles"][0]["streams"][0]["uri"]
token = data["videoProfiles"][0]["streams"][0]["token"]
subtitle = next(
(x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")),
None,
)
return manifest, token, subtitle, data
def web_playlist(self, video_id: str) -> tuple:
self.log.info("Requesting WEB assets...")
url = self.config["web"]["vod"].format(programmeId=video_id)
r = self.session.get(url)
if not r.ok:
self.log.warning("Request for WEB endpoint returned %s", r)
return
data = json.loads(r.content)
for item in data["videoProfiles"]:
if item["name"] == "dashwv-dyn-stream-1":
token = item["streams"][0]["token"]
manifest = item["streams"][0]["uri"]
subtitle = next(
(x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")),
None,
)
return manifest, token, subtitle, data
def decrypt_token(self, token: str, client: str) -> tuple:
if client == "ANDROID":
key = self.config["android"]["key"]
iv = self.config["android"]["iv"]
if client == "WEB":
key = self.config["web"]["key"]
iv = self.config["web"]["iv"]
if isinstance(token, str):
token = base64.b64decode(token)
cipher = AES.new(
key=base64.b64decode(key),
iv=base64.b64decode(iv),
mode=AES.MODE_CBC,
)
data = unpad(cipher.decrypt(token), AES.block_size)
dec_token = data.decode().split("|")[1]
return dec_token.strip()

23
ALL4/config.yaml Normal file
View File

@ -0,0 +1,23 @@
endpoints:
login: https://api.channel4.com/online/v2/auth/token
title: https://api.channel4.com/online/v1/views/content-hubs/{title}.json
license: https://c4.eme.lp.aws.redbeemedia.com/wvlicenceproxy-service/widevine/acquire
search: https://all4nav.channel4.com/v1/api/search
android:
key: QVlESUQ4U0RGQlA0TThESA=="
iv: MURDRDAzODNES0RGU0w4Mg=="
auth: MzZVVUN0OThWTVF2QkFnUTI3QXU4ekdIbDMxTjlMUTE6Sllzd3lIdkdlNjJWbGlrVw==
vod: https://api.channel4.com/online/v1/vod/stream/{video_id}?client=android-mod
web:
key: bjljTGllWWtxd3pOQ3F2aQ==
iv: b2R6Y1UzV2RVaVhMdWNWZA==
vod: https://www.channel4.com/vod/stream/{programmeId}
device:
platform_name: android
device_type: mobile
device_name: "Sony C6903 (C6903)"
app_version: "android_app:9.4.2"
optimizely_datafile: "2908"

364
CTV/__init__.py Normal file
View File

@ -0,0 +1,364 @@
from __future__ import annotations
import hashlib
import json
import re
import sys
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor, as_completed
from http.cookiejar import CookieJar
from typing import Any, Optional
import click
from pywidevine.cdm import Cdm as WidevineCdm
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Subtitle, Tracks
class CTV(Service):
"""
Service code for CTV.ca (https://www.ctv.ca)
\b
Author: stabbedbybrick
Authorization: Credentials for subscription, none for freely available titles
Robustness:
Widevine:
L3: 1080p, DD5.1
\b
Tips:
- Input can be either complete title/episode URL or just the path:
/shows/young-sheldon
/shows/young-sheldon/baptists-catholics-and-an-attempted-drowning-s7e6
/movies/war-for-the-planet-of-the-apes
"""
TITLE_RE = r"^(?:https?://(?:www\.)?ctv\.ca(?:/[a-z]{2})?)?/(?P<type>movies|shows)/(?P<id>[a-z0-9-]+)(?:/(?P<episode>[a-z0-9-]+))?$"
GEOFENCE = ("ca",)
@staticmethod
@click.command(name="CTV", short_help="https://www.ctv.ca", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return CTV(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = title
super().__init__(ctx)
self.authorization: str = None
self.api = self.config["endpoints"]["api"]
self.license_url = self.config["endpoints"]["license"]
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if credential:
cache = self.cache.get(f"tokens_{credential.sha1}")
if cache and not cache.expired:
# cached
self.log.info(" + Using cached Tokens...")
tokens = cache.data
elif cache and cache.expired:
# expired, refresh
self.log.info("Refreshing cached Tokens")
r = self.session.post(
self.config["endpoints"]["login"],
headers={"authorization": f"Basic {self.config['endpoints']['auth']}"},
data={
"grant_type": "refresh_token",
"username": credential.username,
"password": credential.password,
"refresh_token": cache.data["refresh_token"],
},
)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to refresh tokens: {r.text}")
tokens = res
self.log.info(" + Refreshed")
else:
# new
r = self.session.post(
self.config["endpoints"]["login"],
headers={"authorization": f"Basic {self.config['endpoints']['auth']}"},
data={
"grant_type": "password",
"username": credential.username,
"password": credential.password,
},
)
try:
res = r.json()
except json.JSONDecodeError:
raise ValueError(f"Failed to log in: {r.text}")
tokens = res
self.log.info(" + Acquired tokens...")
cache.set(tokens, expiration=tokens["expires_in"])
self.authorization = f"Bearer {tokens['access_token']}"
def search(self) -> Generator[SearchResult, None, None]:
payload = {
"operationName": "searchMedia",
"variables": {"title": f"{self.title}"},
"query": """
query searchMedia($title: String!) {searchMedia(titleMatches: $title) {
... on Medias {page {items {title\npath}}}}}, """,
}
r = self.session.post(self.config["endpoints"]["search"], json=payload)
if r.status_code != 200:
self.log.error(r.text)
return
for result in r.json()["data"]["searchMedia"]["page"]["items"]:
yield SearchResult(
id_=result.get("path"),
title=result.get("title"),
description=result.get("description"),
label=result["path"].split("/")[1],
url="https://www.ctv.ca" + result.get("path"),
)
def get_titles(self) -> Titles_T:
title, kind, episode = (re.match(self.TITLE_RE, self.title).group(i) for i in ("id", "type", "episode"))
title_path = self.get_title_id(kind, title, episode)
if episode is not None:
data = self.get_episode_data(title_path)
return Series(
[
Episode(
id_=data["axisId"],
service=self.__class__,
title=data["axisMedia"]["title"],
season=int(data["seasonNumber"]),
number=int(data["episodeNumber"]),
name=data["title"],
year=data.get("firstAirYear"),
language=data["axisPlaybackLanguages"][0].get("language", "en"),
data=data["axisPlaybackLanguages"][0]["destinationCode"],
)
]
)
if kind == "shows":
data = self.get_series_data(title_path)
titles = self.fetch_episodes(data["contentData"]["seasons"])
return Series(
[
Episode(
id_=episode["axisId"],
service=self.__class__,
title=data["contentData"]["title"],
season=int(episode["seasonNumber"]),
number=int(episode["episodeNumber"]),
name=episode["title"],
year=data["contentData"]["firstAirYear"],
language=episode["axisPlaybackLanguages"][0].get("language", "en"),
data=episode["axisPlaybackLanguages"][0]["destinationCode"],
)
for episode in titles
]
)
if kind == "movies":
data = self.get_movie_data(title_path)
return Movies(
[
Movie(
id_=data["contentData"]["firstPlayableContent"]["axisId"],
service=self.__class__,
name=data["contentData"]["title"],
year=data["contentData"]["firstAirYear"],
language=data["contentData"]["firstPlayableContent"]["axisPlaybackLanguages"][0].get(
"language", "en"
),
data=data["contentData"]["firstPlayableContent"]["axisPlaybackLanguages"][0]["destinationCode"],
)
]
)
def get_tracks(self, title: Title_T) -> Tracks:
base = f"https://capi.9c9media.com/destinations/{title.data}/platforms/desktop"
r = self.session.get(f"{base}/contents/{title.id}/contentPackages")
r.raise_for_status()
pkg_id = r.json()["Items"][0]["Id"]
base += "/playback/contents"
manifest = f"{base}/{title.id}/contentPackages/{pkg_id}/manifest.mpd?filter=25"
subtitle = f"{base}/{title.id}/contentPackages/{pkg_id}/manifest.vtt"
if self.authorization:
self.session.headers.update({"authorization": self.authorization})
tracks = DASH.from_url(url=manifest, session=self.session).to_tracks(language=title.language)
tracks.add(
Subtitle(
id_=hashlib.md5(subtitle.encode()).hexdigest()[0:6],
url=subtitle,
codec=Subtitle.Codec.from_mime(subtitle[-3:]),
language=title.language,
is_original_lang=True,
forced=False,
sdh=True,
)
)
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
return [] # Chapters not available
def get_widevine_service_certificate(self, **_: Any) -> str:
return WidevineCdm.common_privacy_cert
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
r = self.session.post(url=self.license_url, data=challenge)
if r.status_code != 200:
self.log.error(r.text)
sys.exit(1)
return r.content
# service specific functions
def get_title_id(self, kind: str, title: tuple, episode: str) -> str:
if episode is not None:
title += f"/{episode}"
payload = {
"operationName": "resolvePath",
"variables": {"path": f"{kind}/{title}"},
"query": """
query resolvePath($path: String!) {
resolvedPath(path: $path) {
lastSegment {
content {
id
}
}
}
}
""",
}
r = self.session.post(self.api, json=payload).json()
return r["data"]["resolvedPath"]["lastSegment"]["content"]["id"]
def get_series_data(self, title_id: str) -> json:
payload = {
"operationName": "axisMedia",
"variables": {"axisMediaId": f"{title_id}"},
"query": """
query axisMedia($axisMediaId: ID!) {
contentData: axisMedia(id: $axisMediaId) {
title
description
originalSpokenLanguage
mediaType
firstAirYear
seasons {
title
id
seasonNumber
}
}
}
""",
}
return self.session.post(self.api, json=payload).json()["data"]
def get_movie_data(self, title_id: str) -> json:
payload = {
"operationName": "axisMedia",
"variables": {"axisMediaId": f"{title_id}"},
"query": """
query axisMedia($axisMediaId: ID!) {
contentData: axisMedia(id: $axisMediaId) {
title
description
firstAirYear
firstPlayableContent {
axisId
axisPlaybackLanguages {
destinationCode
}
}
}
}
""",
}
return self.session.post(self.api, json=payload).json()["data"]
def get_episode_data(self, title_path: str) -> json:
payload = {
"operationName": "axisContent",
"variables": {"id": f"{title_path}"},
"query": """
query axisContent($id: ID!) {
axisContent(id: $id) {
axisId
title
description
contentType
seasonNumber
episodeNumber
axisMedia {
title
}
axisPlaybackLanguages {
language
destinationCode
}
}
}
""",
}
return self.session.post(self.api, json=payload).json()["data"]["axisContent"]
def fetch_episode(self, episode: str) -> json:
payload = {
"operationName": "season",
"variables": {"seasonId": f"{episode}"},
"query": """
query season($seasonId: ID!) {
axisSeason(id: $seasonId) {
episodes {
axisId
title
description
contentType
seasonNumber
episodeNumber
axisPlaybackLanguages {
language
destinationCode
}
}
}
}
""",
}
response = self.session.post(self.api, json=payload)
return response.json()["data"]["axisSeason"]["episodes"]
def fetch_episodes(self, data: dict) -> list:
"""TODO: Switch to async once https proxies are fully supported"""
with ThreadPoolExecutor(max_workers=10) as executor:
tasks = [executor.submit(self.fetch_episode, x["id"]) for x in data]
titles = [future.result() for future in as_completed(tasks)]
return [episode for episodes in titles for episode in episodes]

6
CTV/config.yaml Normal file
View File

@ -0,0 +1,6 @@
endpoints:
login: https://account.bellmedia.ca/api/login/v2.1
auth: Y3R2LXdlYjpkZWZhdWx0
api: https://api.ctv.ca/space-graphql/graphql
license: https://license.9c9media.ca/widevine
search: https://www.ctv.ca/space-graphql/apq/graphql

249
ROKU/__init__.py Normal file
View File

@ -0,0 +1,249 @@
import json
import re
import sys
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from http.cookiejar import CookieJar
from typing import Any, Optional
from urllib.parse import unquote, urlparse
import click
import requests
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Tracks
class ROKU(Service):
"""
Service code for The Roku Channel (https://therokuchannel.roku.com)
\b
Author: stabbedbybrick
Authorization: Cookies (optional)
Robustness:
Widevine:
L3: 1080p, DD5.1
\b
Tips:
- Use complete title/episode URL or id as input:
https://therokuchannel.roku.com/details/e05fc677ab9c5d5e8332f123770697b9/paddington
OR
e05fc677ab9c5d5e8332f123770697b9
- Supports movies, series, and single episodes
- Search is geofenced
"""
GEOFENCE = ("us",)
TITLE_RE = r"^(?:https?://(?:www.)?therokuchannel.roku.com/(?:details|watch)/)?(?P<id>[a-z0-9-]+)"
@staticmethod
@click.command(name="ROKU", short_help="https://therokuchannel.roku.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return ROKU(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = re.match(self.TITLE_RE, title).group("id")
super().__init__(ctx)
self.license: str
def authenticate(
self,
cookies: Optional[CookieJar] = None,
credential: Optional[Credential] = None,
) -> None:
super().authenticate(cookies, credential)
if cookies is not None:
self.session.cookies.update(cookies)
def search(self) -> Generator[SearchResult, None, None]:
token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"]
headers = {"csrf-token": token}
payload = {"query": self.title}
r = self.session.post(self.config["endpoints"]["search"], headers=headers, json=payload)
r.raise_for_status()
results = r.json()
for result in results["view"]:
if not result["content"]["type"] == "zone":
_id = result["content"].get("meta", {}).get("id")
_desc = result["content"].get("descriptions")
label = f'{result["content"].get("type")} ({result["content"].get("releaseYear")})'
if result["content"].get("viewOptions"):
label += f' ({result["content"]["viewOptions"][0].get("priceDisplay")})'
title = re.sub(r"^-|-$", "", re.sub(r"\W+", "-", result["content"].get("title").lower()))
yield SearchResult(
id_=_id,
title=title,
description=_desc["250"]["text"] if _desc.get("250") else None,
label=label,
url=f"https://therokuchannel.roku.com/details/{_id}/{title}",
)
def get_titles(self) -> Titles_T:
data = self.session.get(self.config["endpoints"]["content"] + self.title).json()
if not data["isAvailable"]:
self.log.error("This title is temporarily unavailable or expired")
sys.exit(1)
if data["type"] == "movie":
return Movies(
[
Movie(
id_=data["meta"]["id"],
service=self.__class__,
name=data["title"],
year=data["releaseYear"],
language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
]
)
elif data["type"] == "series":
episodes = self.fetch_episodes(data)
return Series(
[
Episode(
id_=episode["meta"]["id"],
service=self.__class__,
title=data["title"],
season=int(episode["seasonNumber"]),
number=int(episode["episodeNumber"]),
name=episode["title"],
year=data["releaseYear"],
language=episode["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
for episode in episodes
]
)
elif data["type"] == "episode":
return Series(
[
Episode(
id_=data["meta"]["id"],
service=self.__class__,
title=data["title"],
season=int(data["seasonNumber"]),
number=int(data["episodeNumber"]),
name=data["title"],
year=data["releaseYear"],
language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
]
)
def get_tracks(self, title: Title_T) -> Tracks:
token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"]
headers = {
"csrf-token": token,
}
payload = {
"rokuId": title.id,
"mediaFormat": "mpeg-dash",
"drmType": "widevine",
"quality": "fhd",
"providerId": "rokuavod",
}
r = self.session.post(
self.config["endpoints"]["vod"],
headers=headers,
json=payload,
)
r.raise_for_status()
videos = r.json()["playbackMedia"]["videos"]
self.license = next(
(
x["drmParams"]["licenseServerURL"]
for x in videos
if x.get("drmParams") and x["drmParams"]["keySystem"] == "Widevine"
),
None,
)
url = next((x["url"] for x in videos if x["streamFormat"] == "dash"), None)
if url and "origin" in urlparse(url).query:
url = unquote(urlparse(url).query.split("=")[1]).split("?")[0]
tracks = DASH.from_url(url=url).to_tracks(language=title.language)
tracks.videos[0].data["playbackMedia"] = r.json()["playbackMedia"]
for track in tracks.audio:
label = track.data["dash"]["adaptation_set"].find("Label")
if label is not None and "description" in label.text:
track.descriptive = True
for track in tracks.subtitles:
label = track.data["dash"]["adaptation_set"].find("Label")
if label is not None and "caption" in label.text:
track.cc = True
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
track = title.tracks.videos[0]
chapters = []
if track.data.get("playbackMedia", {}).get("adBreaks"):
timestamps = sorted(track.data["playbackMedia"]["adBreaks"])
chapters = [Chapter(name=f"Chapter {i + 1:02}", timestamp=ad) for i, ad in enumerate(timestamps)]
if track.data.get("playbackMedia", {}).get("creditCuePoints"):
chapters.append(
Chapter(
name="Credits",
timestamp=datetime.fromtimestamp(
(track.data["playbackMedia"]["creditCuePoints"][0]["start"] / 1000),
tz=timezone.utc,
).strftime("%H:%M:%S.%f")[:-3],
)
)
return chapters
def get_widevine_service_certificate(self, **_: Any) -> str:
return # WidevineCdm.common_privacy_cert
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
r = self.session.post(url=self.license, data=challenge)
if r.status_code != 200:
self.log.error(r.text)
sys.exit(1)
return r.content
# service specific functions
def fetch_episode(self, episode: dict) -> json:
try:
r = self.session.get(self.config["endpoints"]["content"] + episode["meta"]["id"])
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
self.log.error(f"An error occurred while fetching episode {episode['meta']['id']}: {e}")
return None
def fetch_episodes(self, data: dict) -> list:
"""TODO: Switch to async once https proxies are fully supported"""
with ThreadPoolExecutor(max_workers=10) as executor:
tasks = list(executor.map(self.fetch_episode, data["episodes"]))
return [task for task in tasks if task is not None]

5
ROKU/config.yaml Normal file
View File

@ -0,0 +1,5 @@
endpoints:
content: https://therokuchannel.roku.com/api/v2/homescreen/content/https%3A%2F%2Fcontent.sr.roku.com%2Fcontent%2Fv1%2Froku-trc%2F
vod: https://therokuchannel.roku.com/api/v3/playback
token: https://therokuchannel.roku.com/api/v1/csrf
search: https://therokuchannel.roku.com/api/v1/search