unshackle-services/GLA/__init__.py

471 lines
18 KiB
Python
Raw Normal View History

2026-04-21 10:47:12 +02:00
import base64
import hashlib
import json
import re
import time
from collections.abc import Generator
from datetime import datetime
from http.cookiejar import CookieJar
from typing import Optional, Union
from urllib.parse import urljoin, parse_qs, urlparse
import click
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH, HLS
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video
class GLA(Service):
    """
    Service code for gagaoolala.com
    Version: 1.0.1
    Authorization: Email/Password or Cookies (PHPSESSID)
    Security: FHD@L3 (Widevine/PlayReady DRM via ExpressPlay)
    Use full URL: https://www.gagaoolala.com/en/videos/6184/candy-2026
    Or title ID: 6184 (the slug is optional; requests are made without it when absent)
    """

    # Matches either a bare numeric title ID or a full video URL. The
    # ``title_id`` group is always captured; ``slug`` is None when the input
    # stops at the ID. The pattern is anchored at the start only.
    TITLE_RE = r"^(?:https?://(?:www\.)?gagaoolala\.com/(?:en/)?videos/)?(?P<title_id>\d+)(?:/(?P<slug>[^/?#]+))?"
    GEOFENCE = ()
    NO_SUBTITLES = False
    # Lookup from a video range name to a lowercase format identifier.
    VIDEO_RANGE_MAP = {
        "SDR": "sdr",
        "HDR10": "hdr10",
        "DV": "dolby_vision",
    }
@staticmethod
@click.command(name="GLA", short_help="https://www.gagaoolala.com")
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
@click.option("-d", "--device", type=str, default="firefox_linux", help="Select device profile")
@click.pass_context
def cli(ctx, **kwargs):
    # Click entry point: builds the service from the click context plus the
    # parsed ``title`` argument and ``movie`` / ``device`` options.
    # Documented with comments on purpose: click uses a command function's
    # docstring as its help text, so adding one would change ``--help`` output.
    return GLA(ctx, **kwargs)
def __init__(self, ctx, title, movie, device, email=None, password=None):
    """Store the command-line inputs and initialise per-run state.

    Args:
        ctx: Click context; ``ctx.obj.cdm`` and ``ctx.parent.params`` are read.
        title: Title ID or video URL as typed by the user.
        movie: Value of the ``--movie`` flag.
        device: Key of the device profile to use from the service config.
        email: Optional e-mail, stored as-is.
        password: Optional password, stored as-is.

    Raises:
        Exception: If the service configuration is missing.
    """
    super().__init__(ctx)

    self.title, self.movie, self.device = title, movie, device
    self.email, self.password = email, password

    self.cdm = ctx.obj.cdm
    # A level-3 CDM is limited to AVC video in SDR, so narrow the request.
    restricted_to_l3 = self.cdm and self.cdm.security_level == 3
    if restricted_to_l3:
        self.track_request.codecs = [Video.Codec.AVC]
        self.track_request.ranges = [Video.Range.SDR]

    if self.config is None:
        raise Exception("Config is missing!")

    self.profile = ctx.parent.params.get("profile") or "default"

    # Filled in later by authenticate(), get_titles() and _fetch_manifest().
    self.user_id = None
    self.slug = None  # Slug of the requested title, used for API calls
    self.license_data = {}
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
    """Authenticate the session with cookies or with an e-mail/password login.

    When cookies are supplied they are copied into the session and the value
    of a ``gli`` cookie, if one exists, is kept as the user id; no login
    request is made. Otherwise the credential is posted to the site's login
    endpoint and the user id is read from the JSON reply.

    Args:
        cookies: Cookie jar of an already logged-in browser session.
        credential: Username (e-mail) and password to log in with.

    Raises:
        EnvironmentError: If neither cookies nor a complete credential were
            given, or if the site reports that the login failed.
    """
    super().authenticate(cookies, credential)

    if cookies:
        self.session.cookies.update(cookies)
        for cookie in cookies:
            if cookie.name == "gli":
                self.user_id = cookie.value
                break
        return

    if not credential or not credential.username or not credential.password:
        raise EnvironmentError("Service requires Cookies or Credential (email/password) for Authentication.")

    login_url = "https://www.gagaoolala.com/en/user/login"
    login_data = {
        "email": credential.username,
        "passwd": credential.password,
    }
    # The request is shaped like the site's own XHR login call.
    headers = {
        "User-Agent": self.config["client"][self.device]["user_agent"],
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "Accept-Language": "en-US,en;q=0.9",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "X-Requested-With": "XMLHttpRequest",
        "Origin": "https://www.gagaoolala.com",
        "Referer": login_url,
    }

    response = self.session.post(login_url, data=login_data, headers=headers)
    response.raise_for_status()
    result = response.json()

    # "data" may be absent, null or a non-object; normalise it to a dict so
    # the lookups below cannot fail with AttributeError.
    payload = result.get("data")
    if not isinstance(payload, dict):
        payload = {}

    if not result.get("success"):
        error_msg = result.get("msg") or payload.get("msg") or "Unknown error"
        # Same exception type as the missing-credential case above; the name
        # previously raised here was never imported or defined in this file.
        raise EnvironmentError(f"Login failed: {error_msg}")

    self.user_id = payload.get("user_line_uid")
def search(self) -> Generator[SearchResult, None, None]:
    """Search the site for ``self.title`` and yield one result per listed video.

    The decoded search payload must be an object with an ``itemListElement``
    list. Each entry whose ``url`` matches ``TITLE_RE`` becomes a result; the
    display name is derived from the URL slug (dashes to spaces, title-cased,
    trailing four-digit year removed).

    Yields:
        SearchResult carrying the numeric title id, the derived name, a
        "SERIES" or "MOVIE" label and the result URL.
    """
    search_url = "https://www.gagaoolala.com/en/search"
    headers = {
        "User-Agent": self.config["client"][self.device]["user_agent"],
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "X-Requested-With": "XMLHttpRequest",
    }
    response = self.session.get(search_url, params={"q": self.title}, headers=headers)
    response.raise_for_status()

    data = self._load_search_data(response)
    items = data.get("itemListElement") if isinstance(data, dict) else None
    if not isinstance(items, list):
        self.log.warning(f"No search results found for '{self.title}'")
        return

    # An episode marker such as "-e01", "-ep2" or "-episode-3", at the end of
    # the slug or followed by another dash. A plain "-e" substring test would
    # also match any slug containing a word that starts with "e".
    episode_marker = re.compile(r"-e(?:p(?:isode)?)?-?\d+(?:-|$)")

    for item in items:
        if not isinstance(item, dict):
            continue
        url = item.get("url", "")
        if not url:
            continue
        match = re.match(self.TITLE_RE, url)
        if not match:
            continue
        title_id = match.group("title_id")
        slug = match.group("slug")

        # Extract title name from slug or URL
        title_name = slug if slug else url.rstrip("/").split("/")[-1]
        if "-" in title_name:
            parts = title_name.rsplit("-", 1)
            # Remove year suffix if present (e.g., candy-2026 -> candy)
            if parts[-1].isdigit() and len(parts[-1]) == 4:
                title_name = parts[0]
        title_name = title_name.replace("-", " ").title()

        # Detect series vs movie
        is_series = bool(slug and episode_marker.search(slug))

        yield SearchResult(
            id_=title_id,
            title=title_name,
            label="SERIES" if is_series else "MOVIE",
            url=url,
        )


def _load_search_data(self, response):
    """Return the decoded search payload from *response*, or None if none is found."""
    if "application/json" in response.headers.get("Content-Type", ""):
        try:
            data = response.json()
        except ValueError:
            # json.JSONDecodeError is a ValueError; catching the base class is
            # meant to also cover decode errors raised by the HTTP library
            # itself (assumption — confirm against the session type in use).
            data = None
        if data:
            return data

    page = response.text
    json_ld_match = re.search(
        r'<script[^>]+type=["\']application/ld\+json["\'][^>]*>\s*({.*?"@context".*?})\s*</script>',
        page,
        re.DOTALL | re.IGNORECASE
    )
    if json_ld_match:
        raw = json_ld_match.group(1)
        # Parse the text as-is first: stripping the backslash from \" breaks
        # valid JSON whose strings contain quotes. The un-escaped variant is
        # only a fallback for pages that embed the JSON with extra escaping.
        for candidate in (raw, raw.replace(r'\/', '/').replace(r'\"', '"')):
            try:
                return json.loads(candidate)
            except json.JSONDecodeError as e:
                self.log.debug(f"Failed to parse JSON-LD: {e}")
        return None

    fallback_match = re.search(
        r'(\{[^{}]*"@context"[^{}]*"itemListElement"[^{}]*\[\s*\{[^{}]*"url"[^{}]*\][^{}]*\})',
        page,
        re.DOTALL
    )
    if fallback_match:
        try:
            return json.loads(fallback_match.group(1))
        except json.JSONDecodeError:
            return None
    return None
def _clean_title(self, raw_title: str, slug: Optional[str] = None) -> str:
"""Clean up page titles by removing SEO/marketing suffixes."""
title = re.sub(r'\s*\|\s*GagaOOLala\s*$', '', raw_title).strip()
seo_patterns = [
r'\s*-\s*Watch\s+Online.*$',
r'\s*-\s*Find\s+Your\s+Story.*$',
r'\s*-\s*Watch\s+BL\s+Movies.*$',
r'\s*-\s*Stream\s+Online.*$',
r'\s*-\s*Free\s+Streaming.*$',
r'\s*-\s*GagaOOLala.*$',
]
for pattern in seo_patterns:
title = re.sub(pattern, '', title, flags=re.IGNORECASE)
title = re.sub(r'\s*-\s*$', '', title).strip()
if slug:
slug_title = slug.replace('-', ' ').title()
year_match = re.search(r'(\d{4})$', slug)
if year_match:
year = year_match.group(1)
slug_title = re.sub(r'\s*\d{4}\s*$', '', slug_title).strip()
candidate = f"{slug_title} ({year})"
if len(candidate) < len(title) or title.lower().startswith(slug_title.lower()):
return candidate
return title if title else f"Title {self.title}"
def get_titles(self) -> Titles_T:
    """Resolve ``self.title`` into a Series or a single-entry Movies collection.

    The video page is fetched (retried without the slug after a 404). If the
    page embeds a ``videoEpisodes`` array containing entries flagged
    ``is_series``, one Episode is built per such entry. Otherwise the page
    ``<title>`` is cleaned up and returned as a single Movie.

    Returns:
        ``Series`` of episodes, or ``Movies`` holding one movie.

    Raises:
        ValueError: If ``self.title`` does not match ``TITLE_RE``.
    """
    from html import unescape  # local import; only the <title> text needs it

    match = re.match(self.TITLE_RE, self.title)
    if not match:
        raise ValueError(f"Could not parse title ID from: {self.title}")
    title_id = match.group("title_id")
    self.slug = match.group("slug")

    base_url = f"https://www.gagaoolala.com/en/videos/{title_id}"
    video_url = f"{base_url}/{self.slug}" if self.slug else base_url
    response = self.session.get(video_url)
    if response.status_code == 404 and self.slug:
        self.log.warning("URL with slug returned 404, trying without slug")
        video_url = base_url
        response = self.session.get(video_url)
    response.raise_for_status()

    # The episode list is embedded in the page as a JavaScript array literal
    # located between "var videoEpisodes =" and "var videoSeasons".
    episodes_match = re.search(r'var\s+videoEpisodes\s*=\s*(\[.*?\]);\s*var\s+videoSeasons', response.text, re.DOTALL)
    if episodes_match:
        episodes_data = json.loads(episodes_match.group(1))
        series_episodes = [ep for ep in episodes_data if ep.get("is_series")]
        if series_episodes:
            # Series name = first episode's name with its "Episode N..." tail removed.
            first_name = series_episodes[0].get("name", "")
            base_title = re.sub(r'\s*Episode\s*\d+.*$', '', first_name).strip()
            if not base_title and self.slug:
                base_title = self._clean_title(self.slug.replace('-', ' ').title(), None)
            if not base_title:
                base_title = f"Series {title_id}"

            episodes = []
            for ep in series_episodes:
                # Without an explicit slug, derive "<series-slug>-e<N>" from the episode number.
                ep_slug = ep.get("slug", f"{self.slug}-e{ep.get('episode', 1)}" if self.slug else None)
                episodes.append(
                    Episode(
                        id_=str(ep["id"]),
                        service=self.__class__,
                        title=base_title,
                        season=ep.get("season", 1),
                        number=ep.get("episode", 1),
                        name=ep.get("name", f"Episode {ep.get('episode', 1)}"),
                        description=None,
                        year=None,
                        language=Language.get("en"),
                        data={**ep, "slug": ep_slug, "parent_slug": self.slug},
                    )
                )
            return Series(episodes)

    title_match = re.search(r'<title>([^<]+)</title>', response.text)
    if title_match:
        # <title> content is HTML text, so decode entities such as "&amp;".
        raw_title = unescape(title_match.group(1))
    else:
        raw_title = self.slug or f"Movie {title_id}"
    movie_title = self._clean_title(raw_title, self.slug)

    # Prefer a "(YYYY)" suffix on the title; otherwise fall back to a year at the end of the slug.
    year = None
    year_match = re.search(r'\((\d{4})\)\s*$', movie_title)
    if year_match:
        year = int(year_match.group(1))
        movie_title = re.sub(r'\s*\(\d{4}\)\s*$', '', movie_title).strip()
    elif self.slug:
        slug_year = re.search(r'(\d{4})$', self.slug)
        if slug_year:
            year = int(slug_year.group(1))

    return Movies(
        [
            Movie(
                id_=title_id,
                service=self.__class__,
                name=movie_title,
                description=None,
                year=year,
                language=Language.get("en"),
                data={"url": video_url, "slug": self.slug},
            )
        ]
    )
def get_tracks(self, title: Title_T) -> Tracks:
    """Return the tracks for *title*, one manifest fetch per requested variant.

    The per-variant callback below is handed to ``_get_tracks_for_variants``
    together with the title.

    Args:
        title: The title whose tracks are requested.

    Returns:
        Whatever ``_get_tracks_for_variants`` returns for the callback.
    """

    def _fetch_variant(
        title: Title_T,
        codec: Optional[Video.Codec],
        range_: Video.Range,
    ) -> Tracks:
        """Fetch the manifest and keep only videos matching *codec* / *range_*."""
        tracks = self._fetch_manifest(title)
        if codec:
            tracks.videos = [v for v in tracks.videos if v.codec == codec]
        # An SDR request is not filtered by range; only non-SDR requests are.
        if range_ != Video.Range.SDR:
            tracks.videos = [v for v in tracks.videos if v.range == range_]
        if not tracks.videos:
            raise ValueError(f"No tracks available for {codec} {range_}")
        return tracks

    return self._get_tracks_for_variants(title, _fetch_variant)
def _fetch_manifest(self, title: Title_T) -> Tracks:
    """Call the play API for *title* and parse the manifest it points to.

    Side effect: ``self.license_data`` is replaced with the Widevine and
    PlayReady license URLs from the response, or with an empty dict when the
    response carries no DRM section.

    Args:
        title: Title to play; ``title.id``, ``title.data`` and
            ``title.language`` are read.

    Returns:
        Tracks parsed from the DASH manifest (preferred) or the HLS manifest.

    Raises:
        ValueError: If the API reports failure, returns no manifest URL, or
            the URL is neither ``.mpd`` nor ``.m3u8``.
    """
    # Slug lookup order: the title's own slug, its parent's slug, the slug
    # remembered on the service, and finally the slug parsed from the input.
    title_data = title.data if isinstance(title.data, dict) else {}
    slug = title_data.get("slug") or title_data.get("parent_slug") or self.slug
    if not slug:
        match = re.match(self.TITLE_RE, self.title)
        if match:
            slug = match.group("slug")

    if slug:
        play_url = f"https://www.gagaoolala.com/api/v1.0/en/videos/{title.id}/{slug}/play"
    else:
        play_url = f"https://www.gagaoolala.com/api/v1.0/en/videos/{title.id}/play"
        self.log.warning(f"No slug available, attempting play request without slug: {play_url}")

    # The current Unix time is sent as the "t" query parameter.
    response = self.session.get(play_url, params={"t": int(time.time())})
    response.raise_for_status()
    playback = response.json()
    if not playback.get("success"):
        raise ValueError(f"Failed to get playback info: {playback}")

    # A missing or null "data" section ends in the "no manifest" error below
    # instead of a KeyError.
    data = playback.get("data") or {}

    drm_info = data.get("drm")
    if drm_info:
        # ``or {}`` also covers a DRM system that is present but null.
        self.license_data = {
            "widevine": (drm_info.get("widevine") or {}).get("LA_URL"),
            "playready": (drm_info.get("playready") or {}).get("LA_URL"),
        }
    else:
        self.license_data = {}

    manifest_url = data.get("dash") or data.get("m3u8")
    if not manifest_url:
        raise ValueError("No manifest URL found in playback response")

    if ".mpd" in manifest_url:
        tracks = DASH.from_url(url=manifest_url, session=self.session).to_tracks(language=title.language)
    elif ".m3u8" in manifest_url:
        tracks = HLS.from_url(url=manifest_url, session=self.session).to_tracks(language=title.language)
    else:
        raise ValueError(f"Unsupported manifest format: {manifest_url}")

    # Every video's range is overwritten: HEVC with a "Main10" profile is
    # tagged HDR10, everything else SDR.
    for video in tracks.videos:
        # NOTE(review): it is not visible here whether video tracks expose a
        # ``profile`` attribute, so read it defensively — confirm.
        profile = getattr(video, "profile", None)
        if video.codec == Video.Codec.HEVC and profile and "Main10" in str(profile):
            video.range = Video.Range.HDR10
        else:
            video.range = Video.Range.SDR
    return tracks
def get_chapters(self, title: Title_T) -> list[Chapter]:
    """Return the chapters for *title*; this implementation always returns an empty list."""
    no_chapters: list[Chapter] = []
    return no_chapters
def get_widevine_service_certificate(self, **_: object) -> str:
    """Return the ``certificate`` entry of the service config.

    Any keyword arguments are accepted and ignored.

    Returns:
        The configured certificate, or an empty string when the config has
        no ``certificate`` key.
    """
    # ``object`` replaces the builtin function ``any``, which is not a type.
    return self.config.get("certificate", "")
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """Post a Widevine challenge to the stored license URL and return the reply body.

    Args:
        challenge: Raw license challenge, sent as the request body.
        title: Unused here.
        track: Unused here.

    Returns:
        The raw content of the license server's response.

    Raises:
        ValueError: If no Widevine license URL is stored in ``self.license_data``.
    """
    license_url = self.license_data.get("widevine")
    if not license_url:
        raise ValueError("Widevine license URL not available for this title")

    # A dedicated license user agent wins over the device's regular one.
    client_profile = self.config["client"][self.device]
    default_agent = client_profile["user_agent"]
    request_headers = {
        "User-Agent": client_profile.get("license_user_agent", default_agent),
        "Content-Type": "application/octet-stream",
    }

    response = self.session.post(url=license_url, data=challenge, headers=request_headers)
    response.raise_for_status()
    return response.content
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """Post a PlayReady challenge to the stored license URL and return the reply body.

    Args:
        challenge: License challenge, sent as the request body with an
            ``AcquireLicense`` SOAP action header.
        title: Unused here.
        track: Unused here.

    Returns:
        The raw content of the license server's response.

    Raises:
        ValueError: If no PlayReady license URL is stored in ``self.license_data``.
    """
    license_url = self.license_data.get("playready")
    if not license_url:
        raise ValueError("PlayReady license URL not available for this title")

    # A dedicated license user agent wins over the device's regular one.
    client_profile = self.config["client"][self.device]
    default_agent = client_profile["user_agent"]
    request_headers = {
        "User-Agent": client_profile.get("license_user_agent", default_agent),
        "Content-Type": "text/xml",
        "SOAPAction": "http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense",
    }

    response = self.session.post(url=license_url, data=challenge, headers=request_headers)
    response.raise_for_status()
    return response.content