devine-services/services/ROKU/__init__.py
stabbedbybrick de37b1089b add services
2024-04-07 05:17:56 +02:00

250 lines
8.9 KiB
Python

import json
import re
import sys
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from http.cookiejar import CookieJar
from typing import Any, Optional
from urllib.parse import unquote, urlparse
import click
import requests
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Tracks
class ROKU(Service):
"""
Service code for The Roku Channel (https://therokuchannel.roku.com)
\b
Author: stabbedbybrick
Authorization: Cookies (optional)
Robustness:
Widevine:
L3: 1080p, DD5.1
\b
Tips:
- Use complete title/episode URL or id as input:
https://therokuchannel.roku.com/details/e05fc677ab9c5d5e8332f123770697b9/paddington
OR
e05fc677ab9c5d5e8332f123770697b9
- Supports movies, series, and single episodes
- Search is geofenced
"""
GEOFENCE = ("us",)
TITLE_RE = r"^(?:https?://(?:www.)?therokuchannel.roku.com/(?:details|watch)/)?(?P<id>[a-z0-9-]+)"
@staticmethod
@click.command(name="ROKU", short_help="https://therokuchannel.roku.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return ROKU(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = re.match(self.TITLE_RE, title).group("id")
super().__init__(ctx)
self.license: str
def authenticate(
self,
cookies: Optional[CookieJar] = None,
credential: Optional[Credential] = None,
) -> None:
super().authenticate(cookies, credential)
if cookies is not None:
self.session.cookies.update(cookies)
def search(self) -> Generator[SearchResult, None, None]:
token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"]
headers = {"csrf-token": token}
payload = {"query": self.title}
r = self.session.post(self.config["endpoints"]["search"], headers=headers, json=payload)
r.raise_for_status()
results = r.json()
for result in results["view"]:
if not result["content"]["type"] == "zone":
_id = result["content"].get("meta", {}).get("id")
_desc = result["content"].get("descriptions")
label = f'{result["content"].get("type")} ({result["content"].get("releaseYear")})'
if result["content"].get("viewOptions"):
label += f' ({result["content"]["viewOptions"][0].get("priceDisplay")})'
title = re.sub(r"^-|-$", "", re.sub(r"\W+", "-", result["content"].get("title").lower()))
yield SearchResult(
id_=_id,
title=title,
description=_desc["250"]["text"] if _desc.get("250") else None,
label=label,
url=f"https://therokuchannel.roku.com/details/{_id}/{title}",
)
def get_titles(self) -> Titles_T:
data = self.session.get(self.config["endpoints"]["content"] + self.title).json()
if not data["isAvailable"]:
self.log.error("This title is temporarily unavailable or expired")
sys.exit(1)
if data["type"] == "movie":
return Movies(
[
Movie(
id_=data["meta"]["id"],
service=self.__class__,
name=data["title"],
year=data["releaseYear"],
language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
]
)
elif data["type"] == "series":
episodes = self.fetch_episodes(data)
return Series(
[
Episode(
id_=episode["meta"]["id"],
service=self.__class__,
title=data["title"],
season=int(episode["seasonNumber"]),
number=int(episode["episodeNumber"]),
name=episode["title"],
year=data["releaseYear"],
language=episode["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
for episode in episodes
]
)
elif data["type"] == "episode":
return Series(
[
Episode(
id_=data["meta"]["id"],
service=self.__class__,
title=data["title"],
season=int(data["seasonNumber"]),
number=int(data["episodeNumber"]),
name=data["title"],
year=data["releaseYear"],
language=data["viewOptions"][0]["media"].get("originalAudioLanguage", "en"),
data=None,
)
]
)
def get_tracks(self, title: Title_T) -> Tracks:
token = self.session.get(self.config["endpoints"]["token"]).json()["csrf"]
headers = {
"csrf-token": token,
}
payload = {
"rokuId": title.id,
"mediaFormat": "mpeg-dash",
"drmType": "widevine",
"quality": "fhd",
"providerId": "rokuavod",
}
r = self.session.post(
self.config["endpoints"]["vod"],
headers=headers,
json=payload,
)
r.raise_for_status()
videos = r.json()["playbackMedia"]["videos"]
self.license = next(
(
x["drmParams"]["licenseServerURL"]
for x in videos
if x.get("drmParams") and x["drmParams"]["keySystem"] == "Widevine"
),
None,
)
url = next((x["url"] for x in videos if x["streamFormat"] == "dash"), None)
if url and "origin" in urlparse(url).query:
url = unquote(urlparse(url).query.split("=")[1]).split("?")[0]
tracks = DASH.from_url(url=url).to_tracks(language=title.language)
tracks.videos[0].data["playbackMedia"] = r.json()["playbackMedia"]
for track in tracks.audio:
label = track.data["dash"]["adaptation_set"].find("Label")
if label is not None and "description" in label.text:
track.descriptive = True
for track in tracks.subtitles:
label = track.data["dash"]["adaptation_set"].find("Label")
if label is not None and "caption" in label.text:
track.cc = True
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
track = title.tracks.videos[0]
chapters = []
if track.data.get("playbackMedia", {}).get("adBreaks"):
timestamps = sorted(track.data["playbackMedia"]["adBreaks"])
chapters = [Chapter(name=f"Chapter {i + 1:02}", timestamp=ad) for i, ad in enumerate(timestamps)]
if track.data.get("playbackMedia", {}).get("creditCuePoints"):
chapters.append(
Chapter(
name="Credits",
timestamp=datetime.fromtimestamp(
(track.data["playbackMedia"]["creditCuePoints"][0]["start"] / 1000),
tz=timezone.utc,
).strftime("%H:%M:%S.%f")[:-3],
)
)
return chapters
def get_widevine_service_certificate(self, **_: Any) -> str:
return # WidevineCdm.common_privacy_cert
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
r = self.session.post(url=self.license, data=challenge)
if r.status_code != 200:
self.log.error(r.text)
sys.exit(1)
return r.content
# service specific functions
def fetch_episode(self, episode: dict) -> json:
try:
r = self.session.get(self.config["endpoints"]["content"] + episode["meta"]["id"])
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
self.log.error(f"An error occurred while fetching episode {episode['meta']['id']}: {e}")
return None
def fetch_episodes(self, data: dict) -> list:
"""TODO: Switch to async once https proxies are fully supported"""
with ThreadPoolExecutor(max_workers=10) as executor:
tasks = list(executor.map(self.fetch_episode, data["episodes"]))
return [task for task in tasks if task is not None]