Update to latest from @stabbedbybrick

This commit is contained in:
CDM-Project 2024-09-08 23:16:39 -04:00
parent 2a260d6c70
commit a4dd6c6177
6 changed files with 730 additions and 122 deletions

241
services/CBS/__init__.py Normal file
View File

@ -0,0 +1,241 @@
from __future__ import annotations
import json
import re
import sys
from collections.abc import Generator
from typing import Any, Optional, Union
from urllib.parse import urljoin
import click
from devine.core.constants import AnyTrack
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Chapters, Tracks
from devine.core.utils.sslciphers import SSLCiphers
from devine.core.utils.xml import load_xml
from requests import Request
class CBS(Service):
"""
\b
Service code for CBS.com streaming service (https://cbs.com).
Credit to @srpen6 for the tip on anonymous session
\b
Author: stabbedbybrick
Authorization: None
Robustness:
Widevine:
L3: 2160p, DDP5.1
\b
Tips:
- Input should be complete URLs:
SERIES: https://www.cbs.com/shows/tracker/
EPISODE: https://www.cbs.com/shows/video/E0wG_ovVMkLlHOzv7KDpUV9bjeKFFG2v/
\b
Common VPN/proxy errors:
- SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING]'))
- ConnectionError: 406 Not Acceptable, 403 Forbidden
"""
GEOFENCE = ("us",)
@staticmethod
@click.command(name="CBS", short_help="https://cbs.com", help=__doc__)
@click.argument("title", type=str, required=False)
@click.pass_context
def cli(ctx, **kwargs) -> CBS:
return CBS(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = title
super().__init__(ctx)
def search(self) -> Generator[SearchResult, None, None]:
params = {
"term": self.title,
"termCount": 50,
"showCanVids": "true",
}
results = self._request("GET", "/apps-api/v3.1/androidphone/contentsearch/search.json", params=params)["terms"]
for result in results:
yield SearchResult(
id_=result.get("path"),
title=result.get("title"),
description=None,
label=result.get("term_type"),
url=result.get("path"),
)
def get_titles(self) -> Titles_T:
title_re = r"https://www\.cbs\.com/shows/(?P<video>video/)?(?P<id>[a-zA-Z0-9_-]+)/?$"
try:
video, title_id = (re.match(title_re, self.title).group(i) for i in ("video", "id"))
except Exception:
raise ValueError("- Could not parse ID from title")
if video:
episodes = self._episode(title_id)
else:
episodes = self._show(title_id)
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
self.token, self.license = self.ls_session(title.id)
manifest = self.get_manifest(title)
return DASH.from_url(url=manifest).to_tracks(language=title.language)
def get_chapters(self, title: Episode) -> Chapters:
if not title.data.get("playbackEvents", {}).get("endCreditChapterTimeMs"):
return Chapters()
end_credits = title.data["playbackEvents"]["endCreditChapterTimeMs"]
return Chapters([Chapter(name="Credits", timestamp=end_credits)])
def certificate(self, **_):
return None # will use common privacy cert
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
headers = {"Authorization": f"Bearer {self.token}"}
r = self.session.post(self.license, headers=headers, data=challenge)
if not r.ok:
self.log.error(r.text)
sys.exit(1)
return r.content
# Service specific functions
def _show(self, title: str) -> Episode:
data = self._request("GET", "/apps-api/v3.0/androidphone/shows/slug/{}.json".format(title))
links = next((x.get("links") for x in data["showMenu"] if x.get("device_app_id") == "all_platforms"), None)
config = next((x.get("videoConfigUniqueName") for x in links if x.get("title").strip() == "Episodes"), None)
show = next((x for x in data["show"]["results"] if x.get("type").strip() == "show"), None)
seasons = [x.get("seasonNum") for x in data["available_video_seasons"].get("itemList", [])]
locale = show.get("locale", "en-US")
show_data = self._request(
"GET", "/apps-api/v2.0/androidphone/shows/{}/videos/config/{}.json".format(show.get("show_id"), config),
params={"platformType": "apps", "rows": "1", "begin": "0"},
)
section = next(
(x["sectionId"] for x in show_data["videoSectionMetadata"] if x["title"] == "Full Episodes"), None
)
episodes = []
for season in seasons:
res = self._request(
"GET", "/apps-api/v2.0/androidphone/videos/section/{}.json".format(section),
params={"begin": "0", "rows": "999", "params": f"seasonNum={season}", "seasonNum": season},
)
episodes.extend(res["sectionItems"].get("itemList", []))
return [
Episode(
id_=episode["contentId"],
title=episode["seriesTitle"],
season=episode["seasonNum"] if episode["fullEpisode"] else 0,
number=episode["episodeNum"] if episode["fullEpisode"] else episode["positionNum"],
name=episode["label"],
language=locale,
service=self.__class__,
data=episode,
)
for episode in episodes
if episode["fullEpisode"]
]
def _episode(self, title: str) -> Episode:
data = self._request("GET", "/apps-api/v2.0/androidphone/video/cid/{}.json".format(title))
return [
Episode(
id_=episode["contentId"],
title=episode["seriesTitle"],
season=episode["seasonNum"] if episode["fullEpisode"] else 0,
number=episode["episodeNum"] if episode["fullEpisode"] else episode["positionNum"],
name=episode["label"],
language="en-US",
service=self.__class__,
data=episode,
)
for episode in data["itemList"]
]
def ls_session(self, content_id: str) -> str:
res = self._request(
"GET", "/apps-api/v3.0/androidphone/irdeto-control/anonymous-session-token.json",
params={"contentId": content_id},
)
return res.get("ls_session"), res.get("url")
def get_manifest(self, title: Episode) -> str:
try:
res = self._request(
"GET", "http://link.theplatform.com/s/{}/media/guid/2198311517/{}".format(
title.data.get("cmsAccountId"), title.id
),
params={
"format": "SMIL",
"assetTypes": "|".join(self.config["assets"]),
"formats": "MPEG-DASH,MPEG4,M3U",
},
)
body = load_xml(res).find("body").find("seq").findall("switch")
bitrate = max(body, key=lambda x: int(x.find("video").get("system-bitrate")))
videos = [x.get("src") for x in bitrate.findall("video")]
if not videos:
raise ValueError("Could not find any streams - is the title still available?")
manifest = next(
(x for x in videos if "hdr_dash" in x.lower()),
next((x for x in videos if "cenc_dash" in x.lower()), videos[0]),
)
except Exception as e:
self.log.warning("ThePlatform request failed: {}, falling back to standard manifest".format(e))
if not title.data.get("streamingUrl"):
raise ValueError("Could not find any streams - is the title still available?")
manifest = title.data.get("streamingUrl")
return manifest
def _request(self, method: str, api: str, params: dict = None, headers: dict = None) -> Any[dict | str]:
url = urljoin(self.config["endpoints"]["base_url"], api)
self.session.headers.update(self.config["headers"])
self.session.params = {"at": self.config["endpoints"]["token"]}
for prefix in ("https://", "http://"):
self.session.mount(prefix, SSLCiphers(security_level=2))
if params:
self.session.params.update(params)
if headers:
self.session.headers.update(headers)
prep = self.session.prepare_request(Request(method, url))
response = self.session.send(prep)
if response.status_code != 200:
raise ConnectionError(f"{response.text}")
try:
data = json.loads(response.content)
if not data.get("success"):
raise ValueError(data.get("message"))
return data
except json.JSONDecodeError:
return response.text

10
services/CBS/config.yaml Normal file
View File

@ -0,0 +1,10 @@
headers:
user-agent: Mozilla/5.0 (Linux; Android 13; SM-A536E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Mobile Safari/537.36
endpoints:
base_url: https://cbsdigital.cbs.com
token: ABBsaBMagMmYLUc9iXB0lXEKsUQ0/MwRn6z3Tg0KKQaH7Q6QGqJcABwlBP4XiMR1b0Q=
assets: [HLS_AES, DASH_LIVE, DASH_CENC, DASH_CENC_HDR10, DASH_LIVE, DASH_TA, DASH_CENC_PS4]

View File

@ -1,20 +1,23 @@
from __future__ import annotations
import json
import re
import sys
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Any, Optional, Union
from urllib.parse import urljoin
import click
from click import Context
from devine.core.credential import Credential
from devine.core.manifests.dash import DASH
from devine.core.manifests import DASH, HLS
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Tracks
from devine.core.tracks import Chapters, Tracks
from requests import Request
class DSCP(Service):
@ -26,19 +29,29 @@ class DSCP(Service):
Author: stabbedbybrick
Authorization: Cookies
Robustness:
L3: 2160p, AAC2.0
Widevine:
L3: 2160p, AAC2.0
ClearKey:
AES-128: 1080p, AAC2.0
\b
Tips:
- Input can be either complete title URL or just the path: '/show/richard-hammonds-workshop'
- Input can be either complete title URL or just the path:
SHOW: /show/richard-hammonds-workshop
EPISODE: /video/richard-hammonds-workshop/new-beginnings
SPORT: /video/sport/tnt-sports-1/uefa-champions-league
- Use the --lang LANG_RANGE option to request non-english tracks
- Single video URLs are currently not supported
- use -v H.265 to request H.265 tracks
- use -v H.265 to request H.265 UHD tracks (if available)
\b
Notes:
- Using '-v H.265' will request DASH manifest even if no H.265 tracks are available.
This can be useful if HLS is not available for some reason.
"""
ALIASES = ("dplus", "discoveryplus", "discovery+")
TITLE_RE = r"^(?:https?://(?:www\.)?discoveryplus\.com(?:/[a-z]{2})?)?/(?P<type>show|video)/(?P<id>[a-z0-9-]+)"
TITLE_RE = r"^(?:https?://(?:www\.)?discoveryplus\.com(?:/[a-z]{2})?)?/(?P<type>show|video)/(?P<id>[a-z0-9-/]+)"
@staticmethod
@click.command(name="DSCP", short_help="https://discoveryplus.com", help=__doc__)
@ -52,8 +65,6 @@ class DSCP(Service):
self.vcodec = ctx.parent.params.get("vcodec")
super().__init__(ctx)
self.license = None
def authenticate(
self,
cookies: Optional[CookieJar] = None,
@ -64,12 +75,25 @@ class DSCP(Service):
raise EnvironmentError("Service requires Cookies for Authentication.")
self.session.cookies.update(cookies)
self.configure()
self.base_url = None
info = self._request("GET", "https://global-prod.disco-api.com/bootstrapInfo")
self.base_url = info["data"]["attributes"].get("baseApiUrl")
user = self._request("GET", "/users/me")
self.territory = user["data"]["attributes"]["currentLocationTerritory"]
self.user_language = user["data"]["attributes"]["clientTranslationLanguageTags"][0]
self.site_id = user["meta"]["site"]["id"]
def search(self) -> Generator[SearchResult, None, None]:
r = self.session.get(self.config["endpoints"]["search"].format(base_api=self.base_api, query=self.title))
r.raise_for_status()
data = r.json()
params = {
"include": "default",
"decorators": "viewingHistory,isFavorite,playbackAllowed,contentAction,badges",
"contentFilter[query]": self.title,
"page[items.number]": "1",
"page[items.size]": "8",
}
data = self._request("GET", "/cms/routes/search/result", params=params)
results = [x.get("attributes") for x in data["included"] if x.get("type") == "show"]
@ -89,106 +113,61 @@ class DSCP(Service):
raise ValueError("Could not parse ID from title - is the URL correct?")
if kind == "video":
self.log.error("Single videos are not supported by this service.")
sys.exit(1)
episodes = self._episode(content_id)
if kind == "show":
data = self.session.get(
self.config["endpoints"]["show"].format(base_api=self.base_api, title_id=content_id)
).json()
if "errors" in data:
if "invalid.token" in data["errors"][0]["code"]:
self.log.error("- Invalid Token. Cookies are invalid or may have expired.")
sys.exit(1)
episodes = self._show(content_id)
raise ConnectionError(data["errors"])
content = next(x for x in data["included"] if x["attributes"].get("alias") == "generic-show-episodes")
content_id = content["id"]
show_id = content["attributes"]["component"]["mandatoryParams"]
season_params = [x.get("parameter") for x in content["attributes"]["component"]["filters"][0]["options"]]
page = next(x for x in data["included"] if x.get("type", "") == "page")
seasons = [
self.session.get(
self.config["endpoints"]["seasons"].format(
base_api=self.base_api, content_id=content_id, season=season, show_id=show_id
)
).json()
for season in season_params
]
videos = [[x for x in season["included"] if x["type"] == "video"] for season in seasons]
return Series(
[
Episode(
id_=ep["id"],
service=self.__class__,
title=page["attributes"]["title"],
year=ep["attributes"]["airDate"][:4],
season=ep["attributes"].get("seasonNumber"),
number=ep["attributes"].get("episodeNumber"),
name=ep["attributes"]["name"],
language=ep["attributes"]["audioTracks"][0]
if ep["attributes"].get("audioTracks")
else self.user_language,
data=ep,
)
for episodes in videos
for ep in episodes
if ep["attributes"]["videoType"] == "EPISODE"
]
)
return Series(episodes)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
platform = "firetv" if self.vcodec == "H.265" else "desktop"
res = self.session.post(
self.config["endpoints"]["playback"].format(base_api=self.base_api),
json={
"videoId": title.id,
"deviceInfo": {
"adBlocker": "false",
"drmSupported": "true",
"hwDecodingCapabilities": ["H264", "H265"],
"screen": {"width": 3840, "height": 2160},
"player": {"width": 3840, "height": 2160},
},
"wisteriaProperties": {
"advertiser": {"firstPlay": 0, "fwIsLat": 0},
"device": {"browser": {"name": "chrome", "version": "96.0.4664.55"}, "type": platform},
"platform": platform,
"product": "dplus_emea",
"sessionId": str(uuid.uuid1()),
"streamProvider": {"suspendBeaconing": 0, "hlsVersion": 7, "pingConfig": 1},
},
payload = {
"videoId": title.id,
"deviceInfo": {
"adBlocker": "false",
"drmSupported": "false",
"hwDecodingCapabilities": ["H264", "H265"],
"screen": {"width": 3840, "height": 2160},
"player": {"width": 3840, "height": 2160},
},
).json()
"wisteriaProperties": {
"product": "dplus_emea",
"sessionId": str(uuid.uuid1()),
},
}
if "errors" in res:
if "missingpackage" in res["errors"][0]["code"]:
self.log.error("- Access Denied. Title is not available for this account.")
sys.exit(1)
if self.vcodec == "H.265":
payload["wisteriaProperties"]["device"] = {
"browser": {"name": "chrome", "version": "96.0.4664.55"},
"type": "firetv",
}
payload["wisteriaProperties"]["platform"] = "firetv"
if "invalid.token" in res["errors"][0]["code"]:
self.log.error("- Invalid Token. Cookies are invalid or may have expired.")
sys.exit(1)
raise ConnectionError(res["errors"])
res = self._request("POST", "/playback/v3/videoPlaybackInfo", payload=payload)
streaming = res["data"]["attributes"]["streaming"][0]
streaming_type = streaming["type"].strip().lower()
manifest = streaming["url"]
self.token = None
self.license = None
if streaming["protection"]["drmEnabled"]:
self.token = streaming["protection"]["drmToken"]
self.license = streaming["protection"]["schemes"]["widevine"]["licenseUrl"]
tracks = DASH.from_url(url=manifest, session=self.session).to_tracks(language=title.language)
if streaming_type == "hls":
tracks = HLS.from_url(url=manifest, session=self.session).to_tracks(language=title.language)
elif streaming_type == "dash":
tracks = DASH.from_url(url=manifest, session=self.session).to_tracks(language=title.language)
else:
raise ValueError(f"Unknown streaming type: {streaming_type}")
return tracks
def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]:
return []
def get_chapters(self, title: Union[Movie, Episode]) -> Chapters:
return Chapters()
def get_widevine_service_certificate(self, **_: Any) -> str:
return None
@ -205,26 +184,117 @@ class DSCP(Service):
# Service specific functions
def configure(self):
self.session.headers.update(
{
"user-agent": "Chrome/96.0.4664.55",
"x-disco-client": "WEB:UNKNOWN:dplus_us:2.44.4",
"x-disco-params": "realm=go,siteLookupKey=dplus_us,bid=dplus,hn=www.discoveryplus.com,hth=,uat=false",
}
)
def _show(self, title: str) -> Episode:
params = {
"include": "default",
"decorators": "playbackAllowed,contentAction,badges",
}
data = self._request("GET", "/cms/routes/show/{}".format(title), params=params)
info = self.session.get(self.config["endpoints"]["info"]).json()
self.base_api = info["data"]["attributes"]["baseApiUrl"]
content = next(x for x in data["included"] if x["attributes"].get("alias") == "generic-show-episodes")
content_id = content["id"]
show_id = content["attributes"]["component"]["mandatoryParams"]
season_params = [x.get("parameter") for x in content["attributes"]["component"]["filters"][0]["options"]]
page = next(x for x in data["included"] if x.get("type", "") == "page")
user = self.session.get(self.config["endpoints"]["user"].format(base_api=self.base_api)).json()
if "errors" in user:
if "invalid.token" in user["errors"][0]["code"]:
self.log.error("- Invalid Token. Cookies are invalid or may have expired.")
sys.exit(1)
seasons = [
self._request(
"GET", "/cms/collections/{}?{}&{}".format(content_id, season, show_id),
params={"include": "default", "decorators": "playbackAllowed,contentAction,badges"},
)
for season in season_params
]
raise ConnectionError(user["errors"])
videos = [[x for x in season["included"] if x["type"] == "video"] for season in seasons]
self.territory = user["data"]["attributes"]["currentLocationTerritory"]
self.user_language = user["data"]["attributes"]["clientTranslationLanguageTags"][0]
self.site_id = user["meta"]["site"]["id"]
return [
Episode(
id_=ep["id"],
service=self.__class__,
title=page["attributes"]["title"],
year=ep["attributes"]["airDate"][:4],
season=ep["attributes"].get("seasonNumber"),
number=ep["attributes"].get("episodeNumber"),
name=ep["attributes"]["name"],
language=ep["attributes"]["audioTracks"][0]
if ep["attributes"].get("audioTracks")
else self.user_language,
data=ep,
)
for episodes in videos
for ep in episodes
if ep["attributes"]["videoType"] == "EPISODE"
]
def _episode(self, title: str) -> Episode:
params = {
"include": "default",
"decorators": "playbackAllowed,contentAction,badges",
}
data = self._request("GET", "/cms/routes/video/{}".format(title), params=params)
page = next((x for x in data["included"] if x.get("type", "") == "page"), None)
if not page:
raise IndexError("Episode page not found")
video_id = page["relationships"].get("primaryContent", {}).get("data", {}).get("id")
if not video_id:
raise IndexError("Episode id not found")
params = {"decorators": "isFavorite", "include": "primaryChannel"}
content = self._request("GET", "/content/videos/{}".format(video_id), params=params)
episode = content["data"]["attributes"]
name = episode.get("name")
if episode.get("secondaryTitle"):
name += f" {episode.get('secondaryTitle')}"
return [
Episode(
id_=content["data"].get("id"),
service=self.__class__,
title=page["attributes"]["title"],
year=int(episode.get("airDate")[:4]) if episode.get("airDate") else None,
season=episode.get("seasonNumber") or 0,
number=episode.get("episodeNumber") or 0,
name=name,
language=episode["audioTracks"][0] if episode.get("audioTracks") else self.user_language,
data=episode,
)
]
def _request(
self,
method: str,
api: str,
params: dict = None,
headers: dict = None,
payload: dict = None,
) -> Any[dict | str]:
url = urljoin(self.base_url, api)
self.session.headers.update(self.config["headers"])
if params:
self.session.params.update(params)
if headers:
self.session.headers.update(headers)
prep = self.session.prepare_request(Request(method, url, json=payload))
response = self.session.send(prep)
try:
data = json.loads(response.content)
if data.get("errors"):
if "invalid.token" in data["errors"][0]["code"]:
self.log.error("- Invalid Token. Cookies are invalid or may have expired.")
sys.exit(1)
if "missingpackage" in data["errors"][0]["code"]:
self.log.error("- Access Denied. Title is not available for this subscription.")
sys.exit(1)
raise ConnectionError(data["errors"])
return data
except Exception as e:
raise ConnectionError("Request failed: {}".format(e))

View File

@ -1,8 +1,4 @@
endpoints:
info: https://global-prod.disco-api.com/bootstrapInfo
user: "{base_api}/users/me"
prod: "{base_api}/cms/configs/web-prod"
show: "{base_api}/cms/routes/show/{title_id}?include=default&decorators=viewingHistory,isFavorite,playbackAllowed"
seasons: "{base_api}/cms/collections/{content_id}?include=default&decorators=viewingHistory,isFavorite,playbackAllowed,contentAction,badges&{season}&{show_id}"
playback: "{base_api}/playback/v3/videoPlaybackInfo"
search: "{base_api}/cms/routes/search/result?include=default&decorators=viewingHistory,isFavorite,playbackAllowed,contentAction,badges&contentFilter[query]={query}&page[items.number]=1&page[items.size]=8"
headers:
user-agent: Chrome/96.0.4664.55
x-disco-client: WEB:UNKNOWN:dplus_us:2.44.4
x-disco-params: realm=go,siteLookupKey=dplus_us,bid=dplus,hn=www.discoveryplus.com,hth=,uat=false

284
services/RTE/__init__.py Normal file
View File

@ -0,0 +1,284 @@
from __future__ import annotations
import base64
import json
import re
from collections.abc import Generator
from typing import Any, Optional, Union
from urllib.parse import urljoin
import click
from devine.core.constants import AnyTrack
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Chapters, Tracks
from devine.core.utils.xml import load_xml
from requests import Request
class RTE(Service):
"""
\b
Service code for RTE Player streaming service (https://www.rte.ie/player/).
\b
Author: stabbedbybrick
Authorization: None
Robustness:
Widevine:
L3: 1080p, AAC2.0
\b
Tips:
- Input (pay attention to the URL format):
SERIES: https://www.rte.ie/player/series/crossfire/10003928-00-0000
EPISODE: https://www.rte.ie/player/series/crossfire/10003928-00-0000?epguid=AQ10003929-01-0001
MOVIE: https://www.rte.ie/player/movie/glass/360230440380
\b
Notes:
- Since some content is accessible worldwide, geofence is deactivated.
- Using an IE IP-address is recommended to access everything.
"""
# GEOFENCE = ("ie",)
@staticmethod
@click.command(name="RTE", short_help="https://www.rte.ie/player/", help=__doc__)
@click.argument("title", type=str, required=False)
@click.pass_context
def cli(ctx, **kwargs) -> RTE:
return RTE(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = title
super().__init__(ctx)
self.base_url = self.config["endpoints"]["base_url"]
self.feed = self.config["endpoints"]["feed"]
self.license = self.config["endpoints"]["license"]
def search(self) -> Generator[SearchResult, None, None]:
params = {
"byProgramType": "Series|Movie",
"q": f"title:({self.title})",
"range": "0-40",
"schema": "2.15",
"sort": "rte$rank|desc",
"gzip": "true",
"omitInvalidFields": "true",
}
results = self._request(f"{self.feed}/f/1uC-gC/rte-prd-prd-search", params=params)["entries"]
for result in results:
link = "https://www.rte.ie/player/{}/{}/{}"
series = result.get("plprogram$programType").lower() == "series"
_id = result.get("guid") if series else result.get("id").split("/")[-1]
_title = result.get("title") if series else result.get("plprogram$longTitle")
_type = result.get("plprogram$programType")
title = _title.format(_type, _title, _id).lower()
title = re.sub(r"\W+", "-", title)
title = re.sub(r"^-|-$", "", title)
yield SearchResult(
id_=link.format(_type, title, _id),
title=_title,
description=result.get("plprogram$shortDescription"),
label=_type,
url=link.format(_type, title, _id),
)
def get_titles(self) -> Titles_T:
title_re = (
r"https://www\.rte\.ie/player"
r"/(?P<type>series|movie)"
r"/(?P<slug>[a-zA-Z0-9_-]+)"
r"/(?P<id>[a-zA-Z0-9_\-=?]+)/?$"
)
try:
kind, _, title_id = (re.match(title_re, self.title).group(i) for i in ("type", "slug", "id"))
except Exception:
raise ValueError("- Could not parse ID from input")
episode = title_id.split("=")[1] if "epguid" in title_id else None
if episode:
episode = self._episode(title_id, episode)
return Series(episode)
elif kind == "movie":
movie = self._movie(title_id)
return Movies(movie)
elif kind == "series":
episodes = self._show(title_id)
return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
self.token, self.account = self.get_config()
media = title.data["plprogramavailability$media"][0].get("plmedia$publicUrl")
if not media:
raise ValueError("Could not find any streams - is the title still available?")
manifest, self.pid = self.get_manifest(media)
tracks = DASH.from_url(manifest, self.session).to_tracks(language=title.language)
for track in tracks.audio:
role = track.data["dash"]["adaptation_set"].find("Role")
if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
track.descriptive = True
return tracks
def get_chapters(self, title: Episode) -> Chapters:
if not title.data.get("rte$chapters"):
return Chapters()
timecodes = [x for x in title.data["rte$chapters"]]
chapters = [Chapter(timestamp=float(x)) for x in timecodes]
if title.data.get("rte$creditStart"):
chapters.append(Chapter(name="Credits", timestamp=float(title.data["rte$creditStart"])))
return chapters
def certificate(self, **_):
return None # will use common privacy cert
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
params = {
"token": self.token,
"account": self.account,
"form": "json",
"schema": "1.0",
}
payload = {
"getWidevineLicense": {
"releasePid": self.pid,
"widevineChallenge": base64.b64encode(challenge).decode("utf-8"),
}
}
r = self.session.post(url=self.license, params=params, json=payload)
if not r.ok:
raise ConnectionError(f"License request failed: {r.text}")
return r.json()["getWidevineLicenseResponse"]["license"]
# Service specific functions
def _movie(self, title: str) -> Movie:
params = {"count": "true", "entries": "true", "byId": title}
data = self._request("/mpx/1uC-gC/rte-prd-prd-all-programs", params=params)["entries"]
return [
Movie(
id_=movie["guid"],
service=self.__class__,
name=movie.get("plprogram$longTitle"),
year=movie.get("plprogram$year"),
language=movie["plprogram$languages"][0] if movie.get("plprogram$languages") else "eng",
data=movie,
)
for movie in data
]
def _show(self, title: str) -> Episode:
entry = self._request("/mpx/1uC-gC/rte-prd-prd-all-movies-series?byGuid={}".format(title))["entries"][0]["id"]
data = self._request("/mpx/1uC-gC/rte-prd-prd-all-programs?bySeriesId={}".format(entry.split("/")[-1]))["entries"]
return [
Episode(
id_=episode.get("guid"),
title=episode.get("plprogram$longTitle"),
season=episode.get("plprogram$tvSeasonNumber") or 0,
number=episode.get("plprogram$tvSeasonEpisodeNumber") or 0,
name=episode.get("description"),
language=episode["plprogram$languages"][0] if episode.get("plprogram$languages") else "eng",
service=self.__class__,
data=episode,
)
for episode in data
if episode["plprogram$programType"] == "episode"
]
def _episode(self, title: str, guid: str) -> Episode:
title = title.split("?")[0]
entry = self._request("/mpx/1uC-gC/rte-prd-prd-all-movies-series?byGuid={}".format(title))["entries"][0]["id"]
data = self._request("/mpx/1uC-gC/rte-prd-prd-all-programs?bySeriesId={}".format(entry.split("/")[-1]))["entries"]
return [
Episode(
id_=episode.get("guid"),
title=episode.get("plprogram$longTitle"),
season=episode.get("plprogram$tvSeasonNumber") or 0,
number=episode.get("plprogram$tvSeasonEpisodeNumber") or 0,
name=episode.get("description"),
language=episode["plprogram$languages"][0] if episode.get("plprogram$languages") else "eng",
service=self.__class__,
data=episode,
)
for episode in data
if episode["plprogram$programType"] == "episode" and episode.get("guid") == guid
]
def get_config(self):
token = self._request("/servicelayer/api/anonymouslogin")["mpx_token"]
account = self._request("/wordpress/wp-content/uploads/standard/web/config.json")["mpx_config"]["account_id"]
return token, account
def get_manifest(self, media_url: str) -> str:
try:
res = self._request(
media_url,
params={
"formats": "MPEG-DASH",
"auth": self.token,
"assetTypes": "default:isl",
"tracking": "true",
"format": "SMIL",
"iu": "/3014/RTE_Player_VOD/Android_Phone/NotRegistered",
"policy": "168602703",
},
)
root = load_xml(res)
video = root.xpath("//switch/video")
manifest = video[0].get("src")
elem = root.xpath("//switch/ref")
value = elem[0].find(".//param[@name='trackingData']").get("value")
pid = re.search(r"pid=([^|]+)", value).group(1)
return manifest, pid
except Exception as e:
raise ValueError(
f"Request for manifest failed: {e}.\n"
"Content may be geo-restricted to IE"
)
def _request(self, api: str, params: dict = None, headers: dict = None) -> Any[dict | str]:
url = urljoin(self.base_url, api)
self.session.headers.update(self.config["headers"])
if params:
self.session.params.update(params)
if headers:
self.session.headers.update(headers)
prep = self.session.prepare_request(Request("GET", url))
response = self.session.send(prep)
if response.status_code != 200:
raise ConnectionError(
f"Status: {response.status_code} - {response.url}\n"
"Content may be geo-restricted to IE"
)
try:
return json.loads(response.content)
except json.JSONDecodeError:
return response.text

7
services/RTE/config.yaml Normal file
View File

@ -0,0 +1,7 @@
headers:
user-agent: Dalvik/2.1.0 (Linux; U; Android 13; SM-A536E Build/RSR1.210722.013.A2)
endpoints:
base_url: https://www.rte.ie
feed: https://feed.entertainment.tv.theplatform.eu
license: https://widevine.entitlement.eu.theplatform.com/wv/web/ModularDrm