(WTCH): add new service integration for watchertv.com

Introduce a new service class WTCH to support watchertv.com.
Includes authentication, title retrieval, and track extraction.
Add configuration file for endpoints and headers.
This commit is contained in:
Sp4rk.y 2024-09-09 17:40:05 -06:00
parent c7e9e4bd85
commit e2ea8ea535
2 changed files with 228 additions and 0 deletions

213
services/WTCH/__init__.py Normal file
View File

@ -0,0 +1,213 @@
import re
import os
import json
import click
from typing import Optional, Union
from http.cookiejar import CookieJar
from bs4 import BeautifulSoup
from devine.core.config import config
from devine.core.service import Service
from devine.core.titles import Episode, Series
from devine.core.tracks import Tracks
from devine.core.credential import Credential
from devine.core.manifests import HLS
from devine.core.tracks.attachment import Attachment
class WTCH(Service):
    """
    Service code for watchertv.com
    Author: @sp4rk.y
    Authorization: Cookies or Credentials
    Security: None
    """

    # NOTE: the class docstring above is reused verbatim as the CLI help text
    # (``help=__doc__`` on the click command), so it is user-facing output.

    # Captures the show slug (first path segment) of any watchertv.com URL.
    TITLE_RE = r"^(?:https?://(?:www\.)?watchertv\.com/)([^/]+)(?:/.*)?$"
    # Captures the show slug and, optionally, a season number.
    SERIES_RE = r"https?://(?:www\.)?watchertv\.com/([^/]+)(?:/season:(\d+))?/?$"
    # Captures the show slug, season number and episode slug.
    EPISODE_RE = r"https?://(?:www\.)?watchertv\.com/([^/]+)/season:(\d+)/videos/([^/]+)/?$"

    @staticmethod
    @click.command(name="WTCH", short_help="https://watchertv.com", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the parsed CLI arguments."""
        return WTCH(ctx, **kwargs)

    def __init__(self, ctx, title: str):
        """
        Store the requested title and initialise the base service.

        Parameters:
            ctx: Click context forwarded to the base ``Service``.
            title: Show slug or watchertv.com URL given on the command line.
        """
        # Set before the base initialiser runs, as in the original ordering.
        self.title = title
        super().__init__(ctx)

    def authenticate(
        self,
        cookies: Optional[CookieJar] = None,
        credential: Optional[Credential] = None,
    ) -> None:
        """
        Authenticate the session with cookies or, failing that, credentials.

        Cookies take precedence. With neither, the session is left anonymous.

        Raises:
            ValueError: If the login form's authenticity token cannot be found.
            Exception: If the login response contains the unauthorized marker.
        """
        self.credentials = credential

        if cookies:
            self.session.cookies.update(cookies)
            return

        if not self.credentials:
            self.log.info("No login credentials provided, proceeding without authentication")
            return

        login_data = {
            "email": self.credentials.username,
            "password": self.credentials.password,
            "authenticity_token": self._get_authenticity_token(),
            "utf8": "true",
        }
        response = self.session.post(
            self.config["endpoints"]["login_url"],
            data=login_data,
            allow_redirects=False,
        )
        if '<div id="watch-unauthorized"' in response.text:
            self.log.error("Login failed")
            # Exception type kept as-is so existing callers are unaffected.
            raise Exception("Login failed")
        self.log.info("Login successful")

    def _get_authenticity_token(self):
        """
        Fetch the login page and return the form's ``authenticity_token``.

        Raises:
            ValueError: If the token field is not present in the page.
        """
        signin_page = self.session.get(self.config["endpoints"]["login_url"]).text
        match = re.search(r'name="authenticity_token" value="(.+?)"', signin_page)
        if not match:
            self.log.error("Could not find authenticity token")
            raise ValueError("Authenticity token not found")
        return match.group(1)

    def _get_title_id(self) -> str:
        """Return the show slug for ``self.title``, whether it is a URL or a bare slug."""
        match = re.match(self.TITLE_RE, self.title)
        if match:
            return match.group(1)
        # Not a watchertv.com URL: treat the input as a slug and drop any
        # trailing path so it can be formatted into the metadata URL.
        return self.title.strip("/").split("/")[0]

    @staticmethod
    def _parse_episode_number(item) -> Optional[int]:
        """Return the number from an episode card's "Episode N" label, or None if absent."""
        label = item.find("span", class_="media-identifier media-episode")
        if not label:
            return None
        number_match = re.search(r"Episode (\d+)", label.text)
        return int(number_match.group(1)) if number_match else None

    def get_titles(self) -> Series:
        """
        Scrape every season page of the show and return its episodes.

        Season page URLs are read from the ``js-switch-season`` select on the
        show page. Cards without a link or without an "Episode N" label are
        skipped.

        Returns:
            A ``Series`` of ``Episode`` objects; each stores its page URL in
            ``data["url"]``.
        """
        title_id = self._get_title_id()
        # Derived from the slug (not the raw input) so season URLs and trailing
        # slashes do not leak into the displayed show name.
        show_title = title_id.replace("-", " ").title()

        url = self.config["endpoints"]["episode_metadata_url"].format(title_id=title_id)
        soup = BeautifulSoup(self.session.get(url).text, "html.parser")

        season_urls = []
        season_select = soup.find("select", class_="js-switch-season")
        if season_select:
            season_urls = [
                option["value"]
                for option in season_select.find_all("option")
                if option.get("value")
            ]

        episodes = []
        for season_url in season_urls:
            season_match = re.search(r"/season:(\d+)", season_url)
            if not season_match:
                # Previously an AttributeError; a malformed option should not
                # abort the whole listing.
                self.log.warning(f"Skipping season URL without a season number: {season_url}")
                continue
            season_number = int(season_match.group(1))

            season_soup = BeautifulSoup(self.session.get(season_url).text, "html.parser")
            for item in season_soup.find_all("div", class_="browse-item-card"):
                episode_link = item.find("a", class_="browse-item-link")
                if not episode_link:
                    continue

                episode_number = self._parse_episode_number(item)
                if episode_number is None:
                    continue

                episode_data = json.loads(episode_link["data-track-event-properties"])
                episodes.append(
                    Episode(
                        id_=str(episode_data["id"]),
                        service=self.__class__,
                        title=show_title,
                        season=season_number,
                        number=episode_number,
                        name=episode_data["label"],
                        year=None,
                        data={"url": episode_link["href"]},
                    )
                )

        return Series(episodes)

    def get_tracks(self, title: Episode) -> Tracks:
        """
        Resolve the episode's HLS manifest and return its tracks.

        Flow: episode page -> embed URL -> embed page -> player config URL ->
        player config JSON -> CDN manifest URL. A thumbnail is attached when
        the player config exposes one and it downloads successfully.

        Raises:
            ValueError: If the embed URL or config URL cannot be found, or the
                player config lists no HLS CDNs.
        """
        episode_url = title.data["url"]
        episode_page = self.session.get(episode_url).text

        embed_url_match = re.search(self.config["endpoints"]["embed_url_regex"], episode_page)
        if not embed_url_match:
            raise ValueError("Could not find embed_url in the episode page")
        embed_url = embed_url_match.group(1).replace("&amp;", "&")

        # Config keys may be written snake_case (e.g. ``user_agent``); HTTP
        # header names use hyphens, so normalise before sending.
        headers = {
            name.replace("_", "-"): value.format(episode_url=episode_url)
            for name, value in self.config["headers"].items()
        }

        embed_page = self.session.get(embed_url, headers=headers).text
        config_url_match = re.search(self.config["endpoints"]["config_url_regex"], embed_page)
        if not config_url_match:
            raise ValueError("Config URL not found on the embed page.")
        # The URL is embedded in JSON, where "&" is escaped as \u0026.
        config_url = config_url_match.group(1).replace("\\u0026", "&")

        config_data = self.session.get(config_url, headers=headers).json()

        hls_files = config_data["request"]["files"]["hls"]
        cdns = hls_files["cdns"]
        if not cdns:
            raise ValueError("No HLS CDNs listed in the player config.")
        # Prefer the advertised default CDN, else the first one listed.
        cdn = cdns.get(hls_files.get("default_cdn")) or next(iter(cdns.values()))

        # NOTE(review): this rewrite produces an ".mpd" name yet the result is
        # parsed as HLS below. It is a no-op unless the URL contains
        # "playlist.json" - confirm whether it is still needed.
        manifest_url = cdn["avc_url"].replace("playlist.json", "playlist.mpd")
        tracks = HLS.from_url(url=manifest_url).to_tracks(language="en")

        self._attach_thumbnail(tracks, title, config_data)
        return tracks

    def _attach_thumbnail(self, tracks: Tracks, title: Episode, config_data: dict) -> None:
        """Download the episode thumbnail, if any, and append it to ``tracks.attachments``."""
        # The thumbnail is optional: a missing key must not abort track extraction.
        thumbs = (config_data.get("video") or {}).get("thumbs") or {}
        thumbnail_url = thumbs.get("base")
        if not thumbnail_url:
            return

        thumbnail_response = self.session.get(thumbnail_url)
        if thumbnail_response.status_code != 200:
            return

        thumbnail_filename = f"{title.id}_thumbnail.jpg"
        thumbnail_path = config.directories.temp / thumbnail_filename
        os.makedirs(config.directories.temp, exist_ok=True)
        with open(thumbnail_path, "wb") as f:
            f.write(thumbnail_response.content)

        tracks.attachments.append(
            Attachment(
                path=thumbnail_path,
                name=thumbnail_filename,
                mime_type="image/jpeg",
                description="Thumbnail",
            )
        )

    def get_chapters(self, title):
        """Return no chapters; this service does not extract any."""
        return []

    def get_widevine_license(self, challenge: bytes, title: Episode, track):
        """Return None: no DRM, so no license request is made."""
        return None

15
services/WTCH/config.yaml Normal file
View File

@ -0,0 +1,15 @@
# Endpoints and scraping patterns used by the WTCH service.
endpoints:
  login_url: "https://www.watchertv.com/login"
  # {title_id} is replaced with the show slug.
  episode_metadata_url: "https://www.watchertv.com/{title_id}"
  # Regular expressions; capture group 1 is the extracted URL.
  embed_url_regex: 'embed_url:\s*"([^"]+)"'
  config_url_regex: 'config_url":"([^"]+)"'

# Headers sent with the embed page and player config requests.
# Keys are the real HTTP header names; {episode_url} is substituted per request.
headers:
  Referer: "{episode_url}"
  User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
  Accept: "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
  Accept-Language: "en-US,en;q=0.5"
  Upgrade-Insecure-Requests: "1"
  Sec-Fetch-Dest: "iframe"
  Sec-Fetch-Mode: "navigate"
  Sec-Fetch-Site: "cross-site"