from __future__ import annotations

import re
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Any, Optional

import click
from devine.core.credential import Credential
from devine.core.manifests import DASH, HLS
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapters, Tracks


class PLUTO(Service):
    """
    \b
    Service code for Pluto TV on demand streaming service (https://pluto.tv/)
    Credit to @wks_uwu for providing an alternative API, making the codebase much cleaner

    \b
    Author: stabbedbybrick
    Authorization: None
    Robustness:
      Widevine:
        L3: 720p, AAC2.0

    \b
    Tips:
        - Input can be complete title URL or just the path:
            SERIES: /series/65ce4e5003fa740013793127/details
            EPISODE: /series/65ce4e5003fa740013793127/season/1/episode/662c2af0a9f2d200131ba731
            MOVIE: /movies/635c1e430888bc001ad01a9b/details
        - Use --lang LANG_RANGE option to request non-English tracks

    \b
    Notes:
        - Both DASH(widevine) and HLS(AES) are looked for in the API
        - HLS is prioritized over DASH, because the DASH version will sometimes have sync issues
        - Pluto use transport streams for HLS, meaning the video and audio are a part of the same stream
            As a result, only videos are listed as tracks. But the audio will be included as well.
        - With the variations in manifests, and the inconsistency in the API, the language is set as "en" by default
            for all tracks, no matter what region you're in.
            You can manually set the language in the get_titles() function if you want to change it.

    """

    ALIASES = ("plu", "plutotv")

    # Accepts a full pluto.tv URL or just its path. The named groups are read
    # by get_titles(): "type" (movies|series), "id" (content id) and the
    # optional "episode" (episode id).
    TITLE_RE = (
        r"^"
        r"(?:https?://(?:www\.)?pluto\.tv(?:/[a-z]{2})?)?"
        r"(?:/on-demand)?"
        r"/(?P<type>movies|series)"
        r"/(?P<id>[a-z0-9-]+)"
        r"(?:(?:/season/(\d+)/episode/(?P<episode>[a-z0-9-]+)))?"
    )

    @staticmethod
    @click.command(name="PLUTO", short_help="https://pluto.tv/", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed CLI arguments."""
        return PLUTO(ctx, **kwargs)

    def __init__(self, ctx, title):
        """Store the requested title (URL, path or search query) and initialise the base service."""
        self.title = title
        # License endpoint chosen by get_tracks(); stays None for HLS (no Widevine).
        # Initialised here so get_widevine_license() never hits a missing attribute.
        self.license: Optional[str] = None
        super().__init__(ctx)

    def authenticate(
        self,
        cookies: Optional[CookieJar] = None,
        credential: Optional[Credential] = None,
    ) -> None:
        """
        Start an anonymous session and remember its token and region.

        Sets the default query parameters on the session, calls the configured
        "auth" endpoint, then stores ``self.token`` (session token) and
        ``self.region`` (lower-cased active region, "" when absent).
        """
        super().authenticate(cookies, credential)

        # NOTE(review): uuid1() embeds the host's MAC address and a timestamp;
        # kept as-is because the API's expectations are not visible from here.
        self.session.params = {
            "appName": "web",
            "appVersion": "na",
            "clientID": str(uuid.uuid1()),
            "deviceDNT": 0,
            "deviceId": "unknown",
            "clientModelNumber": "na",
            "serverSideAds": "false",
            "deviceMake": "unknown",
            "deviceModel": "web",
            "deviceType": "web",
            "deviceVersion": "unknown",
            "sid": str(uuid.uuid1()),
            "drmCapabilities": "widevine:L3",
        }

        info = self.session.get(self.config["endpoints"]["auth"]).json()
        self.token = info["sessionToken"]
        self.region = info["session"].get("activeRegion", "").lower()

    def search(self) -> Generator[SearchResult, None, None]:
        """
        Query the search endpoint with ``self.title`` and yield on-demand results.

        Results of type "timeline" or "channel" are skipped; every other result
        is reported as a movie or a series.

        Raises:
            requests.HTTPError: if the search request fails (via ``raise_for_status``).
        """
        params = {
            "q": self.title,
            "limit": "100",
        }

        r = self.session.get(
            self.config["endpoints"]["search"].format(query=self.title),
            headers={"Authorization": f"Bearer {self.token}"},
            params=params,
        )
        r.raise_for_status()
        results = r.json()

        for result in results["data"]:
            if result.get("type") in ("timeline", "channel"):
                continue

            content = result.get("id")
            # Anything that is not explicitly a movie is addressed as a series.
            kind = "movies" if result.get("type") == "movie" else "series"

            yield SearchResult(
                id_=f"/{kind}/{content}/details",
                title=result.get("name"),
                description=result.get("synopsis"),
                label=result.get("type"),
                url=f"https://pluto.tv/{self.region}/on-demand/{kind}/{content}/details",
            )

    def get_titles(self) -> Titles_T:
        """
        Resolve ``self.title`` into a Series (all episodes or a single one) or Movies.

        Raises:
            ValueError: if the title does not match ``TITLE_RE``.
            ConnectionError: if the API answers with a non-OK status.
        """
        match = re.match(self.TITLE_RE, self.title)
        if match is None:
            raise ValueError("Could not parse ID from title - is the URL correct?")
        kind, content_id, episode_id = match.group("type", "id", "episode")

        if kind == "series":
            data = self._get_json(self.config["endpoints"]["series"].format(season_id=content_id))
            return Series(
                [
                    Episode(
                        id_=episode.get("_id"),
                        service=self.__class__,
                        title=data.get("name"),
                        season=int(episode.get("season")),
                        number=int(episode.get("number")),
                        name=episode.get("name"),
                        year=None,
                        language="en",  # self.region,
                        data=episode,
                    )
                    for season in data["seasons"]
                    for episode in season["episodes"]
                    # With an episode id in the URL only that episode is kept.
                    if not episode_id or episode.get("_id") == episode_id
                ]
            )

        # TITLE_RE only admits "movies" or "series", so this is the movie case.
        data = self._get_json(
            self.config["endpoints"]["movie"].format(video_id=content_id),
            headers={"Authorization": f"Bearer {self.token}"},
        )
        return Movies(
            [
                Movie(
                    id_=movie.get("_id"),
                    service=self.__class__,
                    year=self._year_from_slug(movie.get("slug")),
                    name=movie.get("name"),
                    language="en",  # self.region,
                    data=movie,
                )
                for movie in data
            ]
        )

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Fetch the manifest for *title* and return its tracks, preferring HLS over DASH.

        For HLS, ``self.license`` is cleared and all separate audio tracks are
        dropped. For DASH, ``self.license`` is set to the configured license
        endpoint and description/alternate audio is flagged as descriptive.

        Raises:
            ValueError: if no usable HLS or DASH manifest is found.
        """
        url = self.config["endpoints"]["episodes"].format(episode_id=title.id)
        episode = self.session.get(url).json()

        # The first entry that is not an ad/bumper clip carries the real sources.
        sources = next(
            (item.get("sources") for item in episode if not self.bumpers(item.get("name") or "")),
            None,
        )
        if not sources:
            raise ValueError("Unable to find manifest for this title")

        hls = self._source_url(sources, "hls")
        dash = self._source_url(sources, "dash")

        if hls:
            self.license = None
            m3u8_url = hls.replace("https://siloh.pluto.tv", "http://silo-hybrik.pluto.tv.s3.amazonaws.com")
            manifest = self.clean_manifest(self.session.get(m3u8_url).text)
            tracks = HLS.from_text(manifest, m3u8_url).to_tracks(language=title.language)

            # Remove separate AD audio tracks. clear() is used instead of
            # removing inside a loop over the same list, which skips elements.
            tracks.audio.clear()

        elif dash:
            self.license = self.config["endpoints"]["license"]
            manifest = dash.replace("https://siloh.pluto.tv", "http://silo-hybrik.pluto.tv.s3.amazonaws.com")
            tracks = DASH.from_url(manifest, self.session).to_tracks(language=title.language)

        else:
            # Sources exist but none is an HLS or DASH manifest with a file URL.
            raise ValueError("Unable to find manifest for this title")

        for track in tracks.audio:
            role = track.data["dash"]["adaptation_set"].find("Role")
            if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
                track.descriptive = True

        return tracks

    def get_chapters(self, title: Title_T) -> Chapters:
        """Return an empty chapter list; no chapter data is looked up."""
        return Chapters()

    def get_widevine_service_certificate(self, **_: Any) -> Optional[str]:
        """Return None: no service certificate is supplied."""
        return None

    def get_widevine_license(self, challenge: bytes, **_: Any) -> Optional[bytes]:
        """
        Post *challenge* to the license endpoint selected by get_tracks().

        Returns:
            The raw license response, or None when no license endpoint is set
            (the HLS path).

        Raises:
            ConnectionError: if the license server does not answer with 200.
        """
        if not self.license:
            return None

        r = self.session.post(url=self.license, data=challenge)
        if r.status_code != 200:
            raise ConnectionError(r.text)

        return r.content

    # service specific functions

    def _get_json(self, url: str, **kwargs: Any) -> Any:
        """GET *url* and return the decoded JSON, raising ConnectionError on a non-OK reply."""
        r = self.session.get(url, **kwargs)
        if not r.ok:
            raise ConnectionError(f"{r.json().get('message')}")
        return r.json()

    @staticmethod
    def _source_url(sources: list, kind: str) -> Optional[str]:
        """Return the "file" URL of the first source whose type equals *kind* (case-insensitive)."""
        return next(
            (source.get("file") for source in sources if (source.get("type") or "").lower() == kind),
            None,
        )

    @staticmethod
    def _year_from_slug(slug: Optional[str]) -> Optional[str]:
        """Return the third-from-last dash-separated slug segment, or None if the slug is too short."""
        parts = (slug or "").split("-")
        return parts[-3] if len(parts) >= 3 else None

    @staticmethod
    def clean_manifest(text: str) -> str:
        """
        Cut the manifest off before its fairplay section.

        Everything from the line containing the first ``#PLUTO-DRM:ID="fairplay``
        marker onwards is dropped, together with the newline before that line.
        The text is returned unchanged when the marker is absent, and "" is
        returned when the marker sits on the very first line.
        """
        # Remove fairplay entries
        index = text.find('#PLUTO-DRM:ID="fairplay')
        if index == -1:
            return text

        end_of_previous_line = text.rfind("\n", 0, index)
        if end_of_previous_line == -1:
            return ""

        return text[:end_of_previous_line]

    @staticmethod
    def bumpers(text: str) -> bool:
        """Return True when *text* contains any of the known ad/bumper/promo markers."""
        ads = (
            "Pluto_TV_OandO",
            "_ad",
            "creative",
            "Bumper",
            "Promo",
            "WarningCard",
        )

        return any(ad in text for ad in ads)