# devine-services/services/WTCH/__init__.py
# (source listing metadata: 238 lines, 8.6 KiB, Python)

import re
import os
import json
import click
from urllib.parse import urljoin
from typing import Optional, Union
from http.cookiejar import CookieJar
from bs4 import BeautifulSoup
from devine.core.config import config
from devine.core.service import Service
from devine.core.titles import Episode, Series
from devine.core.tracks import Tracks
from devine.core.credential import Credential
from devine.core.manifests import HLS
from devine.core.tracks.attachment import Attachment
class WTCH(Service):
    """
    Service code for watchertv.com
    Author: @sp4rk.y
    Authorization: Cookies or Credentials
    Security: None
    """

    # NOTE: the class docstring above doubles as the CLI help text (see
    # ``help=__doc__`` on ``cli``), so it is intentionally left untouched.

    # Any watchertv.com URL; group 1 is the first path segment (the show slug).
    TITLE_RE = r"^(?:https?://(?:www\.)?watchertv\.com/)([^/]+)(?:/.*)?$"
    # Show URL with an optional "/season:N" suffix; group 1 = slug, group 2 = season.
    SERIES_RE = r"https?://(?:www\.)?watchertv\.com/([^/]+)(?:/season:(\d+))?/?$"
    # Episode URL; groups = slug, season number, video slug.
    EPISODE_RE = r"https?://(?:www\.)?watchertv\.com/([^/]+)/season:(\d+)/videos/([^/]+)/?$"

    # No function docstring here on purpose: click is given ``help=__doc__``
    # (the class docstring) explicitly.
    @staticmethod
    @click.command(name="WTCH", short_help="https://watchertv.com", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        return WTCH(ctx, **kwargs)

    def __init__(self, ctx, title: str):
        """Store the requested title (slug or URL) and initialise the base service."""
        # ``title`` is assigned before the base initialiser runs, as in the
        # original ordering.
        self.title = title
        super().__init__(ctx)

    def authenticate(
        self,
        cookies: Optional[CookieJar] = None,
        credential: Optional[Credential] = None,
    ) -> None:
        """
        Authenticate the session with cookies or, failing that, credentials.

        Cookies take precedence: when a non-empty jar is supplied it is merged
        into the session and no login request is made. Otherwise, if a
        credential is supplied, a form login is posted to the configured
        ``login_url``. With neither, the service proceeds unauthenticated.

        Raises:
            ValueError: the login page did not contain an authenticity token.
            Exception: the login response contained the "watch-unauthorized" marker.
        """
        self.credentials = credential

        if cookies:
            self.session.cookies.update(cookies)
            return

        if not self.credentials:
            self.log.info("No login credentials provided, proceeding without authentication")
            return

        login_data = {
            "email": self.credentials.username,
            "password": self.credentials.password,
            "authenticity_token": self._get_authenticity_token(),
            "utf8": "true",
        }
        response = self.session.post(
            self.config["endpoints"]["login_url"],
            data=login_data,
            allow_redirects=False,
        )

        # The site signals a rejected login by rendering this element.
        if '<div id="watch-unauthorized"' in response.text:
            self.log.error("Login failed")
            raise Exception("Login failed")

        self.log.info("Login successful")

    def _get_authenticity_token(self):
        """
        Fetch the login page and return the value of its ``authenticity_token`` field.

        Raises:
            ValueError: no such field was found in the page.
        """
        signin_page = self.session.get(self.config["endpoints"]["login_url"]).text
        match = re.search(r'name="authenticity_token" value="(.+?)"', signin_page)
        if not match:
            self.log.error("Could not find authenticity token")
            raise ValueError("Authenticity token not found")
        return match.group(1)

    @classmethod
    def _extract_title_id(cls, title: str) -> str:
        """Return the show slug from a watchertv.com URL, or ``title`` unchanged if it is not one."""
        # SERIES_RE is tried first (original behaviour); TITLE_RE additionally
        # covers deeper URLs such as episode links.
        for pattern in (cls.SERIES_RE, cls.TITLE_RE):
            match = re.match(pattern, title)
            if match:
                return match.group(1)
        return title

    def _parse_episode_card(self, item, show_title: str, season_number: Optional[int]) -> Optional[Episode]:
        """
        Build an Episode from one "browse-item-card" element.

        Returns None when the card lacks the episode link, its
        ``data-track-event-properties`` payload, or an "Episode N" label.
        """
        episode_link = item.find("a", class_="browse-item-link")
        if not episode_link:
            return None

        episode_url = episode_link["href"]
        episode_data_json = episode_link.get("data-track-event-properties")
        if not episode_data_json:
            return None
        episode_data = json.loads(episode_data_json)

        episode_number_elem = item.find("span", class_="media-identifier media-episode")
        if not episode_number_elem:
            return None
        episode_number_match = re.search(r"Episode (\d+)", episode_number_elem.text.strip())
        if not episode_number_match:
            return None

        return Episode(
            id_=str(episode_data.get("id")),
            service=self.__class__,
            title=show_title,
            season=season_number,
            number=int(episode_number_match.group(1)),
            name=episode_data.get("label"),
            year=None,
            data={"url": episode_url},
        )

    def get_titles(self) -> Series:
        """
        Scrape every season page of the requested show and return its episodes.

        The show slug is taken from ``self.title`` (a bare slug or a
        watchertv.com URL). Season page URLs come from the
        ``js-switch-season`` dropdown when present; otherwise only the base
        page is scanned.
        """
        title_id = self._extract_title_id(self.title)
        # Derived from the parsed slug rather than the raw input, so URLs that
        # end in "/season:N" or "/" still yield the show's name.
        show_title = title_id.replace("-", " ").title()
        base_url = self.config["endpoints"]["episode_metadata_url"].format(title_id=title_id)

        response = self.session.get(base_url)
        soup = BeautifulSoup(response.text, "html.parser")
        season_select = soup.find("select", class_="js-switch-season")
        if season_select:
            season_urls = [
                urljoin(response.url, option["value"])
                for option in season_select.find_all("option")
                if option.get("value") is not None  # skip options without a value attribute
            ]
        else:
            season_urls = [base_url]

        # Request one large page per season instead of paginating.
        params = {
            "page": 1,
            "per_page": 1000,
            "html": "1",
            "ajax": "1",
        }

        episodes = []
        for season_url in season_urls:
            season_response = self.session.get(season_url, params=params)
            season_soup = BeautifulSoup(season_response.text, "html.parser")

            season_number_match = re.search(r"/season:(\d+)", season_url)
            # NOTE(review): stays None when the URL carries no "/season:N"
            # part (e.g. the base-page fallback) - confirm Episode accepts that.
            season_number = int(season_number_match.group(1)) if season_number_match else None

            for item in season_soup.find_all("div", class_="browse-item-card"):
                episode = self._parse_episode_card(item, show_title, season_number)
                if episode is not None:
                    episodes.append(episode)

        return Series(episodes)

    def _attach_thumbnail(self, tracks: Tracks, title: Episode, config_data: dict) -> None:
        """
        Download the episode thumbnail and append it to ``tracks.attachments``.

        Best effort: does nothing when the player config has no
        ``video.thumbs.base`` entry or the download does not return HTTP 200.
        """
        thumbnail_url = ((config_data.get("video") or {}).get("thumbs") or {}).get("base")
        if not thumbnail_url:
            return

        thumbnail_response = self.session.get(thumbnail_url)
        if thumbnail_response.status_code != 200:
            return

        thumbnail_filename = f"{title.id}_thumbnail.jpg"
        # ``config`` here is devine's global config, not ``self.config``.
        thumbnail_path = config.directories.temp / thumbnail_filename
        os.makedirs(config.directories.temp, exist_ok=True)
        with open(thumbnail_path, "wb") as f:
            f.write(thumbnail_response.content)

        tracks.attachments.append(
            Attachment(
                path=thumbnail_path,
                name=thumbnail_filename,
                mime_type="image/jpeg",
                description="Thumbnail",
            )
        )

    def get_tracks(self, title: Episode) -> Tracks:
        """
        Resolve the episode's player config and return its HLS tracks.

        Follows episode page -> embed page -> player config JSON (the regexes
        for the first two hops come from the service config), picks the
        default CDN or the first listed one, parses its manifest, and
        attaches the thumbnail when one is available.

        Raises:
            ValueError: the embed URL, the config URL, or any HLS CDN could
                not be found.
        """
        episode_url = title.data["url"]
        episode_page = self.session.get(episode_url).text

        embed_url_match = re.search(self.config["endpoints"]["embed_url_regex"], episode_page)
        if not embed_url_match:
            raise ValueError("Could not find embed_url in the episode page")
        # The URL is lifted from HTML, so ampersands arrive entity-encoded.
        embed_url = embed_url_match.group(1).replace("&amp;", "&")

        # Header templates from the service config may reference {episode_url}.
        headers = {k: v.format(episode_url=episode_url) for k, v in self.config["headers"].items()}

        embed_page = self.session.get(embed_url, headers=headers).text
        config_url_match = re.search(self.config["endpoints"]["config_url_regex"], embed_page)
        if not config_url_match:
            raise ValueError("Config URL not found on the embed page.")
        # The URL is lifted from inline JS, so ampersands arrive as \u0026.
        config_url = config_url_match.group(1).replace("\\u0026", "&")

        config_data = self.session.get(config_url, headers=headers).json()

        hls_files = config_data["request"]["files"]["hls"]
        cdns = hls_files["cdns"]
        # Prefer the advertised default CDN; otherwise take the first listed.
        cdn = cdns.get(hls_files.get("default_cdn")) or next(iter(cdns.values()), None)
        if not cdn:
            raise ValueError("No HLS CDN found in the player config.")

        # NOTE(review): the original comment called this an MPD URL, yet the
        # result is parsed as HLS. The replace is preserved unchanged - confirm
        # whether it is still needed.
        manifest_url = cdn["avc_url"].replace("playlist.json", "playlist.mpd")
        tracks = HLS.from_url(url=manifest_url).to_tracks(language="en")

        self._attach_thumbnail(tracks, title, config_data)
        return tracks

    def get_chapters(self, title):
        """Return an empty list; this service provides no chapter data."""
        return []

    def get_widevine_license(self, challenge: bytes, title: Episode, track) -> None:
        """Return None; no license request is made (the service declares "Security: None")."""
        # No DRM
        return None