From 601a6a55d4759643c660a68ecf35228bf1fa10c3 Mon Sep 17 00:00:00 2001 From: stabbedbybrick <125766685+stabbedbybrick@users.noreply.github.com> Date: Mon, 10 Jun 2024 13:53:23 +0200 Subject: [PATCH] feat(services): Add Pluto TV service --- services/PLUTO/__init__.py | 282 +++++++++++++++++++++++++++++++++++++ services/PLUTO/config.yaml | 7 + 2 files changed, 289 insertions(+) create mode 100644 services/PLUTO/__init__.py create mode 100644 services/PLUTO/config.yaml diff --git a/services/PLUTO/__init__.py b/services/PLUTO/__init__.py new file mode 100644 index 0000000..f365fcf --- /dev/null +++ b/services/PLUTO/__init__.py @@ -0,0 +1,282 @@ +from __future__ import annotations + +import re +import uuid +from collections.abc import Generator +from http.cookiejar import CookieJar +from typing import Any, Optional + +import click + +from devine.core.credential import Credential +from devine.core.manifests import DASH, HLS +from devine.core.search_result import SearchResult +from devine.core.service import Service +from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T +from devine.core.tracks import Chapters, Tracks + + +class PLUTO(Service): + """ + \b + Service code for Pluto TV on demand streaming service (https://pluto.tv/) + Credit to @wks_uwu for providing an alternative API, making the codebase much cleaner + + \b + Author: stabbedbybrick + Authorization: None + Robustness: + Widevine: + L3: 720p, AAC2.0 + + \b + Tips: + - Input can be complete title URL or just the path: + SERIES: /series/65ce4e5003fa740013793127/details + EPISODE: /series/65ce4e5003fa740013793127/season/1/episode/662c2af0a9f2d200131ba731 + MOVIE: /movies/635c1e430888bc001ad01a9b/details + - Use --lang LANG_RANGE option to request non-English tracks + + \b + Notes: + - Both DASH(widevine) and HLS(AES) are looked for in the API + - HLS is prioritized over DASH, because the DASH version will sometimes have sync issues + - Pluto use transport streams for HLS, meaning the video and audio are a part of the same stream + As a result, only videos are listed as tracks. But the audio will be included as well. + - With the variations in manifests, and the inconsistency in the API, the language is set as "en" by default + for all tracks, no matter what region you're in. + You can manually set the language in the get_titles() function if you want to change it. + + """ + + ALIASES = ("plu", "plutotv") + TITLE_RE = ( + r"^" + r"(?:https?://(?:www\.)?pluto\.tv(?:/[a-z]{2})?)?" + r"(?:/on-demand)?" + r"/(?Pmovies|series)" + r"/(?P[a-z0-9-]+)" + r"(?:(?:/season/(\d+)/episode/(?P[a-z0-9-]+)))?" + ) + + @staticmethod + @click.command(name="PLUTO", short_help="https://pluto.tv/", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx, **kwargs): + return PLUTO(ctx, **kwargs) + + def __init__(self, ctx, title): + self.title = title + super().__init__(ctx) + + def authenticate( + self, + cookies: Optional[CookieJar] = None, + credential: Optional[Credential] = None, + ) -> None: + super().authenticate(cookies, credential) + + self.session.params = { + "appName": "web", + "appVersion": "na", + "clientID": str(uuid.uuid1()), + "deviceDNT": 0, + "deviceId": "unknown", + "clientModelNumber": "na", + "serverSideAds": "false", + "deviceMake": "unknown", + "deviceModel": "web", + "deviceType": "web", + "deviceVersion": "unknown", + "sid": str(uuid.uuid1()), + "drmCapabilities": "widevine:L3", + } + + info = self.session.get(self.config["endpoints"]["auth"]).json() + self.token = info["sessionToken"] + self.region = info["session"].get("activeRegion", "").lower() + + def search(self) -> Generator[SearchResult, None, None]: + params = { + "q": self.title, + "limit": "100", + } + + r = self.session.get( + self.config["endpoints"]["search"].format(query=self.title), + headers={"Authorization": f"Bearer {self.token}"}, + params=params, + ) + r.raise_for_status() + results = r.json() + + for result in results["data"]: + if result.get("type") not in ["timeline", "channel"]: + content = result.get("id") + kind = result.get("type") + kind = "movies" if kind == "movie" else "series" + + yield SearchResult( + id_=f"/{kind}/{content}/details", + title=result.get("name"), + description=result.get("synopsis"), + label=result.get("type"), + url=f"https://pluto.tv/{self.region}/on-demand/{kind}/{content}/details", + ) + + def get_titles(self) -> Titles_T: + try: + kind, content_id, episode_id = ( + re.match(self.TITLE_RE, self.title).group(i) for i in ("type", "id", "episode") + ) + except Exception: + raise ValueError("Could not parse ID from title - is the URL correct?") + + if kind == "series" and episode_id: + r = self.session.get(self.config["endpoints"]["series"].format(season_id=content_id)) + if not r.ok: + raise ConnectionError(f"{r.json().get('message')}") + + data = r.json() + return Series( + [ + Episode( + id_=episode.get("_id"), + service=self.__class__, + title=data.get("name"), + season=int(episode.get("season")), + number=int(episode.get("number")), + name=episode.get("name"), + year=None, + language="en", # self.region, + data=episode, + ) + for series in data["seasons"] + for episode in series["episodes"] + if episode.get("_id") == episode_id + ] + ) + + elif kind == "series": + r = self.session.get(self.config["endpoints"]["series"].format(season_id=content_id)) + if not r.ok: + raise ConnectionError(f"{r.json().get('message')}") + + data = r.json() + return Series( + [ + Episode( + id_=episode.get("_id"), + service=self.__class__, + title=data.get("name"), + season=int(episode.get("season")), + number=int(episode.get("number")), + name=episode.get("name"), + year=None, + language="en", # self.region, + data=episode, + ) + for series in data["seasons"] + for episode in series["episodes"] + ] + ) + + elif kind == "movies": + url = self.config["endpoints"]["movie"].format(video_id=content_id) + r = self.session.get(url, headers={"Authorization": f"Bearer {self.token}"}) + if not r.ok: + raise ConnectionError(f"{r.json().get('message')}") + + data = r.json() + return Movies( + [ + Movie( + id_=movie.get("_id"), + service=self.__class__, + year=movie.get("slug", "").split("-")[-3], + name=movie.get("name"), + language="en", # self.region, + data=movie, + ) + for movie in data + ] + ) + + def get_tracks(self, title: Title_T) -> Tracks: + url = self.config["endpoints"]["episodes"].format(episode_id=title.id) + episode = self.session.get(url).json() + + sources = next((item.get("sources") for item in episode if not self.bumpers(item.get("name", ""))), None) + + if not sources: + raise ValueError("Unable to find manifest for this title") + + hls = next((x.get("file") for x in sources if x.get("type").lower() == "hls"), None) + dash = next((x.get("file") for x in sources if x.get("type").lower() == "dash"), None) + + if hls: + self.license = None + m3u8_url = hls.replace("https://siloh.pluto.tv", "http://silo-hybrik.pluto.tv.s3.amazonaws.com") + manifest = self.clean_manifest(self.session.get(m3u8_url).text) + tracks = HLS.from_text(manifest, m3u8_url).to_tracks(language=title.language) + + # Remove separate AD audio tracks + for track in tracks.audio: + tracks.audio.remove(track) + + else: + self.license = self.config["endpoints"]["license"] + manifest = dash.replace("https://siloh.pluto.tv", "http://silo-hybrik.pluto.tv.s3.amazonaws.com") + tracks = DASH.from_url(manifest, self.session).to_tracks(language=title.language) + + for track in tracks.audio: + role = track.data["dash"]["adaptation_set"].find("Role") + if role is not None and role.get("value") in ["description", "alternative", "alternate"]: + track.descriptive = True + + return tracks + + def get_chapters(self, title: Title_T) -> Chapters: + return Chapters() + + def get_widevine_service_certificate(self, **_: Any) -> str: + return None + + def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes: + if not self.license: + return None + + r = self.session.post(url=self.license, data=challenge) + if r.status_code != 200: + raise ConnectionError(r.text) + + return r.content + + # service specific functions + + @staticmethod + def clean_manifest(text: str) -> str: + # Remove fairplay entries + index = text.find('#PLUTO-DRM:ID="fairplay') + if index == -1: + return text + else: + end_of_previous_line = text.rfind("\n", 0, index) + if end_of_previous_line == -1: + return "" + else: + return text[:end_of_previous_line] + + @staticmethod + def bumpers(text: str) -> bool: + ads = ( + "Pluto_TV_OandO", + "_ad", + "creative", + "Bumper", + "Promo", + "WarningCard", + ) + + return any(ad in text for ad in ads) diff --git a/services/PLUTO/config.yaml b/services/PLUTO/config.yaml new file mode 100644 index 0000000..f864d51 --- /dev/null +++ b/services/PLUTO/config.yaml @@ -0,0 +1,7 @@ +endpoints: + auth: https://boot.pluto.tv/v4/start + search: https://service-media-search.clusters.pluto.tv/v1/search + series: https://service-vod.clusters.pluto.tv/v3/vod/series/{season_id}/seasons + episodes: http://api.pluto.tv/v2/episodes/{episode_id}/clips.json + movie: https://service-vod.clusters.pluto.tv/v4/vod/items?ids={video_id} + license: https://service-concierge.clusters.pluto.tv/v1/wv/alt \ No newline at end of file