# unshackle-services/KOWP/__init__.py
# 2025-11-03 16:44:16 +01:00
#
# 274 lines
# 9.7 KiB
# Python

import json
import re
from http.cookiejar import CookieJar
from typing import Optional, List, Dict, Any
import click
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.search_result import SearchResult
from unshackle.core.titles import Episode, Series, Title_T, Titles_T
from unshackle.core.tracks import Subtitle, Tracks
class KOWP(Service):
    """
    Service code for Kocowa Plus (kocowa.com).

    Version: 1.0.0
    Auth: Credential (username + password)
    Security: FHD@L3

    Note:
        Brightcove account_id and policy key (pk) are configurable per region.
        If the built-in defaults do not work, put a custom Brightcove account
        ID and policy key under ``brightcove`` in config.yaml.
    """

    TITLE_RE = r"^(?:https?://(?:www\.)?kocowa\.com/[^/]+/season/)?(?P<title_id>\d+)"
    GEOFENCE = ("US", "CA", "PA")
    NO_SUBTITLES = False

    # NOTE: documented with a comment rather than a docstring on purpose --
    # click would otherwise surface a docstring as the command's --help text.
    # Click entry point: builds the service from the parsed CLI arguments.
    @staticmethod
    @click.command(name="kowp", short_help="https://www.kocowa.com")
    @click.argument("title", type=str)
    @click.option("--extras", is_flag=True, default=False, help="Include teasers/extras")
    @click.pass_context
    def cli(ctx, **kwargs):
        return KOWP(ctx, **kwargs)

    def __init__(self, ctx, title: str, extras: bool = False):
        """
        Parse the requested title and load the Brightcove settings.

        Parameters:
            ctx: Click context, forwarded to the base ``Service``.
            title: Numeric season ID, or a kocowa.com ``.../season/<id>`` URL.
            extras: When True, teasers/extras are listed alongside episodes.

        Raises:
            EnvironmentError: The service has no configuration.
            ValueError: ``title`` does not match ``TITLE_RE``.
        """
        super().__init__(ctx)
        if not self.config:
            raise EnvironmentError("Missing KOWP config")

        match = re.match(self.TITLE_RE, title)
        if not match:
            raise ValueError("Invalid Kocowa title ID or URL")
        self.title_id = match.group("title_id")
        self.include_extras = extras

        # Load Brightcove config. `or` (instead of a .get default) also covers
        # keys that are present but null/empty in config.yaml.
        bc_conf = self.config.get("brightcove") or {}
        self.brightcove_account_id = bc_conf.get("account_id") or "6154734805001"
        self.brightcove_pk = bc_conf.get("policy_key") or "BCpkADawqM1FKrSBim1gusdFR73Prfums__ZmQ7uJ4yCRqv-RKrq2HtZIkVOn4gsmPdAqO007VNtaKJCmg0Uu1rpuUnjnP-f9OPklQ9l2-HS_F_sJXgT96KpUahg9XNukAraAlob6XDuoecD"

        # Session state, filled in by authenticate() and get_tracks().
        self.access_token: Optional[str] = None
        self.middleware_token: Optional[str] = None
        self.playback_token: Optional[str] = None
        self.widevine_license_url: Optional[str] = None

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Log in with username/password and obtain the middleware token.

        Stores ``self.access_token`` (from the login response) and
        ``self.middleware_token`` (from the middleware auth response).

        Raises:
            ValueError: No credential was supplied.
            PermissionError: The login response code is not ``"0000"``.
        """
        if not credential:
            raise ValueError("KOWP requires username and password")

        payload = {
            "username": credential.username,
            "password": credential.password,
            "device_id": f"{credential.username}_browser",
            "device_type": "browser",
            "device_model": "Firefox",
            "device_version": "firefox/143.0",
            "push_token": None,
            "app_version": "v4.0.16",
        }
        r = self.session.post(
            self.config["endpoints"]["login"],
            json=payload,
            headers={"Authorization": "anonymous", "Origin": "https://www.kocowa.com"},
        )
        r.raise_for_status()
        res = r.json()
        if res.get("code") != "0000":
            raise PermissionError(f"Login failed: {res.get('message')}")
        self.access_token = res["object"]["access_token"]

        # Exchange the access token for the token used to authorize playback.
        r = self.session.post(
            self.config["endpoints"]["middleware_auth"],
            json={"token": f"wA-Auth.{self.access_token}"},
            headers={"Origin": "https://www.kocowa.com"},
        )
        r.raise_for_status()
        self.middleware_token = r.json()["token"]

    def _fetch_episode_objects(self) -> tuple:
        """
        Page through the metadata endpoint and collect the wanted entries.

        Returns:
            ``(series_title, objects)`` where ``objects`` are the raw entries
            whose ``detail_type`` is ``"episode"`` (plus ``"teaser"``/``"extra"``
            when extras were requested), in the order the API returned them.
        """
        base_url = self.config["endpoints"]["metadata"].format(title_id=self.title_id)
        sep = "&" if "?" in base_url else "?"
        headers = {"Authorization": self.access_token, "Origin": "https://www.kocowa.com"}

        wanted_types = {"episode"}
        if self.include_extras:
            wanted_types |= {"teaser", "extra"}

        limit = 20
        offset = 0
        series_title = None
        collected = []
        while True:
            r = self.session.get(f"{base_url}{sep}offset={offset}&limit={limit}", headers=headers)
            r.raise_for_status()
            data = r.json()["object"]

            # Take the series title from the first response that carries one.
            if series_title is None:
                series_title = ((data.get("meta") or {}).get("title") or {}).get("en")

            listing = data.get("next_episodes") or {}
            page_objects = listing.get("objects") or []
            if not page_objects:
                break

            collected.extend(ep for ep in page_objects if ep.get("detail_type") in wanted_types)

            # A short page is the last page.
            if len(page_objects) < limit:
                break
            # A missing or zero total_count means "unknown": keep paging until a
            # short/empty page instead of stopping after the first page.
            total = listing.get("total_count")
            if total and len(collected) >= total:
                break
            offset += limit

        return series_title or "Unknown", collected

    def get_titles(self) -> Titles_T:
        """
        Build the list of episodes (and optionally extras) for the season.

        Returns:
            A ``Series`` with one ``Episode`` per collected entry; the raw API
            entry is kept on each episode's ``data``.
        """
        series_title, episode_objects = self._fetch_episode_objects()

        episodes = []
        for ep in episode_objects:
            meta = ep.get("meta") or {}
            # "episode" -> "Episode", "teaser" -> "Teaser", "extra" -> "Extra"
            ep_type = ep["detail_type"].capitalize()

            ep_num = meta.get("episode_number")
            if ep_num is None:
                ep_num = 0
            season_num = meta.get("season_number")
            if season_num is None:
                season_num = 1

            name = (meta.get("title") or {}).get("en") or f"{ep_type} {ep_num}"
            desc = (meta.get("description") or {}).get("en") or ""

            episodes.append(
                Episode(
                    id_=str(ep["id"]),
                    service=self.__class__,
                    title=series_title,
                    season=season_num,
                    number=ep_num,
                    name=name,
                    description=desc,
                    year=None,
                    # NOTE(review): hard-coded to English; this value is also
                    # passed to DASH.to_tracks() in get_tracks() -- confirm it
                    # is the intended language for this catalogue.
                    language=Language.get("en"),
                    data=ep,
                )
            )
        return Series(episodes)

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Authorize playback for ``title`` and build its tracks.

        Stores ``self.playback_token`` and ``self.widevine_license_url`` for the
        subsequent ``get_widevine_license`` call.

        Raises:
            PermissionError: The authorize response does not report success.
            ValueError: No DASH source with a Widevine license URL was found.
        """
        # Authorize playback
        r = self.session.post(
            self.config["endpoints"]["authorize"].format(episode_id=title.id),
            headers={"Authorization": f"Bearer {self.middleware_token}"},
        )
        r.raise_for_status()
        auth_data = r.json()
        if not auth_data.get("Success"):
            raise PermissionError("Playback authorization failed")
        self.playback_token = auth_data["token"]

        # Fetch Brightcove manifest
        manifest_url = (
            f"https://edge.api.brightcove.com/playback/v1/accounts/{self.brightcove_account_id}/videos/ref:{title.id}"
        )
        r = self.session.get(
            manifest_url,
            headers={"Accept": f"application/json;pk={self.brightcove_pk}"},
        )
        r.raise_for_status()
        manifest = r.json()

        # Pick the first DASH source that also advertises a Widevine license URL.
        dash_url = widevine_url = None
        for src in manifest.get("sources") or []:
            if src.get("type") != "application/dash+xml":
                continue
            license_url = (
                (src.get("key_systems") or {})
                .get("com.widevine.alpha", {})
                .get("license_url")
            )
            if src.get("src") and license_url:
                dash_url, widevine_url = src["src"], license_url
                break
        if not dash_url or not widevine_url:
            raise ValueError("No Widevine DASH stream found")
        self.widevine_license_url = widevine_url

        tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language)

        # Add ALL subtitles from manifest
        for sub in manifest.get("text_tracks") or []:
            srclang = sub.get("srclang")
            # Entries without a language (or the thumbnails pseudo-track) and
            # entries without a download URL are not usable subtitles.
            if not srclang or srclang == "thumbnails" or not sub.get("src"):
                continue
            tracks.add(
                Subtitle(
                    id_=sub["id"],
                    url=sub["src"],
                    codec=Subtitle.Codec.WebVTT,
                    language=Language.get(srclang),
                    # NOTE(review): every text track is flagged SDH -- confirm.
                    sdh=True,
                )
            )
        return tracks

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
        """
        Post the Widevine challenge to the license URL found by ``get_tracks``.

        Returns:
            The raw license response body.
        """
        r = self.session.post(
            self.widevine_license_url,
            data=challenge,
            headers={
                "BCOV-Auth": self.playback_token,
                "Content-Type": "application/octet-stream",
                "Origin": "https://www.kocowa.com",
                "Referer": "https://www.kocowa.com/",
            },
        )
        r.raise_for_status()
        return r.content

    # NOTE(review): disabled search draft, kept for reference. Before
    # re-enabling, strip the trailing spaces from the "Origin"/"Referer"
    # header values below.
    #
    # def search(self) -> List[SearchResult]:
    #     if not hasattr(self, 'title_id') or not isinstance(self.title_id, str):
    #         query = getattr(self, 'title', '')  # fallback if title_id isn't set yet
    #     else:
    #         query = self.title_id
    #
    #     url = "https://prod-fms.kocowa.com/api/v01/fe/gks/autocomplete"
    #     params = {
    #         "search_category": "All",
    #         "search_input": query,
    #         "include_webtoon": "true",
    #     }
    #
    #     r = self.session.get(
    #         url,
    #         params=params,
    #         headers={
    #             "Authorization": self.access_token,
    #             "Origin": "https://www.kocowa.com ",
    #             "Referer": "https://www.kocowa.com/ ",
    #         }
    #     )
    #     r.raise_for_status()
    #     response = r.json()
    #     contents = response.get("object", {}).get("contents", [])
    #
    #     results = []
    #     for item in contents:
    #         if item.get("detail_type") != "season":
    #             continue  # skip non-season items (e.g., actors, webtoons)
    #
    #         meta = item["meta"]
    #         title_en = meta["title"].get("en") or "[No Title]"
    #         description_en = meta["description"].get("en") or ""
    #         show_id = str(item["id"])
    #
    #         results.append(
    #             SearchResult(
    #                 id_=show_id,
    #                 title=title_en,
    #                 description=description_en,
    #                 label="season",
    #                 url=f"https://www.kocowa.com/en_us/season/{show_id}/placeholder"
    #             )
    #         )
    #     return results

    def get_chapters(self, title: Title_T) -> list:
        """Return no chapters; this service does not provide any."""
        return []