devine-services/services/DROP/__init__.py

175 lines
6.5 KiB
Python

import re
import json
import click
from typing import Optional, Union
from http.cookiejar import CookieJar
from bs4 import BeautifulSoup
from devine.core.service import Service
from devine.core.titles import Episode, Series
from devine.core.tracks import Tracks
from devine.core.credential import Credential
from devine.core.manifests import HLS
class DROP(Service):
    """
    Service code for DROPOUT.tv
    Author: @sp4rk.y
    Authorization: Cookies or Credentials
    Security: None
    """

    # NOTE: the class docstring above is also the CLI help text of the click
    # command below (``help=__doc__``), so its wording is user-facing.

    # Any dropout.tv URL; group 1 is the first path segment (the show slug).
    TITLE_RE = r"^(?:https?://(?:www\.)?dropout\.tv/)([^/]+)(?:/.*)?$"
    # Show page, optionally followed by ``/season:N``; groups: slug, season.
    SERIES_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)(?:/season:(\d+))?/?$"
    # Episode page; groups: slug, season number, video slug.
    EPISODE_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)/season:(\d+)/videos/([^/]+)/?$"
    LOGIN_URL = "https://www.dropout.tv/login"

    # Click entry point: instantiates the service from the parsed arguments.
    # (Documented with a comment rather than a docstring so the command's help
    # text stays exactly the class docstring.)
    @staticmethod
    @click.command(name="DROP", short_help="https://www.dropout.tv", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        return DROP(ctx, **kwargs)

    def __init__(self, ctx, title: str):
        """
        Store the requested title and initialise the base service.

        Parameters:
            ctx: Click context forwarded to ``Service.__init__``.
            title: Show slug or dropout.tv URL given on the command line.
        """
        # Set before the base initialiser runs, as in the original ordering.
        self.title = title
        super().__init__(ctx)

    def authenticate(
        self,
        cookies: Optional[CookieJar] = None,
        credential: Optional[Credential] = None,
    ) -> None:
        """
        Authenticate the session with cookies or, failing that, credentials.

        Cookies take precedence: when a non-empty jar is given it is merged
        into the session and no login request is made. Otherwise, if a
        credential is given, a form login is posted to ``LOGIN_URL``. With
        neither, the session proceeds unauthenticated.

        Raises:
            ValueError: The login page contained no authenticity token.
            Exception: The login response contained the "unauthorized" marker.
        """
        self.credentials = credential

        if cookies:
            self.session.cookies.update(cookies)
            return

        if not self.credentials:
            self.log.info("No login credentials provided, proceeding without authentication")
            return

        login_data = {
            "email": self.credentials.username,
            "password": self.credentials.password,
            "authenticity_token": self._get_authenticity_token(),
            "utf8": "true",
        }
        response = self.session.post(self.LOGIN_URL, data=login_data, allow_redirects=False)
        # A failed login is detected by this marker in the response body.
        if '<div id="watch-unauthorized"' in response.text:
            self.log.error("Login failed")
            raise Exception("Login failed")
        self.log.info("Login successful")

    def _get_authenticity_token(self):
        """
        Fetch the login page and return its ``authenticity_token`` form value.

        Raises:
            ValueError: No token field was found in the page.
        """
        signin_page = self.session.get(self.LOGIN_URL).text
        match = re.search(r'name="authenticity_token" value="(.+?)"', signin_page)
        if not match:
            self.log.error("Could not find authenticity token")
            raise ValueError("Authenticity token not found")
        return match.group(1)

    def _resolve_title_id(self) -> str:
        """
        Return the show slug for ``self.title``.

        Series/season URLs are matched first, then any other dropout.tv URL
        (e.g. an episode URL) via ``TITLE_RE``. Input that matches neither is
        assumed to already be a slug and is returned unchanged.
        """
        for pattern in (self.SERIES_RE, self.TITLE_RE):
            match = re.match(pattern, self.title)
            if match:
                return match.group(1)
        return self.title

    def _get_season_episodes(self, season_url: str, show_title: str) -> list:
        """
        Scrape one season page and return its episodes as ``Episode`` objects.

        Parameters:
            season_url: Season page URL; must contain ``/season:<number>``.
            show_title: Display title applied to every episode.

        Returns an empty list (after logging a warning) when the URL carries
        no season number.
        """
        season_match = re.search(r"/season:(\d+)", season_url)
        if not season_match:
            self.log.warning(f"Skipping season URL without a season number: {season_url}")
            return []
        season_number = int(season_match.group(1))

        season_page = BeautifulSoup(self.session.get(season_url).text, "html.parser")

        episodes = []
        for card in season_page.find_all("div", class_="browse-item-card"):
            link = card.find("a", class_="browse-item-link")
            if not link:
                continue

            # The link carries a JSON blob with the video's id and label.
            properties = json.loads(link["data-track-event-properties"])

            number_elem = card.find("span", class_="media-identifier media-episode")
            number_match = re.search(r"Episode (\d+)", number_elem.text) if number_elem else None
            # NOTE(review): the number stays None when the card has no
            # "Episode N" label - confirm Episode accepts a missing number.
            episode_number = int(number_match.group(1)) if number_match else None

            episodes.append(
                Episode(
                    id_=str(properties["id"]),
                    service=self.__class__,
                    title=show_title,
                    season=season_number,
                    number=episode_number,
                    name=properties["label"],
                    year=None,  # TODO: not scraped from the page yet
                    data={"url": link["href"]},
                )
            )
        return episodes

    def get_titles(self) -> Series:
        """
        Collect every episode of the requested show, across all seasons.

        The show page's season selector (``select.js-switch-season``) provides
        one URL per season; each season page is then scraped for its episode
        cards. If the selector is absent, an empty ``Series`` is returned and
        a warning is logged.
        """
        title_id = self._resolve_title_id()
        show_url = f"https://www.dropout.tv/{title_id}"

        # Derive the display title from the slug rather than the raw input, so
        # a URL ending in "/season:N" or "/" does not leak into the title.
        slug = title_id.strip("/").split("/")[0]
        show_title = slug.replace("-", " ").title()

        show_page = BeautifulSoup(self.session.get(show_url).text, "html.parser")

        season_urls = []
        season_select = show_page.find("select", class_="js-switch-season")
        if season_select:
            season_urls = [
                option["value"]
                for option in season_select.find_all("option")
                if option.get("value")
            ]
        if not season_urls:
            self.log.warning(f"No seasons found on {show_url}")

        episodes = []
        for season_url in season_urls:
            episodes.extend(self._get_season_episodes(season_url, show_title))
        return Series(episodes)

    def get_tracks(self, title: Union[Episode]) -> Tracks:
        """
        Resolve the HLS manifest for an episode and return its tracks.

        Chain: episode page -> player embed URL -> embed page -> player config
        JSON -> HLS CDN entry -> manifest URL.

        Parameters:
            title: Episode whose ``data["url"]`` is the episode page URL.

        Raises:
            ValueError: The embed URL, the config URL, or an HLS CDN entry
                could not be found.
        """
        episode_url = title.data["url"]
        episode_page = self.session.get(episode_url).text

        embed_url_match = re.search(r'embed_url:\s*"([^"]+)"', episode_page)
        if not embed_url_match:
            raise ValueError("Could not find embed_url in the episode page")
        embed_url = embed_url_match.group(1).replace("&amp;", "&")

        # HACK: mimic a browser loading the player iframe from the episode
        # page; the config URL is then scraped from the embed page's markup.
        headers = {
            "Referer": episode_url,
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "iframe",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
        }
        embed_page = self.session.get(embed_url, headers=headers).text

        config_url_match = re.search(r'config_url":"([^"]+)"', embed_page)
        if not config_url_match:
            raise ValueError("Could not find config_url in the embed page")
        # The URL is embedded in JSON, where "&" is escaped as \u0026.
        config_url = config_url_match.group(1).replace("\\u0026", "&")

        config_data = self.session.get(config_url, headers=headers).json()
        hls_files = config_data["request"]["files"]["hls"]
        cdns = hls_files["cdns"]
        # Prefer the advertised default CDN; otherwise take the first listed.
        cdn = cdns.get(hls_files["default_cdn"]) or next(iter(cdns.values()), None)
        if not cdn:
            raise ValueError("No HLS CDN found in the player config")

        # NOTE(review): a "playlist.json" name is rewritten to "playlist.mpd"
        # yet the result is parsed as HLS - confirm which manifest is served.
        manifest_url = cdn["avc_url"].replace("playlist.json", "playlist.mpd")
        return HLS.from_url(url=manifest_url).to_tracks(language="en")

    def get_chapters(self, title):
        """Return the chapters for ``title``; this service provides none."""
        return []

    def get_widevine_license(self, challenge: bytes, title: Union[Episode], track):
        """Return ``None``: no license request is made by this service."""
        return None