devine-services/services/DROP/__init__.py

267 lines
9.0 KiB
Python

import re
import json
import click
from typing import Optional, Union
from http.cookiejar import CookieJar
from bs4 import BeautifulSoup
from devine.core.service import Service
from devine.core.titles import Episode, Series
from devine.core.tracks import Tracks, Subtitle, Video, Audio
from devine.core.credential import Credential
from devine.core.manifests import HLS
class DROP(Service):
    """
    Service code for DROPOUT.tv
    Authorization: Cookies or Credentials
    """

    # NOTE: the class docstring above doubles as the CLI help text (see `cli`),
    # so keep it short and user-facing.

    # Captures the show slug (first path segment) from any dropout.tv URL,
    # ignoring whatever follows it.
    TITLE_RE = r"^(?:https?://(?:www\.)?dropout\.tv/)([^/]+)(?:/.*)?$"
    # Show URL, optionally narrowed to one season: group 1 = slug, group 2 = season.
    SERIES_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)(?:/season:(\d+))?/?$"
    # Single episode URL: group 1 = slug, group 2 = season, group 3 = video slug.
    EPISODE_RE = (
        r"https?://(?:www\.)?dropout\.tv/([^/]+)/season:(\d+)/videos/([^/]+)/?$"
    )
    LOGIN_URL = "https://www.dropout.tv/login"

    @staticmethod
    @click.command(name="DROP", short_help="https://www.dropout.tv", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed CLI arguments."""
        return DROP(ctx, **kwargs)

    def __init__(self, ctx, title: str):
        """Store the requested title (show slug or dropout.tv URL) and init the base service."""
        self.title = title
        super().__init__(ctx)

    def authenticate(
        self,
        cookies: Optional[CookieJar] = None,
        credential: Optional[Credential] = None,
    ) -> None:
        """
        Authenticate the session, preferring cookies over credentials.

        With cookies, they are copied into the session and no login request is
        made. With credentials only, the login form is submitted together with
        a freshly scraped authenticity token. With neither, the session stays
        anonymous.

        Raises:
            ValueError: the login page exposes no authenticity token.
            Exception: the login response contains the "watch-unauthorized" marker.
        """
        self.credentials = credential

        if cookies:
            self.session.cookies.update(cookies)
            return

        if not self.credentials:
            self.log.info(
                "No login credentials provided, proceeding without authentication"
            )
            return

        login_data = {
            "email": self.credentials.username,
            "password": self.credentials.password,
            "authenticity_token": self._get_authenticity_token(),
            "utf8": "true",
        }
        response = self.session.post(
            self.LOGIN_URL, data=login_data, allow_redirects=False
        )
        if '<div id="watch-unauthorized"' in response.text:
            self.log.error("Login failed")
            raise Exception("Login failed")
        self.log.info("Login successful")

    def _get_authenticity_token(self):
        """
        Fetch the login page and return the value of its `authenticity_token` field.

        Raises:
            ValueError: the field is not present in the page.
        """
        signin_page = self.session.get(self.LOGIN_URL).text
        match = re.search(r'name="authenticity_token" value="(.+?)"', signin_page)
        if not match:
            self.log.error("Could not find authenticity token")
            raise ValueError("Authenticity token not found")
        return match.group(1)

    @staticmethod
    def _parse_episode_number(item) -> Optional[int]:
        """Return the number from an "Episode N" label inside a browse card, or None."""
        label = item.find("span", class_="media-identifier media-episode")
        if label is None:
            return None
        # Cards can carry a label that is not "Episode N"; treat those as
        # unnumbered instead of crashing on a failed match.
        match = re.search(r"Episode (\d+)", label.text)
        return int(match.group(1)) if match else None

    def get_titles(self) -> Union[Series]:
        """
        Scrape every season page of the requested show and return its episodes.

        `self.title` may be a bare show slug or any dropout.tv URL for the
        show; the slug is taken from the URL when one is given. Seasons are
        discovered through the season `<select>` on the show page. When that
        selector is absent, the returned Series is empty.
        """
        # SERIES_RE covers show/season URLs; TITLE_RE additionally recovers the
        # slug from deeper URLs (e.g. an episode link) so they are not pasted
        # whole into the request URL below.
        match = re.match(self.SERIES_RE, self.title) or re.match(
            self.TITLE_RE, self.title
        )
        title_id = match.group(1) if match else self.title

        url = f"https://www.dropout.tv/{title_id}"
        response = self.session.get(url)
        soup = BeautifulSoup(response.text, "html.parser")

        # Derive the display title from the slug, not the raw input, so a
        # trailing "/season:N" or "/" cannot leak into it.
        show_title = title_id.split("/")[-1].replace("-", " ").title()

        # Extract season URLs
        season_urls = []
        season_select = soup.find("select", class_="js-switch-season")
        if season_select:
            for option in season_select.find_all("option"):
                season_url = option.get("value")
                if season_url:
                    season_urls.append(season_url)

        episodes = []
        for season_url in season_urls:
            season_match = re.search(r"/season:(\d+)", season_url)
            if not season_match:
                self.log.warning(
                    f"Skipping season URL without a season number: {season_url}"
                )
                continue
            season_number = int(season_match.group(1))

            season_response = self.session.get(season_url)
            season_soup = BeautifulSoup(season_response.text, "html.parser")

            for item in season_soup.find_all("div", class_="browse-item-card"):
                episode_link = item.find("a", class_="browse-item-link")
                if not episode_link:
                    continue

                # The link carries a JSON blob with the video id and label.
                episode_data = json.loads(
                    episode_link["data-track-event-properties"]
                )
                episodes.append(
                    Episode(
                        id_=str(episode_data["id"]),
                        service=self.__class__,
                        title=show_title,
                        season=season_number,
                        number=self._parse_episode_number(item),
                        name=episode_data["label"],
                        year=None,  # not available on the season page
                        data={"url": episode_link["href"]},
                    )
                )

        return Series(episodes)

    @staticmethod
    def _select_hls_url(
        config_data: dict, preferred_cdn: str = "akfire_interconnect_quic"
    ) -> str:
        """
        Return the HLS playlist URL from the player config.

        Uses `preferred_cdn` when the config lists it, otherwise the first CDN
        the config offers.

        Raises:
            ValueError: the config lists no CDN with a URL.
        """
        cdns = config_data["request"]["files"]["hls"]["cdns"]
        cdn = cdns.get(preferred_cdn) or next(iter(cdns.values()), None)
        if not cdn or "url" not in cdn:
            raise ValueError("Could not find an HLS CDN in the player config")
        return cdn["url"]

    def get_tracks(self, title: Union[Episode]) -> Tracks:
        """
        Resolve an episode page to its HLS playlist and return the parsed tracks.

        Chain: episode page -> `embed_url` -> embed page -> `config_url` ->
        JSON config -> HLS playlist URL.

        Raises:
            ValueError: `embed_url`, `config_url` or an HLS CDN cannot be found.
        """
        # Fetch the episode page
        episode_url = title.data["url"]
        episode_page = self.session.get(episode_url).text

        # Extract the embed_url
        embed_url_match = re.search(r'embed_url:\s*"([^"]+)"', episode_page)
        if not embed_url_match:
            raise ValueError("Could not find embed_url in the episode page")
        embed_url = embed_url_match.group(1).replace("&amp;", "&")  # Fix HTML entities

        # Headers sent with the embed and config requests: the episode page as
        # Referer plus browser-style iframe navigation headers.
        headers = {
            "Referer": episode_url,
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "iframe",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
        }

        # Fetch the embed page with headers
        embed_page = self.session.get(embed_url, headers=headers).text

        # Extract the config_url (JSON-escaped inside the page, hence \u0026)
        config_url_match = re.search(r'config_url":"([^"]+)"', embed_page)
        if not config_url_match:
            raise ValueError("Could not find config_url in the embed page")
        config_url = config_url_match.group(1).replace("\\u0026", "&")

        # Fetch the config data and pick the HLS playlist URL
        config_data = self.session.get(config_url, headers=headers).json()
        hls_url = self._select_hls_url(config_data)

        # Fetch and parse the HLS playlist
        hls_tracks = HLS.from_url(url=hls_url, session=self.session).to_tracks(
            language="en"
        )

        # Re-wrap each parsed track with a type-prefixed id.
        # NOTE(review): the constructor keyword sets below are kept as written;
        # confirm them against the installed devine version.
        tracks = Tracks()

        for video in hls_tracks.videos:
            tracks.add(
                Video(
                    id_=f"video_{video.id}",
                    url=video.url,
                    codec=video.codec,
                    language=video.language,
                    bitrate=video.bitrate,
                    width=video.width,
                    height=video.height,
                    fps=video.fps,
                )
            )

        for audio in hls_tracks.audio:
            tracks.add(
                Audio(
                    id_=f"audio_{audio.id}",
                    url=audio.url,
                    codec=audio.codec,
                    language=audio.language,
                    bitrate=audio.bitrate,
                )
            )

        # Subtitles, if the playlist has any
        for subtitle in hls_tracks.subtitles:
            tracks.add(
                Subtitle(
                    id_=f"subtitle_{subtitle.id}",
                    url=subtitle.url,
                    codec=subtitle.codec,
                    language=subtitle.language,
                )
            )

        # Return the populated instance (not the Tracks class itself).
        return tracks

    def get_chapters(self, title):
        """Return an empty chapter list; no chapters are extracted for this service."""
        return []

    def get_widevine_license(self, challenge: bytes, title: Union[Episode], track):
        """No-op: this service implements no Widevine license request."""
        pass

    def map_video_codec(self, codec_string):
        """Map a codec string prefix (e.g. "avc1.64001f") to a Video.Codec, or None."""
        codec_map = {
            "avc1": Video.Codec.AVC,
            "hevc": Video.Codec.HEVC,
            "vp9": Video.Codec.VP9,
            "av1": Video.Codec.AV1,
        }
        for prefix, codec in codec_map.items():
            if codec_string.startswith(prefix):
                return codec
        return None

    def map_audio_codec(self, codec_string):
        """Map a codec string prefix (e.g. "mp4a.40.2") to an Audio.Codec, or None."""
        codec_map = {
            "mp4a": Audio.Codec.AAC,
            "ec-3": Audio.Codec.EC3,
            "ac-3": Audio.Codec.AC3,
            "opus": Audio.Codec.OPUS,
        }
        for prefix, codec in codec_map.items():
            if codec_string.startswith(prefix):
                return codec
        return None