devine-services/services/DROP/__init__.py

153 lines
5.7 KiB
Python

import re
from typing import Optional, Union
from http.cookiejar import CookieJar
import json
from bs4 import BeautifulSoup
import click
from devine.core.service import Service
from devine.core.titles import Episode, Series
from devine.core.tracks import Tracks, Subtitle
from devine.core.manifests import DASH
from devine.core.credential import Credential
class DROP(Service):
"""
Service code for DROPOUT.tv
Authorization: Cookies or Credentials
"""
# Updated regex to capture anything between / and /season or end of the URL
SERIES_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)(?:/season:(\d+))?/?$"
EPISODE_RE = r"https?://(?:www\.)?dropout\.tv/([^/]+)/season:(\d+)/videos/([^/]+)/?$"
LOGIN_URL = "https://www.dropout.tv/login"
@staticmethod
@click.command(name="DROP", short_help="https://www.dropout.tv", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx, **kwargs):
return DROP(ctx, **kwargs)
def __init__(self, ctx, title: str):
self.title = title
super().__init__(ctx)
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
self.credentials = credential
if cookies:
self.session.cookies.update(cookies)
elif self.credentials:
login_data = {
"email": self.credentials.username,
"password": self.credentials.password,
"authenticity_token": self._get_authenticity_token(),
"utf8": "true",
}
response = self.session.post(self.LOGIN_URL, data=login_data, allow_redirects=False)
if '<div id="watch-unauthorized"' in response.text:
self.log.error("Login failed")
raise Exception("Login failed")
else:
self.log.info("Login successful")
else:
self.log.info("No login credentials provided, proceeding without authentication")
def _get_authenticity_token(self):
signin_page = self.session.get(self.LOGIN_URL).text
match = re.search(r'name="authenticity_token" value="(.+?)"', signin_page)
if match:
return match.group(1)
else:
self.log.error("Could not find authenticity token")
raise ValueError("Authenticity token not found")
def get_titles(self) -> Union[Series]:
match = re.match(self.TITLE_RE, self.title)
if match:
title_id = match.group("id")
else:
title_id = self.title
url = f"https://www.dropout.tv/{title_id}"
response = self.session.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
episodes = []
season_urls = []
# Extract season URLs
season_select = soup.find('select', class_='js-switch-season')
if season_select:
for option in season_select.find_all('option'):
season_urls.append(option['value'])
for season_url in season_urls:
season_response = self.session.get(season_url)
season_soup = BeautifulSoup(season_response.text, 'html.parser')
season_number = int(re.search(r'/season:(\d+)', season_url).group(1))
for item in season_soup.find_all('div', class_='browse-item-card'):
episode_link = item.find('a', class_='browse-item-link')
if episode_link:
episode_url = episode_link['href']
episode_data = json.loads(episode_link['data-track-event-properties'])
episode_id = episode_data['id']
episode_title = episode_data['label']
episode_number_elem = item.find('span', class_='media-identifier media-episode')
episode_number = int(re.search(r'Episode (\d+)', episode_number_elem.text).group(1)) if episode_number_elem else None
show_title = self.title.split('/')[-1].replace('-', ' ').title()
episode = Episode(
id_=str(episode_id),
service=self.__class__,
title=show_title,
season=season_number,
number=episode_number,
name=episode_title,
year=None, # You might want to extract this from somewhere else
data={'url': episode_url}
)
episodes.append(episode)
return Series(episodes)
def get_tracks(self, title: Union[Episode]) -> Tracks:
vimeo_id = title.data["vimeo_id"]
config_url = f"https://player.vimeo.com/video/{vimeo_id}/config"
config_data = self.session.get(config_url).json()
hls_url = config_data['request']['files']['hls']['cdns']['fastly_skyfire']['url']
tracks = DASH.from_url(url=hls_url, session=self.session).to_tracks()
# Add subtitles if available
for text_track in config_data['request'].get('text_tracks', []):
if text_track['kind'] == 'captions':
tracks.add(Subtitle(
id_=text_track['id'],
url=text_track['url'],
codec=Subtitle.Codec.VTT,
language=text_track['lang'],
is_original_lang=text_track['lang'] == config_data['video']['lang'],
))
return tracks
def get_chapters(self, title):
# Implement if DROPOUT.tv provides chapter information
return []
def get_widevine_license(self, challenge: bytes, title: Union[Episode], track):
# Implement the logic to fetch the Widevine license
# This might involve making a request to a license server
pass