import re import os import json import click from urllib.parse import urljoin from typing import Optional, Union from http.cookiejar import CookieJar from bs4 import BeautifulSoup from devine.core.config import config from devine.core.service import Service from devine.core.titles import Episode, Series from devine.core.tracks import Tracks from devine.core.credential import Credential from devine.core.manifests import HLS from devine.core.tracks.attachment import Attachment class WTCH(Service): """ Service code for watchertv.com Author: @sp4rk.y Authorization: Cookies or Credentials Security: None """ TITLE_RE = r"^(?:https?://(?:www\.)?watchertv\.com/)([^/]+)(?:/.*)?$" SERIES_RE = r"https?://(?:www\.)?watchertv\.com/([^/]+)(?:/season:(\d+))?/?$" EPISODE_RE = r"https?://(?:www\.)?watchertv\.com/([^/]+)/season:(\d+)/videos/([^/]+)/?$" @staticmethod @click.command(name="WTCH", short_help="https://watchertv.com", help=__doc__) @click.argument("title", type=str) @click.pass_context def cli(ctx, **kwargs): return WTCH(ctx, **kwargs) def __init__(self, ctx, title: str): self.title = title super().__init__(ctx) def authenticate( self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None, ) -> None: self.credentials = credential if cookies: self.session.cookies.update(cookies) elif self.credentials: login_data = { "email": self.credentials.username, "password": self.credentials.password, "authenticity_token": self._get_authenticity_token(), "utf8": "true", } # Use the URL from the config response = self.session.post( self.config["endpoints"]["login_url"], data=login_data, allow_redirects=False, ) if '
Union[Series]: match = re.match(self.SERIES_RE, self.title) if match: title_id = match.group(1) else: title_id = self.title base_url = self.config["endpoints"]["episode_metadata_url"].format(title_id=title_id) episodes = [] season_urls = [] response = self.session.get(base_url) soup = BeautifulSoup(response.text, "html.parser") season_select = soup.find("select", class_="js-switch-season") if season_select: for option in season_select.find_all("option"): season_url = urljoin(response.url, option["value"]) season_urls.append(season_url) else: season_urls.append(base_url) for season_url in season_urls: params = { "page": 1, "per_page": 1000, "html": "1", "ajax": "1", } season_response = self.session.get(season_url, params=params) season_soup = BeautifulSoup(season_response.text, "html.parser") season_number_match = re.search(r"/season:(\d+)", season_url) if season_number_match: season_number = int(season_number_match.group(1)) else: season_number = None items = season_soup.find_all("div", class_="browse-item-card") if not items: continue for item in items: episode_link = item.find("a", class_="browse-item-link") if episode_link: episode_url = episode_link["href"] episode_data_json = episode_link.get("data-track-event-properties") if episode_data_json: episode_data = json.loads(episode_data_json) else: continue episode_id = episode_data.get("id") episode_title = episode_data.get("label") episode_number_elem = item.find("span", class_="media-identifier media-episode") if episode_number_elem: episode_number_text = episode_number_elem.text.strip() episode_number_match = re.search(r"Episode (\d+)", episode_number_text) if episode_number_match: episode_number = int(episode_number_match.group(1)) else: continue else: continue show_title = self.title.split("/")[-1].replace("-", " ").title() episode = Episode( id_=str(episode_id), service=self.__class__, title=show_title, season=season_number, number=episode_number, name=episode_title, year=None, data={"url": episode_url}, ) episodes.append(episode) return Series(episodes) def get_tracks(self, title: Union[Episode]) -> Tracks: tracks = Tracks() episode_url = title.data["url"] episode_page = self.session.get(episode_url).text embed_url_match = re.search(self.config["endpoints"]["embed_url_regex"], episode_page) if not embed_url_match: raise ValueError("Could not find embed_url in the episode page") embed_url = embed_url_match.group(1).replace("&", "&") headers = {k: v.format(episode_url=episode_url) for k, v in self.config["headers"].items()} # Fetch the embed page content embed_page = self.session.get(embed_url, headers=headers).text # Extract the config URL using regex config_url_match = re.search(self.config["endpoints"]["config_url_regex"], embed_page) if config_url_match: config_url = config_url_match.group(1).replace("\\u0026", "&") else: raise ValueError("Config URL not found on the embed page.") config_data = self.session.get(config_url, headers=headers).json() # Retrieve the CDN information from the config data cdns = config_data["request"]["files"]["hls"]["cdns"] default_cdn = config_data["request"]["files"]["hls"]["default_cdn"] # Select the default CDN or fall back to the first available one cdn = cdns.get(default_cdn) or next(iter(cdns.values())) # Generate the MPD URL by replacing 'playlist.json' with 'playlist.mpd' mpd_url = cdn["avc_url"].replace("playlist.json", "playlist.mpd") tracks = HLS.from_url(url=mpd_url).to_tracks(language="en") # Extract thumbnail URL from config_data thumbnail_base_url = config_data["video"]["thumbs"]["base"] thumbnail_url = f"{thumbnail_base_url}" thumbnail_response = self.session.get(thumbnail_url) if thumbnail_response.status_code == 200: thumbnail_filename = f"{title.id}_thumbnail.jpg" thumbnail_path = config.directories.temp / thumbnail_filename # Ensure the directory exists os.makedirs(config.directories.temp, exist_ok=True) # Save the thumbnail file with open(thumbnail_path, "wb") as f: f.write(thumbnail_response.content) # Create an Attachment object thumbnail_attachment = Attachment( path=thumbnail_path, name=thumbnail_filename, mime_type="image/jpeg", description="Thumbnail", ) # Add the attachment to the tracks tracks.attachments.append(thumbnail_attachment) return tracks def get_chapters(self, title): return [] def get_widevine_license(self, challenge: bytes, title: Union[Episode], track): # No DRM pass