2025-08-03 01:47:10 +02:00
from __future__ import annotations
from http . cookiejar import MozillaCookieJar
from typing import Any , Optional , Union
from functools import partial
from pathlib import Path
import sys
import re
import click
import webvtt
import requests
from click import Context
from bs4 import BeautifulSoup
from devine . core . credential import Credential
from devine . core . service import Service
from devine . core . titles import Movie , Movies , Episode , Series
from devine . core . tracks import Track , Chapter , Tracks , Subtitle
from devine . core . manifests . hls import HLS
class NebulaSubtitle ( Subtitle ) :
STYLE_RE = re . compile ( ' ::cue \\ (v \\ [voice= " (.+) " \\ ] \\ ) { color: ([^;]+); (.*)} ' )
RGB_RE = re . compile ( " rgb \\ ((.+), ?(.+), ?(.+) \\ ) " )
def download (
self ,
session : requests . Session ,
prepare_drm : partial ,
max_workers : Optional [ int ] = None ,
progress : Optional [ partial ] = None
) :
# Track.download chooses file extension based on class name so use
# this hack to keep it happy
self . __class__ . __name__ = " Subtitle "
# Skip Subtitle.download and use Track.download directly. The pycaption
# calls in Subtitle.download are not needed here and mangle the WebVTT
# styling Nebula uses
Track . download ( self , session , prepare_drm , max_workers , progress )
def convert ( self , codec : Subtitle . Codec ) - > Path :
if codec != Subtitle . Codec . SubRip :
return super ( ) . convert ( codec )
output_path = self . path . with_suffix ( f " . { codec . value . lower ( ) } " )
vtt = webvtt . read ( self . path )
styles = dict ( )
for group in vtt . styles :
for style in group . text . splitlines ( ) :
if match := self . STYLE_RE . match ( style ) :
name , color , extra = match . groups ( )
if " rgb " in color :
r , g , b = self . RGB_RE . match ( color ) . groups ( )
color = " # {0:02x} {1:02x} {2:02x} " . format ( int ( r ) , int ( g ) , int ( b ) )
bold = " bold " in extra
styles [ name . lower ( ) ] = { " color " : color , " bold " : bold }
2025-08-11 01:46:38 +02:00
2025-08-03 01:47:10 +02:00
count = 1
new_subs = [ ]
for caption in vtt :
soup = BeautifulSoup ( caption . raw_text , features = " html.parser " )
for tag in soup . find_all ( " v " ) :
name = " " . join ( tag . attrs . keys ( ) )
2025-08-11 01:46:38 +02:00
# Nebula subtitles sometimes contain references to non-existent style groups, just
# remove the tag entirely in those cases
style = styles . get ( name )
if not style :
tag . replaceWithChildren ( )
continue
2025-08-03 01:47:10 +02:00
tag . name = " font "
tag . attrs = { " color " : style [ " color " ] }
if style [ " bold " ] :
tag . wrap ( soup . new_tag ( " b " ) )
2025-08-11 01:46:38 +02:00
2025-08-03 01:47:10 +02:00
text = str ( soup )
new_subs . append ( f " { count } " )
new_subs . append ( f " { caption . start } --> { caption . end } " )
new_subs . append ( f " { text } \n " )
count + = 1
output_path . write_text ( " \n " . join ( new_subs ) , encoding = " utf8 " )
self . path = output_path
self . codec = codec
if callable ( self . OnConverted ) :
self . OnConverted ( codec )
return output_path
class NBLA ( Service ) :
"""
Service code for Nebula ( https : / / nebula . tv )
\b
Version : 1.0 .0
Author : lambda
Authorization : Credentials
Robustness :
Unencrypted : 2160 p , AAC2 .0
"""
VIDEO_RE = r " https?://(?:www \ .)?nebula \ .tv/videos/(?P<slug>.+) "
CHANNEL_RE = r " ^https?://(?:www \ .)?nebula \ .tv/(?P<slug>.+) "
@staticmethod
@click.command ( name = " NBLA " , short_help = " https://nebula.tv " , help = __doc__ )
@click.argument ( " title " , type = str )
@click.pass_context
def cli ( ctx : Context , * * kwargs : Any ) - > NBLA :
return NBLA ( ctx , * * kwargs )
def __init__ ( self , ctx : Context , title : str ) :
self . title = title
super ( ) . __init__ ( ctx )
def authenticate ( self , cookies : Optional [ MozillaCookieJar ] = None , credential : Optional [ Credential ] = None ) - > None :
cache = self . cache . get ( f " key_ { credential . sha1 } " )
if not cache or cache . expired :
self . log . info ( " Key is missing or expired, logging in... " )
data = {
" email " : credential . username ,
" password " : credential . password ,
}
r = self . session . post ( self . config [ " endpoints " ] [ " login " ] , json = data )
r . raise_for_status ( )
key = r . json ( ) . get ( " key " )
cache . set ( key )
else :
key = cache . data
r = self . session . post ( self . config [ " endpoints " ] [ " authorization " ] , headers = { " Authorization " : f " Token { key } " } )
r . raise_for_status ( )
self . jwt = r . json ( ) [ " token " ]
self . session . headers . update ( { " Authorization " : f " Bearer { self . jwt } " } )
def get_titles ( self ) - > Union [ Movies , Series ] :
if video_match := re . match ( self . VIDEO_RE , self . title ) :
r = self . session . get ( self . config [ " endpoints " ] [ " video " ] . format ( slug = video_match . group ( " slug " ) ) )
video = r . json ( )
# Simplest scenario: This is a video on a non-episodic channel, return it as movie
if video [ " channel_type " ] != " episodic " :
return Movies ( [
Movie (
id_ = video [ " id " ] ,
service = self . __class__ ,
name = video [ " title " ] ,
year = video [ " published_at " ] [ 0 : 4 ] ,
language = " en "
)
] )
# For episodic videos, things are trickier: There is no way to get the season
# and episode number from the video endpoint, so we instead have to iterate
# through all seasons and filter for the video id.
return self . get_content ( video [ " channel_slug " ] , video_id_filter = video [ " id " ] )
# If the link did not match the video regex, try using it as slug for the content
# API to fetch a whole channel/season
elif channel_match := re . match ( self . CHANNEL_RE , self . title ) :
return self . get_content ( channel_match . group ( " slug " ) )
def get_tracks ( self , title : Union [ Episode , Movie ] ) - > Tracks :
r = self . session . get ( self . config [ " endpoints " ] [ " manifest " ] . format ( video_id = title . id , jwt = self . jwt ) , allow_redirects = False )
manifest_url = r . headers [ " Location " ]
tracks = HLS . from_url ( manifest_url ) . to_tracks ( title . language )
subs = [ ]
for subtitle in tracks . subtitles :
subs . append ( NebulaSubtitle (
id_ = subtitle . id ,
url = subtitle . url ,
language = subtitle . language ,
is_original_lang = subtitle . is_original_lang ,
descriptor = subtitle . descriptor ,
name = subtitle . name ,
codec = subtitle . codec ,
forced = subtitle . forced ,
sdh = subtitle . sdh ,
) )
tracks . subtitles = subs
return tracks
def get_chapters ( self , title : Union [ Episode , Movie ] ) - > list [ Chapter ] :
return [ ]
def search ( self ) - > Generator [ SearchResult , None , None ] :
pass
#self.title
r = self . session . get ( self . config [ " endpoints " ] [ " search " ] , params = params )
r . raise_for_status ( )
# for result in results["results"]:
# yield SearchResult(
# id_=result["brand"].get("websafeTitle"),
# title=result["brand"].get("title"),
# description=result["brand"].get("description"),
# label=result.get("label"),
# url=result["brand"].get("href"),
# )
### Service specific functions
def season_to_episodes ( self , channel , season , video_id_filter ) :
try :
season_number = int ( season [ " label " ] )
except ValueError :
# Some shows such have some non-integer season numbers (Such as
# Jet Lag: The Game season 13.5). These are generally listed as specials
# (Season 0) on TMDB, so treat them the same way.
#
# Specials episode numbers will then likely be off, use caution and
# check TMDB for manual corrections.
season_number = 0
self . log . warn ( f " Could not extract season information, guessing season { season_number } " )
for episode_number , episode in enumerate ( season [ " episodes " ] , start = 1 ) :
if not episode [ " video " ] or ( video_id_filter and video_id_filter != episode [ " video " ] [ " id " ] ) :
continue
yield Episode (
id_ = episode [ " video " ] [ " id " ] ,
service = self . __class__ ,
title = channel [ " title " ] ,
name = episode [ " title " ] ,
language = " en " ,
year = episode [ " video " ] [ " published_at " ] [ 0 : 4 ] ,
season = season_number ,
number = episode_number ,
)
def get_content ( self , slug , video_id_filter = None ) :
r = self . session . get ( self . config [ " endpoints " ] [ " content " ] . format ( slug = slug ) )
content = r . json ( )
if content [ " type " ] == " season " :
r = self . session . get ( self . config [ " endpoints " ] [ " content " ] . format ( slug = content [ " video_channel_slug " ] ) )
channel = r . json ( )
return Series ( self . season_to_episodes ( channel , content , video_id_filter ) )
elif content [ " type " ] == " video_channel " and content [ " channel_type " ] == " episodic " :
episodes = [ ]
for season_data in content [ " episodic " ] [ " seasons " ] :
# We could also use the generic content endpoint to retrieve
# seasons, but this is how the nebula web app does it.
r = self . session . get ( self . config [ " endpoints " ] [ " season " ] . format ( id = season_data [ " id " ] ) )
episodes . extend ( self . season_to_episodes ( content , r . json ( ) , video_id_filter ) )
return Series ( episodes )
elif content [ " type " ] == " video_channel " :
self . log . error ( " Non-episodic channel URL passed. Treating it as a show with a single season. If you want to download non-episodic content as a movie, pass the direct video URL instead. " )
r = self . session . get ( self . config [ " endpoints " ] [ " video_channel_episodes " ] . format ( id = content [ " id " ] ) )
episodes = r . json ( ) [ ' results ' ]
# Non-episodic channel names tend to have a format of "Creator Name — Show Name"
if " — " in content [ " title " ] :
show_title = content [ " title " ] . split ( " — " , maxsplit = 1 ) [ 1 ]
else :
show_title = content [ " title " ]
season = [ ]
episode_number = 0
for episode in episodes :
if ' trailer ' in episode [ ' title ' ] . lower ( ) :
continue
episode_number + = 1
season . append ( Episode (
id_ = episode [ " id " ] ,
service = self . __class__ ,
title = show_title ,
name = episode [ " title " ] ,
language = " en " ,
year = episode [ " published_at " ] [ 0 : 4 ] ,
season = 1 ,
number = episode_number ,
) )
return Series ( season )
else :
self . log . error ( " Unsupported content type " )
sys . exit ( 1 )