devine-services/services/PLUTO/__init__.py
2024-10-21 11:54:16 +02:00

290 lines
10 KiB
Python

from __future__ import annotations
import re
import uuid
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Any, Optional
import click
from devine.core.credential import Credential
from devine.core.manifests import DASH, HLS
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapters, Tracks
class PLUTO(Service):
"""
\b
Service code for Pluto TV on demand streaming service (https://pluto.tv/)
Credit to @wks_uwu for providing an alternative API, making the codebase much cleaner
\b
Author: stabbedbybrick
Authorization: None
Robustness:
Widevine:
L3: 720p, AAC2.0
\b
Tips:
- Input can be complete title URL or just the path:
SERIES: /series/65ce4e5003fa740013793127/details
EPISODE: /series/65ce4e5003fa740013793127/season/1/episode/662c2af0a9f2d200131ba731
MOVIE: /movies/635c1e430888bc001ad01a9b/details
- Use --lang LANG_RANGE option to request non-English tracks
\b
Notes:
- Both DASH(widevine) and HLS(AES) are looked for in the API
- HLS is prioritized over DASH, because the DASH version will sometimes have sync issues
- Pluto use transport streams for HLS, meaning the video and audio are a part of the same stream
As a result, only videos are listed as tracks. But the audio will be included as well.
- With the variations in manifests, and the inconsistency in the API, the language is set as "en" by default
for all tracks, no matter what region you're in.
You can manually set the language in the get_titles() function if you want to change it.
"""
ALIASES = ("plu", "plutotv")
TITLE_RE = (
r"^"
r"(?:https?://(?:www\.)?pluto\.tv(?:/[a-z]{2})?)?"
r"(?:/on-demand)?"
r"/(?P<type>movies|series)"
r"/(?P<id>[a-z0-9-]+)"
r"(?:(?:/season/(\d+)/episode/(?P<episode>[a-z0-9-]+)))?"
)
@staticmethod
@click.command(name="PLUTO", short_help="https://pluto.tv/", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return PLUTO(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = title
super().__init__(ctx)
def authenticate(
self,
cookies: Optional[CookieJar] = None,
credential: Optional[Credential] = None,
) -> None:
super().authenticate(cookies, credential)
self.session.params = {
"appName": "web",
"appVersion": "na",
"clientID": str(uuid.uuid1()),
"deviceDNT": 0,
"deviceId": "unknown",
"clientModelNumber": "na",
"serverSideAds": "false",
"deviceMake": "unknown",
"deviceModel": "web",
"deviceType": "web",
"deviceVersion": "unknown",
"sid": str(uuid.uuid1()),
"drmCapabilities": "widevine:L3",
}
info = self.session.get(self.config["endpoints"]["auth"]).json()
self.token = info["sessionToken"]
self.region = info["session"].get("activeRegion", "").lower()
def search(self) -> Generator[SearchResult, None, None]:
params = {
"q": self.title,
"limit": "100",
}
r = self.session.get(
self.config["endpoints"]["search"].format(query=self.title),
headers={"Authorization": f"Bearer {self.token}"},
params=params,
)
r.raise_for_status()
results = r.json()
for result in results["data"]:
if result.get("type") not in ["timeline", "channel"]:
content = result.get("id")
kind = result.get("type")
kind = "movies" if kind == "movie" else "series"
yield SearchResult(
id_=f"/{kind}/{content}/details",
title=result.get("name"),
description=result.get("synopsis"),
label=result.get("type"),
url=f"https://pluto.tv/{self.region}/on-demand/{kind}/{content}/details",
)
def get_titles(self) -> Titles_T:
try:
kind, content_id, episode_id = (
re.match(self.TITLE_RE, self.title).group(i) for i in ("type", "id", "episode")
)
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
if kind == "series" and episode_id:
r = self.session.get(self.config["endpoints"]["series"].format(season_id=content_id))
if not r.ok:
raise ConnectionError(f"{r.json().get('message')}")
data = r.json()
return Series(
[
Episode(
id_=episode.get("_id"),
service=self.__class__,
title=data.get("name"),
season=int(episode.get("season")),
number=int(episode.get("number")),
name=episode.get("name"),
year=None,
language="en", # self.region,
data=episode,
)
for series in data["seasons"]
for episode in series["episodes"]
if episode.get("_id") == episode_id
]
)
elif kind == "series":
r = self.session.get(self.config["endpoints"]["series"].format(season_id=content_id))
if not r.ok:
raise ConnectionError(f"{r.json().get('message')}")
data = r.json()
return Series(
[
Episode(
id_=episode.get("_id"),
service=self.__class__,
title=data.get("name"),
season=int(episode.get("season")),
number=int(episode.get("number")),
name=episode.get("name"),
year=self.year(episode),
language="en", # self.region,
data=episode,
)
for series in data["seasons"]
for episode in series["episodes"]
]
)
elif kind == "movies":
url = self.config["endpoints"]["movie"].format(video_id=content_id)
r = self.session.get(url, headers={"Authorization": f"Bearer {self.token}"})
if not r.ok:
raise ConnectionError(f"{r.json().get('message')}")
data = r.json()
return Movies(
[
Movie(
id_=movie.get("_id"),
service=self.__class__,
name=movie.get("name"),
language="en", # self.region,
data=movie,
year=self.year(movie),
)
for movie in data
]
)
def get_tracks(self, title: Title_T) -> Tracks:
url = self.config["endpoints"]["episodes"].format(episode_id=title.id)
episode = self.session.get(url).json()
sources = next((item.get("sources") for item in episode if not self.bumpers(item.get("name", ""))), None)
if not sources:
raise ValueError("Unable to find manifest for this title")
hls = next((x.get("file") for x in sources if x.get("type").lower() == "hls"), None)
dash = next((x.get("file") for x in sources if x.get("type").lower() == "dash"), None)
if hls:
self.license = None
m3u8_url = hls.replace("https://siloh.pluto.tv", "http://silo-hybrik.pluto.tv.s3.amazonaws.com")
manifest = self.clean_manifest(self.session.get(m3u8_url).text)
tracks = HLS.from_text(manifest, m3u8_url).to_tracks(language=title.language)
# Remove separate AD audio tracks
for track in tracks.audio:
tracks.audio.remove(track)
else:
self.license = self.config["endpoints"]["license"]
manifest = dash.replace("https://siloh.pluto.tv", "http://silo-hybrik.pluto.tv.s3.amazonaws.com")
tracks = DASH.from_url(manifest, self.session).to_tracks(language=title.language)
for track in tracks.audio:
role = track.data["dash"]["adaptation_set"].find("Role")
if role is not None and role.get("value") in ["description", "alternative", "alternate"]:
track.descriptive = True
return tracks
def get_chapters(self, title: Title_T) -> Chapters:
return Chapters()
def get_widevine_service_certificate(self, **_: Any) -> str:
return None
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
if not self.license:
return None
r = self.session.post(url=self.license, data=challenge)
if r.status_code != 200:
raise ConnectionError(r.text)
return r.content
# service specific functions
@staticmethod
def clean_manifest(text: str) -> str:
# Remove fairplay entries
index = text.find('#PLUTO-DRM:ID="fairplay')
if index == -1:
return text
else:
end_of_previous_line = text.rfind("\n", 0, index)
if end_of_previous_line == -1:
return ""
else:
return text[:end_of_previous_line]
@staticmethod
def bumpers(text: str) -> bool:
ads = (
"Pluto_TV_OandO",
"_ad",
"creative",
"Bumper",
"Promo",
"WarningCard",
)
return any(ad in text for ad in ads)
@staticmethod
def year(data: dict) -> Optional[int]:
title_year = (int(match.group(1)) if (match := re.search(r"\((\d{4})\)", data.get("name", ""))) else None)
slug_year = (int(match.group(1)) if (match := re.search(r"\b(\d{4})\b", data.get("slug", ""))) else None)
return None if title_year else slug_year