feat(services): Add TUBI service

This commit is contained in:
stabbedbybrick 2024-05-20 13:13:15 +02:00
parent 1223210040
commit be3020ed14
2 changed files with 215 additions and 0 deletions

210
services/TUBI/__init__.py Normal file
View File

@ -0,0 +1,210 @@
from __future__ import annotations
import hashlib
import re
from collections.abc import Generator
from http.cookiejar import CookieJar
from typing import Any, Optional
import click
import m3u8
from devine.core.credential import Credential
from devine.core.downloaders import requests
from devine.core.manifests import HLS
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Subtitle, Track, Tracks
from langcodes import Language
class TUBI(Service):
"""
Service code for TubiTV streaming service (https://tubitv.com/)
\b
Author: stabbedbybrick
Authorization: Cookies
Robustness:
Widevine:
L3: 720p, AAC2.0
\b
Tips:
- Input can be complete title URL or just the path:
/series/300001423/gotham
/tv-shows/200024793/s01-e01-pilot
/movies/589279/the-outsiders
"""
TITLE_RE = r"^(?:https?://(?:www\.)?tubitv\.com?)?/(?P<type>movies|series|tv-shows)/(?P<id>[a-z0-9-]+)"
GEOFENCE = ("us", "ca",)
@staticmethod
@click.command(name="TUBI", short_help="https://tubitv.com/", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return TUBI(ctx, **kwargs)
def __init__(self, ctx, title):
self.title = title
super().__init__(ctx)
self.license = None
def authenticate(
self,
cookies: Optional[CookieJar] = None,
credential: Optional[Credential] = None,
) -> None:
super().authenticate(cookies, credential)
if not cookies:
raise EnvironmentError("Service requires Cookies for Authentication.")
self.session.cookies.update(cookies)
def search(self) -> Generator[SearchResult, None, None]:
params = {
"isKidsMode": "false",
"useLinearHeader": "true",
"isMobile": "false",
}
r = self.session.get(self.config["endpoints"]["search"].format(query=self.title), params=params)
r.raise_for_status()
results = r.json()
for result in results:
label = "series" if result["type"] == "s" else "movies" if result["type"] == "v" else result["type"]
title = (
result.get("title", "")
.lower()
.replace(" ", "-")
.replace(":", "")
.replace("(", "")
.replace(")", "")
.replace(".", "")
)
yield SearchResult(
id_=f"https://tubitv.com/{label}/{result.get('id')}/{title}",
title=result.get("title"),
description=result.get("description"),
label=label,
url=f"https://tubitv.com/{label}/{result.get('id')}/{title}",
)
def get_titles(self) -> Titles_T:
try:
kind, content_id = (re.match(self.TITLE_RE, self.title).group(i) for i in ("type", "id"))
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
if kind == "tv-shows":
content = self.session.get(self.config["endpoints"]["content"].format(content_id=content_id))
content.raise_for_status()
series_id = "0" + content.json().get("series_id")
data = self.session.get(self.config["endpoints"]["content"].format(content_id=series_id)).json()
return Series(
[
Episode(
id_=episode["id"],
service=self.__class__,
title=data["title"],
season=int(season["id"]),
number=int(episode["episode_number"]),
name=episode["title"].split("-")[1],
year=data["year"],
language=Language.find(episode.get("lang", "en")).to_alpha3(),
data=episode,
)
for season in data["children"]
for episode in season["children"]
if episode["id"] == content_id
]
)
if kind == "series":
r = self.session.get(self.config["endpoints"]["content"].format(content_id=content_id))
r.raise_for_status()
data = r.json()
return Series(
[
Episode(
id_=episode["id"],
service=self.__class__,
title=data["title"],
season=int(season["id"]),
number=int(episode["episode_number"]),
name=episode["title"].split("-")[1],
year=data["year"],
language=Language.find(episode.get("lang", "en")).to_alpha3(),
data=episode,
)
for season in data["children"]
for episode in season["children"]
]
)
if kind == "movies":
r = self.session.get(self.config["endpoints"]["content"].format(content_id=content_id))
r.raise_for_status()
data = r.json()
return Movies(
[
Movie(
id_=data["id"],
service=self.__class__,
year=data["year"],
name=data["title"],
language=Language.find(data.get("lang", "en")).to_alpha3(),
data=data,
)
]
)
def get_tracks(self, title: Title_T) -> Tracks:
if not title.data.get("video_resources"):
raise ValueError("No video resources found. Title is either missing or geolocation is incorrect.")
self.manifest = title.data["video_resources"][0]["manifest"]["url"]
self.license = title.data["video_resources"][0].get("license_server", {}).get("url")
tracks = HLS.from_url(url=self.manifest, session=self.session).to_tracks(language=title.language)
for track in tracks:
master = m3u8.loads(self.session.get(track.url).text, uri=track.url)
track.url = master.segments[0].uri
track.descriptor = Track.Descriptor.URL
if title.data.get("subtitles"):
tracks.add(
Subtitle(
id_=hashlib.md5(title.data["subtitles"][0]["url"].encode()).hexdigest()[0:6],
url=title.data["subtitles"][0]["url"],
codec=Subtitle.Codec.from_mime(title.data["subtitles"][0]["url"][-3:]),
language=title.data["subtitles"][0].get("lang_alpha3", title.language),
downloader=requests,
is_original_lang=True,
forced=False,
sdh=False,
)
)
return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
return []
def get_widevine_service_certificate(self, **_: Any) -> str:
return None
def get_widevine_license(self, challenge: bytes, **_: Any) -> bytes:
if not self.license:
return None
r = self.session.post(url=self.license, data=challenge)
if r.status_code != 200:
raise ConnectionError(r.text)
return r.content

View File

@ -0,0 +1,5 @@
endpoints:
content: https://tubitv.com/oz/videos/{content_id}/content?video_resources=hlsv6_widevine_nonclearlead&video_resources=hlsv6
search: https://tubitv.com/oz/search/{query}