devine-services/services/DROP/__init__.py

216 lines
8.0 KiB
Python

import re
import os
import json
import click
from typing import Optional, Union
from http.cookiejar import CookieJar
from bs4 import BeautifulSoup
from devine.core.config import config
from devine.core.service import Service
from devine.core.titles import Episode, Series
from devine.core.tracks import Tracks
from devine.core.credential import Credential
from devine.core.manifests import HLS
from devine.core.tracks.attachment import Attachment
class DROP(Service):
"""
Service code for DROPOUT.tv
Author: @sp4rk.y
Authorization: Cookies or Credentials
Security: None
"""
TITLE_RE = r"^(?:https?://(?:www\.)?dropout\.tv/)([^/]+)(?:/.*)?$"
SERIES_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)(?:/season:(\d+))?/?$"
EPISODE_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)/season:(\d+)/videos/([^/]+)/?$"
LOGIN_URL = "https://www.dropout.tv/login"
@staticmethod
@click.command(name="DROP", short_help="https://www.dropout.tv", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return DROP(ctx, **kwargs)
def __init__(self, ctx, title: str):
self.title = title
super().__init__(ctx)
def authenticate(
self,
cookies: Optional[CookieJar] = None,
credential: Optional[Credential] = None,
) -> None:
self.credentials = credential
if cookies:
self.session.cookies.update(cookies)
elif self.credentials:
login_data = {
"email": self.credentials.username,
"password": self.credentials.password,
"authenticity_token": self._get_authenticity_token(),
"utf8": "true",
}
response = self.session.post(self.LOGIN_URL, data=login_data, allow_redirects=False)
if '<div id="watch-unauthorized"' in response.text:
self.log.error("Login failed")
raise Exception("Login failed")
else:
self.log.info("Login successful")
else:
self.log.info("No login credentials provided, proceeding without authentication")
def _get_authenticity_token(self):
signin_page = self.session.get(self.LOGIN_URL).text
match = re.search(r'name="authenticity_token" value="(.+?)"', signin_page)
if match:
return match.group(1)
else:
self.log.error("Could not find authenticity token")
raise ValueError("Authenticity token not found")
def get_titles(self) -> Union[Series]:
match = re.match(self.SERIES_RE, self.title)
if match:
title_id = match.group(1)
else:
title_id = self.title
url = f"https://www.dropout.tv/{title_id}"
response = self.session.get(url)
soup = BeautifulSoup(response.text, "html.parser")
episodes = []
season_urls = []
season_select = soup.find("select", class_="js-switch-season")
if season_select:
for option in season_select.find_all("option"):
season_urls.append(option["value"])
for season_url in season_urls:
season_response = self.session.get(season_url)
season_soup = BeautifulSoup(season_response.text, "html.parser")
season_number = int(re.search(r"/season:(\d+)", season_url).group(1))
for item in season_soup.find_all("div", class_="browse-item-card"):
episode_link = item.find("a", class_="browse-item-link")
if episode_link:
episode_url = episode_link["href"]
episode_data = json.loads(episode_link["data-track-event-properties"])
episode_id = episode_data["id"]
episode_title = episode_data["label"]
episode_number_elem = item.find("span", class_="media-identifier media-episode")
if episode_number_elem:
episode_number_match = re.search(r"Episode (\d+)", episode_number_elem.text)
if episode_number_match:
episode_number = int(episode_number_match.group(1))
else:
continue
else:
continue
show_title = self.title.split("/")[-1].replace("-", " ").title()
episode = Episode(
id_=str(episode_id),
service=self.__class__,
title=show_title,
season=season_number,
number=episode_number,
name=episode_title,
year=None,
data={"url": episode_url},
)
episodes.append(episode)
return Series(episodes)
def get_tracks(self, title: Union[Episode]) -> Tracks:
tracks = Tracks()
episode_url = title.data["url"]
episode_page = self.session.get(episode_url).text
embed_url_match = re.search(r'embed_url:\s*"([^"]+)"', episode_page)
if not embed_url_match:
raise ValueError("Could not find embed_url in the episode page")
embed_url = embed_url_match.group(1).replace("&amp;", "&")
headers = {
"Referer": episode_url,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "iframe",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
}
# Fetch the embed page content
embed_page = self.session.get(embed_url, headers=headers).text
# Extract the config URL using regex
config_url_match = re.search(r'config_url":"([^"]+)"', embed_page)
if config_url_match:
config_url = config_url_match.group(1).replace("\\u0026", "&")
else:
raise ValueError("Config URL not found on the embed page.")
config_data = self.session.get(config_url, headers=headers).json()
# Retrieve the CDN information from the config data
cdns = config_data["request"]["files"]["hls"]["cdns"]
default_cdn = config_data["request"]["files"]["hls"]["default_cdn"]
# Select the default CDN or fall back to the first available one
cdn = cdns.get(default_cdn) or next(iter(cdns.values()))
# Generate the MPD URL by replacing 'playlist.json' with 'playlist.mpd'
mpd_url = cdn["avc_url"].replace("playlist.json", "playlist.mpd")
tracks = HLS.from_url(url=mpd_url).to_tracks(language="en")
# Extract thumbnail URL from config_data
thumbnail_base_url = config_data["video"]["thumbs"]["base"]
thumbnail_url = f"{thumbnail_base_url}"
thumbnail_response = self.session.get(thumbnail_url)
if thumbnail_response.status_code == 200:
thumbnail_filename = f"{title.id}_thumbnail.jpg"
thumbnail_path = config.directories.temp / thumbnail_filename
# Ensure the directory exists
os.makedirs(config.directories.temp, exist_ok=True)
# Save the thumbnail file
with open(thumbnail_path, "wb") as f:
f.write(thumbnail_response.content)
# Create an Attachment object
thumbnail_attachment = Attachment(
path=thumbnail_path, name=thumbnail_filename, mime_type="image/jpeg", description="Thumbnail"
)
# Add the attachment to the tracks
tracks.attachments.append(thumbnail_attachment)
return tracks
def get_chapters(self, title):
return []
def get_widevine_license(self, challenge: bytes, title: Union[Episode], track):
pass