From c8eb1893e8f80d7b78c9f1965c25da69773d0c4a Mon Sep 17 00:00:00 2001 From: stabbedbybrick <125766685+stabbedbybrick@users.noreply.github.com> Date: Thu, 4 Jul 2024 21:17:36 +0200 Subject: [PATCH] refactor(iP): Improve title information and labels --- services/iP/__init__.py | 110 ++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/services/iP/__init__.py b/services/iP/__init__.py index 9b3b8d8..09c195c 100644 --- a/services/iP/__init__.py +++ b/services/iP/__init__.py @@ -6,6 +6,7 @@ import re import sys import warnings from collections.abc import Generator +from concurrent.futures import ThreadPoolExecutor from typing import Any, Union import click @@ -73,7 +74,7 @@ class iP(Service): self.log.error("HLG tracks cannot be requested without an SSL certificate") sys.exit(1) - elif self.range and self.range[0].name == "HLG" and self.config.get("cert"): + elif self.range and self.range[0].name == "HLG": self.session.headers.update({"user-agent": self.config["user_agent"]}) self.vcodec = "H.265" @@ -104,17 +105,24 @@ class iP(Service): data = self.get_data(pid, slice_id=None) if data is None and kind == "episode": - return self.get_single_episode(pid) + return Series([self.fetch_episode(pid)]) + elif data is None: raise ValueError(f"Metadata was not found - if {pid} is an episode, use full URL as input") if "Film" in data["labels"]["category"]: + data = self.session.get(self.config["endpoints"]["episodes"].format(pid=pid)).json() + if not data.get("episodes"): + raise ValueError(f"Metadata was not found for {pid}") + + movie = data.get("episodes")[0] + return Movies( [ Movie( - id_=data["id"], - name=data["title"]["default"], - year=None, # TODO + id_=movie.get("id"), + name=movie.get("title"), + year=movie.get("release_date_time", "").split("-")[0], service=self.__class__, language="en", data=data, @@ -123,16 +131,18 @@ class iP(Service): ) else: seasons = [self.get_data(pid, x["id"]) for x in data["slices"] or [{"id": None}]] - episodes = [ - self.create_episode(episode, data) for season in seasons for episode in season["entities"]["results"] + episode_ids = [ + episode["episode"].get("id") + for season in seasons + for episode in season["entities"]["results"] + if not episode["episode"].get("live") ] + episodes = self.get_episodes(episode_ids) return Series(episodes) def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: r = self.session.get(url=self.config["endpoints"]["playlist"].format(pid=title.id)) - if not r.ok: - self.log.error(r.text) - sys.exit(1) + r.raise_for_status() versions = r.json().get("allAvailableVersions") if not versions: @@ -265,6 +275,10 @@ class iP(Service): media = None if self.vcodec == "H.265": + if not self.config.get("cert"): + self.log.error(" - H.265 tracks cannot be requested without an SSL certificate") + sys.exit(1) + session = self.session session.mount("https://", SSLCiphers()) session.mount("http://", SSLCiphers()) @@ -279,6 +293,10 @@ class iP(Service): media = availability["media"] break + if availability.get("result"): + self.log.error(f"Error: {availability['result']}") + sys.exit(1) + else: mediaset = "iptv-all" @@ -290,47 +308,29 @@ class iP(Service): media = availability["media"] break + if availability.get("result"): + self.log.error(f"Error: {availability['result']}") + sys.exit(1) + return media - def create_episode(self, episode: dict, data: dict) -> Episode: - title = episode["episode"]["title"]["default"].strip() - subtitle = episode["episode"]["subtitle"] - series = re.finditer(r"Series (\d+):|Season (\d+):|(\d{4}/\d{2}): Episode \d+", subtitle.get("default") or "") - season_num = int(next((m.group(1) or m.group(2) or m.group(3).replace("/", "") for m in series), 0)) - if season_num == 0 and not data.get("slices"): - season_num = 1 - - number = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle.get("slice") or subtitle.get("default") or "") - ep_num = int(next((m.group(1) or m.group(2) for m in number), 0)) - - name = re.search(r"\d+\. (.+)", subtitle.get("slice") or "") - ep_name = name.group(1) if name else subtitle.get("slice") or "" - if not subtitle.get("slice"): - ep_name = subtitle.get("default") or "" - - return Episode( - id_=episode["episode"].get("id"), - service=self.__class__, - title=title, - season=season_num, - number=ep_num, - name=ep_name, - language="en", - data=episode, - ) - - def get_single_episode(self, pid: str) -> Series: + def fetch_episode(self, pid: str) -> Series: r = self.session.get(self.config["endpoints"]["episodes"].format(pid=pid)) r.raise_for_status() data = json.loads(r.content) - subtitle = data["episodes"][0].get("subtitle") + episode = data["episodes"][0] + subtitle = episode.get("subtitle") + year = episode.get("release_date_time", "").split("-")[0] + numeric_position = episode.get("numeric_tleo_position") if subtitle is not None: - season_match = re.search(r"Series (\d+):", subtitle) - season = int(season_match.group(1)) if season_match else 0 + series = re.finditer(r"Series (\d+):|Season (\d+):|(\d{4}/\d{2}): Episode \d+", subtitle or "") + season_num = int(next((m.group(1) or m.group(2) or m.group(3).replace("/", "") for m in series), 0)) + if season_num == 0 and not data.get("slices"): + season_num = 1 number_match = re.finditer(r"(\d+)\.|Episode (\d+)", subtitle) - number = int(next((m.group(1) or m.group(2) for m in number_match), 0)) + number = int(next((m.group(1) or m.group(2) for m in number_match), numeric_position or 0)) name_match = re.search(r"\d+\. (.+)", subtitle) name = ( name_match.group(1) @@ -340,20 +340,22 @@ class iP(Service): else "" ) - return Series( - [ - Episode( - id_=data["episodes"][0]["id"], - service=self.__class__, - title=data["episodes"][0]["title"], - season=season if subtitle else 0, - number=number if subtitle else 0, - name=name if subtitle else "", - language="en", - ) - ] + return Episode( + id_=episode.get("id"), + service=self.__class__, + title=episode.get("title"), + season=season_num if subtitle else 0, + number=number if subtitle else 0, + name=name if subtitle else "", + language="en", + year=year, ) + def get_episodes(self, episodes: list) -> list: + with ThreadPoolExecutor(max_workers=10) as executor: + tasks = list(executor.map(self.fetch_episode, episodes)) + return [task for task in tasks if task is not None] + def find(self, pattern, string, group=None): if group: m = re.search(pattern, string)