mirror of
https://github.com/devine-dl/devine.git
synced 2025-04-29 17:49:44 +00:00
This applies the X-TIMESTAMP-MAP data to timestamps as it reads through a concatenated (merged) WebVTT file, correcting the timestamps of segmented WebVTT streams. It then removes the X-TIMESTAMP-MAP header. The timescale and segment duration information is saved in the Subtitle's data dictionary under the hls/dash key as timescale (dash-only) and segment_durations. Note that this information is only available post-download. This is done regardless of whether you are converting to another subtitle format, since the downloader automatically and forcefully concatenates the segmented subtitle data. We do not support the use of segmented Subtitles for downloading or otherwise, nor do we plan to.
773 lines
31 KiB
Python
773 lines
31 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import html
|
|
import logging
|
|
import math
|
|
import re
|
|
import sys
|
|
from copy import copy
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Optional, Union
|
|
from urllib.parse import urljoin, urlparse
|
|
from uuid import UUID
|
|
from zlib import crc32
|
|
|
|
import requests
|
|
from langcodes import Language, tag_is_valid
|
|
from lxml.etree import Element, ElementTree
|
|
from pywidevine.cdm import Cdm as WidevineCdm
|
|
from pywidevine.pssh import PSSH
|
|
from requests import Session
|
|
|
|
from devine.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
|
|
from devine.core.downloaders import requests as requests_downloader
|
|
from devine.core.drm import Widevine
|
|
from devine.core.events import events
|
|
from devine.core.tracks import Audio, Subtitle, Tracks, Video
|
|
from devine.core.utilities import is_close_match, try_ensure_utf8
|
|
from devine.core.utils.xml import load_xml
|
|
|
|
|
|
class DASH:
|
|
def __init__(self, manifest, url: str):
    """
    Wrap an already-parsed MPD document.

    Parameters:
        manifest: Root element of the parsed manifest; its tag must be "MPD".
        url: URL the manifest was loaded from. It is kept so relative paths
            inside the manifest can later be resolved against it.

    Raises:
        ValueError: No manifest was given.
        TypeError: The root element is not an MPD, or url is not a string.
        requests.URLRequired: url is empty.
    """
    if manifest is None:
        raise ValueError("DASH manifest must be provided.")

    root_tag = manifest.tag
    if root_tag != "MPD":
        raise TypeError(f"Expected 'MPD' document, but received a '{root_tag}' document instead.")

    # emptiness is checked before the type, so a falsy non-string reports "URL required"
    if not url:
        raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.")
    elif not isinstance(url, str):
        raise TypeError(f"Expected url to be a {str}, not {url!r}")

    self.manifest, self.url = manifest, url
|
|
|
|
@classmethod
def from_url(cls, url: str, session: Optional[Session] = None, **args: Any) -> DASH:
    """
    Request an MPD document over HTTP and build an instance from the response.

    Parameters:
        url: Location of the MPD document.
        session: Session used for the request. A new Session is created when
            none is given.
        args: Extra keyword arguments forwarded unchanged to `session.get()`.

    If the response reports a different URL than the one requested (e.g. after
    a redirect), the response URL is the one stored on the instance.

    Raises:
        requests.URLRequired: url is empty.
        TypeError: url is not a string, or session is not a Session.
        requests.ConnectionError: The response status was not OK.
    """
    if not url:
        raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.")
    if not isinstance(url, str):
        raise TypeError(f"Expected url to be a {str}, not {url!r}")

    if not session:
        session = Session()
    elif not isinstance(session, Session):
        raise TypeError(f"Expected session to be a {Session}, not {session!r}")

    res = session.get(url, **args)
    if res.url != url:
        url = res.url

    if not res.ok:
        raise requests.ConnectionError(
            "Failed to request the MPD document.",
            response=res
        )

    # go through `cls` (like from_text does) so subclasses get an instance of themselves
    return cls.from_text(res.text, url)
|
|
|
|
@classmethod
def from_text(cls, text: str, url: str) -> DASH:
    """
    Parse MPD document text and build an instance from it.

    Parameters:
        text: The manifest document as a string; it is parsed with `load_xml`.
        url: URL the manifest came from, kept for relative path computations.

    Raises:
        ValueError: text is empty.
        TypeError: text or url is not a string.
        requests.URLRequired: url is empty.
    """
    if not text:
        raise ValueError("DASH manifest Text must be provided.")
    elif not isinstance(text, str):
        raise TypeError(f"Expected text to be a {str}, not {text!r}")

    if not url:
        raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.")
    elif not isinstance(url, str):
        raise TypeError(f"Expected url to be a {str}, not {url!r}")

    return cls(load_xml(text), url)
|
|
|
|
def to_tracks(
    self,
    language: Optional[Union[str, Language]] = None,
    period_filter: Optional[Callable] = None
) -> Tracks:
    """
    Convert an MPEG-DASH document to Video, Audio and Subtitle Track objects.

    Parameters:
        language: The Title's Original Recorded Language. It will also be used as a fallback
            track language value if the manifest does not list language information.
        period_filter: Callable that receives each Period element. Periods for which it
            returns a truthy value are skipped.

    Only the first Period that is not filtered out is converted.

    Every Track receives the manifest URL as its `url`. The manifest, Period,
    AdaptationSet and Representation elements are stored under `data["dash"]`, which
    is what `download_track()` later resolves the segment URLs from.

    Raises:
        ValueError: A Representation's format, codec, track type or language could not
            be determined.
    """
    tracks = Tracks()

    for period in self.manifest.findall("Period"):
        if callable(period_filter) and period_filter(period):
            continue

        for adaptation_set in period.findall("AdaptationSet"):
            if self.is_trick_mode(adaptation_set):
                # we don't want trick mode streams (they are only used for fast-forward/rewind)
                continue

            for rep in adaptation_set.findall("Representation"):
                # attribute/element lookups that prefer the Representation and
                # fall back to (or, for findall, also include) the AdaptationSet
                get = partial(self._get, adaptation_set=adaptation_set, representation=rep)
                findall = partial(self._findall, adaptation_set=adaptation_set, representation=rep, both=True)
                segment_base = rep.find("SegmentBase")

                codecs = get("codecs")
                content_type = get("contentType")
                mime_type = get("mimeType")

                if not content_type and mime_type:
                    content_type = mime_type.split("/")[0]
                if not content_type and not mime_type:
                    raise ValueError("Unable to determine the format of a Representation, cannot continue...")

                if mime_type == "application/mp4" or content_type == "application":
                    # likely mp4-boxed subtitles
                    # TODO: It may not actually be subtitles
                    try:
                        real_codec = Subtitle.Codec.from_mime(codecs)
                        content_type = "text"
                        mime_type = f"application/mp4; codecs='{real_codec.value.lower()}'"
                    except ValueError as exc:
                        raise ValueError(
                            f"Unsupported content type '{content_type}' with codecs of '{codecs}'"
                        ) from exc

                if content_type == "text" and mime_type and "/mp4" not in mime_type:
                    # mimeType likely specifies the subtitle codec better than `codecs`
                    codecs = mime_type.split("/")[1]

                if content_type == "video":
                    track_type = Video
                    track_codec = Video.Codec.from_codecs(codecs)
                    track_fps = get("frameRate")
                    if not track_fps and segment_base is not None:
                        # no frameRate attribute: the SegmentBase timescale is used in its place
                        track_fps = segment_base.get("timescale")

                    track_args = dict(
                        range_=self.get_video_range(
                            codecs,
                            findall("SupplementalProperty"),
                            findall("EssentialProperty")
                        ),
                        bitrate=get("bandwidth") or None,
                        width=get("width") or 0,
                        height=get("height") or 0,
                        fps=track_fps or None
                    )
                elif content_type == "audio":
                    track_type = Audio
                    track_codec = Audio.Codec.from_codecs(codecs)
                    track_args = dict(
                        bitrate=get("bandwidth") or None,
                        channels=next(iter(
                            rep.xpath("AudioChannelConfiguration/@value")
                            or adaptation_set.xpath("AudioChannelConfiguration/@value")
                        ), None),
                        joc=self.get_ddp_complexity_index(adaptation_set, rep),
                        descriptive=self.is_descriptive(adaptation_set)
                    )
                elif content_type == "text":
                    track_type = Subtitle
                    track_codec = Subtitle.Codec.from_codecs(codecs or "vtt")
                    track_args = dict(
                        cc=self.is_closed_caption(adaptation_set),
                        sdh=self.is_sdh(adaptation_set),
                        forced=self.is_forced(adaptation_set)
                    )
                elif content_type == "image":
                    # we don't want what's likely thumbnails for the seekbar
                    continue
                else:
                    raise ValueError(f"Unknown Track Type '{content_type}'")

                track_lang = self.get_language(adaptation_set, rep, fallback=language)
                if not track_lang:
                    msg = "Language information could not be derived from a Representation."
                    if language is None:
                        msg += " No fallback language was provided when calling DASH.to_tracks()."
                    elif not tag_is_valid((str(language) or "").strip()) or str(language).startswith("und"):
                        msg += f" The fallback language provided is also invalid: {language}"
                    raise ValueError(msg)

                # for some reason it's incredibly common for services to not provide
                # a good and actually unique track ID, sometimes because of the lang
                # dialect not being represented in the id, or the bitrate, or such.
                # this combines all of them as one and hashes it to keep it small(ish).
                # NOTE(review): this reads a "bitrate" attribute while the track arguments
                # read "bandwidth"; left untouched so previously generated IDs do not
                # change - confirm whether "bandwidth" was intended.
                track_id = hex(crc32("{codec}-{lang}-{bitrate}-{base_url}-{ids}-{track_args}".format(
                    codec=codecs,
                    lang=track_lang,
                    bitrate=get("bitrate"),
                    base_url=(rep.findtext("BaseURL") or "").split("?")[0],
                    ids=[get("audioTrackId"), get("id"), period.get("id")],
                    track_args=track_args
                ).encode()))[2:]

                tracks.add(track_type(
                    id_=track_id,
                    url=self.url,
                    codec=track_codec,
                    language=track_lang,
                    # bool() so a missing `language` yields False rather than None or ""
                    is_original_lang=bool(language and is_close_match(track_lang, [language])),
                    descriptor=Video.Descriptor.DASH,
                    data={
                        "dash": {
                            "manifest": self.manifest,
                            "period": period,
                            "adaptation_set": adaptation_set,
                            "representation": rep
                        }
                    },
                    **track_args
                ))

        # only get tracks from the first main-content period
        break

    return tracks
|
|
|
|
@staticmethod
def download_track(
    track: AnyTrack,
    save_path: Path,
    save_dir: Path,
    progress: partial,
    session: Optional[Session] = None,
    proxy: Optional[str] = None,
    max_workers: Optional[int] = None,
    license_widevine: Optional[Callable] = None
):
    """
    Download every segment of a DASH track, merge them into one file, and decrypt it.

    The segment list is built from the elements stored in `track.data["dash"]`, using
    the first of SegmentTemplate, SegmentList, SegmentBase or a plain BaseURL that is
    present (looked up on the Representation first, then on the AdaptationSet).

    Parameters:
        track: Track to download. Its `data["dash"]` must hold the "manifest", "period",
            "adaptation_set" and "representation" elements. Its `drm` and `path`
            attributes are updated, and "timescale" and "segment_durations" are added
            to `data["dash"]`.
        save_path: File the merged (and, if DRM is present, decrypted) track is written to.
        save_dir: Directory the individual segments are downloaded into. It is removed
            with `rmdir()` once the segments are merged.
        progress: Callable invoked with keyword updates such as `downloaded`, `total`,
            `completed` and `advance`.
        session: Session used to request initialization data; its headers and cookies
            are also passed to the downloader. A new Session is created when omitted.
        proxy: Proxy URI, set on the session under the "all" key and passed to the
            downloader.
        max_workers: Forwarded to the downloader.
        license_widevine: Called as `license_widevine(drm, track_kid=...)` when the
            track's first DRM system is Widevine.

    Raises:
        TypeError: session is not a Session.
        ValueError: A segment URL cannot be made absolute, the Period duration is
            unknown for a timeline-less SegmentTemplate, or Widevine DRM is present
            without a `license_widevine` callable.

    The process exits with status 1 if the manifest offers no way to locate segments.
    """
    if not session:
        session = Session()
    elif not isinstance(session, Session):
        raise TypeError(f"Expected session to be a {Session}, not {session!r}")

    if proxy:
        session.proxies.update({
            "all": proxy
        })

    log = logging.getLogger("DASH")

    manifest: ElementTree = track.data["dash"]["manifest"]
    period: Element = track.data["dash"]["period"]
    adaptation_set: Element = track.data["dash"]["adaptation_set"]
    representation: Element = track.data["dash"]["representation"]

    track.drm = DASH.get_drm(
        representation.findall("ContentProtection") +
        adaptation_set.findall("ContentProtection")
    )

    # resolve the BaseURL chain: manifest -> period -> representation
    manifest_base_url = manifest.findtext("BaseURL")
    if not manifest_base_url:
        manifest_base_url = track.url
    elif not re.match("^https?://", manifest_base_url, re.IGNORECASE):
        manifest_base_url = urljoin(track.url, f"./{manifest_base_url}")
    period_base_url = urljoin(manifest_base_url, period.findtext("BaseURL"))
    rep_base_url = urljoin(period_base_url, representation.findtext("BaseURL"))

    period_duration = period.get("duration") or manifest.get("mediaPresentationDuration")
    init_data: Optional[bytes] = None

    segment_template = representation.find("SegmentTemplate")
    if segment_template is None:
        segment_template = adaptation_set.find("SegmentTemplate")

    segment_list = representation.find("SegmentList")
    if segment_list is None:
        segment_list = adaptation_set.find("SegmentList")

    segment_base = representation.find("SegmentBase")
    if segment_base is None:
        segment_base = adaptation_set.find("SegmentBase")

    # each entry is (url, byte range or None)
    segments: list[tuple[str, Optional[str]]] = []
    segment_timescale: float = 0
    segment_durations: list[int] = []
    track_kid: Optional[UUID] = None

    if segment_template is not None:
        # the attributes are rewritten below, so work on a copy rather than
        # on the element that is kept in track.data
        segment_template = copy(segment_template)
        start_number = int(segment_template.get("startNumber") or 1)
        segment_timeline = segment_template.find("SegmentTimeline")
        segment_timescale = float(segment_template.get("timescale") or 1)

        for item in ("initialization", "media"):
            value = segment_template.get(item)
            if not value:
                continue
            if not re.match("^https?://", value, re.IGNORECASE):
                if not rep_base_url:
                    raise ValueError("Resolved Segment URL is not absolute, and no Base URL is available.")
                value = urljoin(rep_base_url, value)
            if not urlparse(value).query:
                # carry the manifest URL's query string over to query-less segment URLs
                manifest_url_query = urlparse(track.url).query
                if manifest_url_query:
                    value += f"?{manifest_url_query}"
            segment_template.set(item, value)

        init_url = segment_template.get("initialization")
        if init_url:
            res = session.get(DASH.replace_fields(
                init_url,
                Bandwidth=representation.get("bandwidth"),
                RepresentationID=representation.get("id")
            ))
            res.raise_for_status()
            init_data = res.content
            track_kid = track.get_key_id(init_data)

        if segment_timeline is not None:
            current_time = 0
            for s in segment_timeline.findall("S"):
                if s.get("t"):
                    current_time = int(s.get("t"))
                for _ in range(1 + (int(s.get("r") or 0))):
                    # NOTE(review): the values stored here are segment start times, not
                    # durations, unlike the other branches. They are also used as $Time$
                    # below. Left as-is since consumers of "segment_durations" are not
                    # visible from here - confirm which value they expect.
                    segment_durations.append(current_time)
                    current_time += int(s.get("d"))
            seg_num_list = list(range(start_number, len(segment_durations) + start_number))

            for t, n in zip(segment_durations, seg_num_list):
                segments.append((
                    DASH.replace_fields(
                        segment_template.get("media"),
                        Bandwidth=representation.get("bandwidth"),
                        Number=n,
                        RepresentationID=representation.get("id"),
                        Time=t
                    ), None
                ))
        else:
            if not period_duration:
                raise ValueError("Duration of the Period was unable to be determined.")
            period_duration = DASH.pt_to_sec(period_duration)
            # fall back to 1 both when the attribute is missing (float(None) would
            # raise) and when it is zero (which would divide by zero below)
            segment_duration = float(segment_template.get("duration") or 1) or 1
            total_segments = math.ceil(period_duration / (segment_duration / segment_timescale))

            for s in range(start_number, start_number + total_segments):
                segments.append((
                    DASH.replace_fields(
                        segment_template.get("media"),
                        Bandwidth=representation.get("bandwidth"),
                        Number=s,
                        RepresentationID=representation.get("id"),
                        Time=s
                    ), None
                ))
                # TODO: Should we floor/ceil/round, or is int() ok?
                segment_durations.append(int(segment_duration))
    elif segment_list is not None:
        segment_timescale = float(segment_list.get("timescale") or 1)

        init_data = None
        initialization = segment_list.find("Initialization")
        if initialization is not None:
            source_url = initialization.get("sourceURL")
            if not source_url:
                source_url = rep_base_url
            elif not re.match("^https?://", source_url, re.IGNORECASE):
                source_url = urljoin(rep_base_url, f"./{source_url}")

            if initialization.get("range"):
                init_range_header = {"Range": f"bytes={initialization.get('range')}"}
            else:
                init_range_header = None

            res = session.get(url=source_url, headers=init_range_header)
            res.raise_for_status()
            init_data = res.content
            track_kid = track.get_key_id(init_data)

        segment_urls = segment_list.findall("SegmentURL")
        for segment_url in segment_urls:
            media_url = segment_url.get("media")
            if not media_url:
                media_url = rep_base_url
            elif not re.match("^https?://", media_url, re.IGNORECASE):
                media_url = urljoin(rep_base_url, f"./{media_url}")

            segments.append((
                media_url,
                segment_url.get("mediaRange")
            ))
            segment_durations.append(int(segment_url.get("duration") or 1))
    elif segment_base is not None:
        media_range = None
        init_data = None
        initialization = segment_base.find("Initialization")
        if initialization is not None:
            if initialization.get("range"):
                init_range_header = {"Range": f"bytes={initialization.get('range')}"}
            else:
                init_range_header = None

            res = session.get(url=rep_base_url, headers=init_range_header)
            res.raise_for_status()
            init_data = res.content
            track_kid = track.get_key_id(init_data)
            # the part after "/" in Content-Range is used as the end of the media range,
            # which starts right after the initialization bytes
            total_size = res.headers.get("Content-Range", "").split("/")[-1]
            if total_size:
                media_range = f"{len(init_data)}-{total_size}"

        segments.append((
            rep_base_url,
            media_range
        ))
    elif rep_base_url:
        # no segment information at all: treat the BaseURL as one single file
        segments.append((
            rep_base_url,
            None
        ))
    else:
        log.error("Could not find a way to get segments from this MPD manifest.")
        log.debug(track.url)
        sys.exit(1)

    # TODO: Should we floor/ceil/round, or is int() ok?
    track.data["dash"]["timescale"] = int(segment_timescale)
    track.data["dash"]["segment_durations"] = segment_durations

    if not track.drm and isinstance(track, (Video, Audio)):
        # the manifest listed no usable DRM; try the initialization data instead
        try:
            track.drm = [Widevine.from_init_data(init_data)]
        except Widevine.Exceptions.PSSHNotFound:
            # it might not have Widevine DRM, or might not have found the PSSH
            log.warning("No Widevine PSSH was found for this track, is it DRM free?")

    if track.drm:
        # last chance to find the KID, assumes first segment will hold the init data
        track_kid = track_kid or track.get_key_id(url=segments[0][0], session=session)
        # TODO: What if we don't want to use the first DRM system?
        drm = track.drm[0]
        if isinstance(drm, Widevine):
            # license and grab content keys
            try:
                if not license_widevine:
                    raise ValueError("license_widevine func must be supplied to use Widevine DRM")
                progress(downloaded="LICENSING")
                license_widevine(drm, track_kid=track_kid)
                progress(downloaded="[yellow]LICENSED")
            except Exception:  # noqa
                DOWNLOAD_CANCELLED.set()  # skip pending track downloads
                progress(downloaded="[red]FAILED")
                raise
    else:
        drm = None

    if DOWNLOAD_LICENCE_ONLY.is_set():
        progress(downloaded="[yellow]SKIPPED")
        return

    progress(total=len(segments))

    downloader = track.downloader
    if downloader.__name__ == "aria2c" and any(bytes_range is not None for url, bytes_range in segments):
        # aria2(c) doesn't support the Range header, fallback to the requests downloader
        downloader = requests_downloader

    for status_update in downloader(
        urls=[
            {
                "url": url,
                "headers": {
                    "Range": f"bytes={bytes_range}"
                } if bytes_range else {}
            }
            for url, bytes_range in segments
        ],
        output_dir=save_dir,
        # e.g. "{i:03}.mp4": zero-padded to the digit count of the segment total
        filename="{i:0%d}.mp4" % (len(str(len(segments)))),
        headers=session.headers,
        cookies=session.cookies,
        proxy=proxy,
        max_workers=max_workers
    ):
        file_downloaded = status_update.get("file_downloaded")
        if file_downloaded:
            events.emit(events.Types.SEGMENT_DOWNLOADED, track=track, segment=file_downloaded)
        else:
            downloaded = status_update.get("downloaded")
            if downloaded and downloaded.endswith("/s"):
                status_update["downloaded"] = f"DASH {downloaded}"
            progress(**status_update)

    # see https://github.com/devine-dl/devine/issues/71
    for control_file in save_dir.glob("*.aria2__temp"):
        control_file.unlink()

    # merge order is the sorted order of the file names in save_dir
    segments_to_merge = [
        x
        for x in sorted(save_dir.iterdir())
        if x.is_file()
    ]
    with open(save_path, "wb") as f:
        if init_data:
            f.write(init_data)
        if len(segments_to_merge) > 1:
            progress(downloaded="Merging", completed=0, total=len(segments_to_merge))
        for segment_file in segments_to_merge:
            segment_data = segment_file.read_bytes()
            # TODO: fix encoding after decryption?
            if (
                not drm and isinstance(track, Subtitle) and
                track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML)
            ):
                segment_data = try_ensure_utf8(segment_data)
                # turn the left-to-right / right-to-left mark entities into the
                # actual characters; the search text must be the entity itself
                segment_data = segment_data.decode("utf8"). \
                    replace("&lrm;", html.unescape("&lrm;")). \
                    replace("&rlm;", html.unescape("&rlm;")). \
                    encode("utf8")
            f.write(segment_data)
            f.flush()
            segment_file.unlink()
            progress(advance=1)

    track.path = save_path
    events.emit(events.Types.TRACK_DOWNLOADED, track=track)

    if drm:
        progress(downloaded="Decrypting", completed=0, total=100)
        drm.decrypt(save_path)
        track.drm = None
        events.emit(
            events.Types.TRACK_DECRYPTED,
            track=track,
            drm=drm,
            segment=None
        )
        progress(downloaded="Decrypting", advance=100)

    save_dir.rmdir()

    progress(downloaded="Downloaded")
|
|
|
|
@staticmethod
def _get(
    item: str,
    adaptation_set: Element,
    representation: Optional[Element] = None
) -> Optional[Any]:
    """
    Read attribute `item`, preferring the Representation over the AdaptationSet.

    The AdaptationSet value is returned when no Representation is given, or when
    the Representation does not carry the attribute at all. An attribute that is
    present but empty on the Representation still wins.
    """
    inherited = adaptation_set.get(item)
    own = None if representation is None else representation.get(item)
    return inherited if own is None else own
|
|
|
|
@staticmethod
def _findall(
    item: str,
    adaptation_set: Element,
    representation: Optional[Element] = None,
    both: bool = False
) -> list[Any]:
    """
    Collect child elements named `item` from the Representation and/or AdaptationSet.

    Without a Representation only the AdaptationSet is searched. Otherwise the
    Representation's matches are returned, falling back to the AdaptationSet's
    when it has none. With both=True the two result lists are concatenated,
    Representation matches first.
    """
    from_set = adaptation_set.findall(item)
    if representation is None:
        return from_set

    from_rep = representation.findall(item)
    if both:
        return from_rep + from_set

    return from_rep or from_set
|
|
|
|
@staticmethod
def get_language(
    adaptation_set: Element,
    representation: Optional[Element] = None,
    fallback: Optional[Union[str, Language]] = None
) -> Optional[Language]:
    """
    Get Language (if any) from the AdaptationSet or Representation.

    Candidates are tried in this order, and the first valid one wins:
    the Representation's `lang` attribute, a language embedded in the
    Representation's `id`, the AdaptationSet's `lang` attribute, and finally
    the fallback.

    Parameters:
        adaptation_set: AdaptationSet element to read `lang` from.
        representation: Optional Representation element, checked first.
        fallback: Language to use if no language information could be
            retrieved from the elements.

    Returns the Language, or None if no candidate is a valid tag. Tags starting
    with "und" are treated as not being a language.
    """
    options = []

    if representation is not None:
        options.append(representation.get("lang"))
        # derive language from somewhat common id string format
        # the format is typically "{rep_id}_{lang}={bitrate}" or similar
        rep_id = representation.get("id")
        if rep_id:
            m = re.match(r"\w+_(\w+)=\d+", rep_id)
            if m:
                options.append(m.group(1))

    options.append(adaptation_set.get("lang"))

    if fallback:
        options.append(fallback)

    for option in options:
        if option is None:
            # a missing attribute; str(None) would otherwise be validated as the tag "None"
            continue
        option = str(option).strip()
        if not option:
            continue
        if not tag_is_valid(option) or option.startswith("und"):
            continue
        return Language.get(option)

    return None
|
|
|
|
@staticmethod
def get_video_range(
    codecs: str,
    all_supplemental_props: list[Element],
    all_essential_props: list[Element]
) -> Video.Range:
    """
    Determine the video range from the codec string and CICP properties.

    Codec strings starting with one of the "dva1", "dvav", "dvhe" or "dvh1"
    prefixes map directly to `Video.Range.DV`. Otherwise the colour primaries,
    transfer characteristics and matrix coefficients are read from the first
    matching property (supplemental properties are searched before essential
    ones) and handed to `Video.Range.from_cicp`, each defaulting to 0 when absent.
    """
    if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")):
        return Video.Range.DV

    def first_value(scheme: str) -> int:
        """Integer value of the first property using `scheme`, or 0 if there is none."""
        for prop in all_supplemental_props + all_essential_props:
            if prop.get("schemeIdUri") == scheme:
                return int(prop.get("value"))
        return 0

    primaries = first_value("urn:mpeg:mpegB:cicp:ColourPrimaries")
    transfer = first_value("urn:mpeg:mpegB:cicp:TransferCharacteristics")
    matrix = first_value("urn:mpeg:mpegB:cicp:MatrixCoefficients")

    return Video.Range.from_cicp(primaries=primaries, transfer=transfer, matrix=matrix)
|
|
|
|
@staticmethod
def is_trick_mode(adaptation_set: Element) -> bool:
    """
    Check if contents of Adaptation Set is a Trick-Mode stream.

    True when any EssentialProperty or SupplementalProperty child uses the
    DASH-IF trick mode scheme.
    """
    trick_mode_scheme = "http://dashif.org/guidelines/trickmode"
    for tag in ("EssentialProperty", "SupplementalProperty"):
        for prop in adaptation_set.findall(tag):
            if prop.get("schemeIdUri") == trick_mode_scheme:
                return True
    return False
|
|
|
|
@staticmethod
def is_descriptive(adaptation_set: Element) -> bool:
    """
    Check if contents of Adaptation Set is Descriptive.

    True when any Accessibility child carries one of the known
    (schemeIdUri, value) pairs that mark descriptive audio.
    """
    descriptive_markers = {
        ("urn:mpeg:dash:role:2011", "descriptive"),
        ("urn:tva:metadata:cs:AudioPurposeCS:2007", "1"),
    }
    for node in adaptation_set.findall("Accessibility"):
        marker = (node.get("schemeIdUri"), node.get("value"))
        if marker in descriptive_markers:
            return True
    return False
|
|
|
|
@staticmethod
def is_forced(adaptation_set: Element) -> bool:
    """
    Check if contents of Adaptation Set is a Forced Subtitle.

    True when any Role child uses the DASH role scheme with a value of
    "forced-subtitle" or "forced_subtitle".
    """
    for role in adaptation_set.findall("Role"):
        if role.get("schemeIdUri") != "urn:mpeg:dash:role:2011":
            continue
        if role.get("value") in ("forced-subtitle", "forced_subtitle"):
            return True
    return False
|
|
|
|
@staticmethod
def is_sdh(adaptation_set: Element) -> bool:
    """
    Check if contents of Adaptation Set is for the Hearing Impaired.

    True when any Accessibility child uses the TVA AudioPurposeCS scheme
    with a value of "2".
    """
    sdh_marker = ("urn:tva:metadata:cs:AudioPurposeCS:2007", "2")
    found_markers = [
        (node.get("schemeIdUri"), node.get("value"))
        for node in adaptation_set.findall("Accessibility")
    ]
    return sdh_marker in found_markers
|
|
|
|
@staticmethod
def is_closed_caption(adaptation_set: Element) -> bool:
    """
    Check if contents of Adaptation Set is a Closed Caption Subtitle.

    True when any Role child uses the DASH role scheme with a value of "caption".
    """
    for role in adaptation_set.findall("Role"):
        scheme, value = role.get("schemeIdUri"), role.get("value")
        if scheme == "urn:mpeg:dash:role:2011" and value == "caption":
            return True
    return False
|
|
|
|
@staticmethod
|
|
def get_ddp_complexity_index(adaptation_set: Element, representation: Optional[Element]) -> Optional[int]:
|
|
"""Get the DD+ Complexity Index (if any) from the AdaptationSet or Representation."""
|
|
return next((
|
|
int(x.get("value"))
|
|
for x in DASH._findall("SupplementalProperty", adaptation_set, representation, both=True)
|
|
if x.get("schemeIdUri") == "tag:dolby.com,2018:dash:EC3_ExtensionComplexityIndex:2018"
|
|
), None)
|
|
|
|
@staticmethod
def get_drm(protections: list[Element]) -> list[Widevine]:
    """
    Build Widevine DRM objects from ContentProtection elements.

    Elements whose schemeIdUri is not the Widevine URN, or that carry no `pssh`
    child text, are ignored. The key ID is taken from the element's `kid`
    attribute (base64), overridden by its `default_KID` attribute when present.
    If neither the PSSH nor the element yields a key ID, the first `default_KID`
    found on any of the given elements is used instead.
    """
    systems = []

    for element in protections:
        # TODO: Add checks for PlayReady, FairPlay, maybe more
        scheme = (element.get("schemeIdUri") or "").lower()
        if scheme != WidevineCdm.urn:
            continue

        encoded_pssh = element.findtext("pssh")
        if not encoded_pssh:
            continue
        pssh = PSSH(encoded_pssh)

        key_id = element.get("kid")
        if key_id:
            key_id = UUID(bytes=base64.b64decode(key_id))

        own_default = element.get("default_KID")
        if own_default:
            key_id = UUID(own_default)

        if not (pssh.key_ids or key_id):
            # weird manifest, look across all protections for a default_KID
            key_id = None
            for other in protections:
                other_default = other.get("default_KID")
                if other_default:
                    key_id = UUID(other_default)
                    break

        systems.append(Widevine(pssh=pssh, kid=key_id))

    return systems
|
|
|
|
@staticmethod
def pt_to_sec(d: Union[str, float]) -> float:
    """
    Convert an ISO 8601 style duration string (e.g. "PT1H2M3.5S") to seconds.

    Parameters:
        d: The duration string, or a number that is already in seconds.

    Days, hours, minutes and seconds are supported, so both "PT..." and
    "P0Y0M0DT..." / "P1DT..." forms are accepted. Year and month components
    are only accepted when they are zero, since their length in seconds
    depends on the calendar.

    Raises:
        TypeError: d is neither a string nor a number.
        ValueError: d does not start with "P", uses an unknown designator,
            or has a non-zero year or month component.
    """
    if isinstance(d, (int, float)):
        return float(d)
    if not isinstance(d, str):
        raise TypeError(f"Expected a duration string or a number of seconds, not {d!r}")

    text = d.strip().upper()
    if not text.startswith("P"):
        raise ValueError("Input data is not a valid time string.")

    # "M" means months before the "T" and minutes after it, so split first
    date_part, _, time_part = text[1:].partition("T")
    unit_tables = (
        (date_part, {"Y": None, "M": None, "D": 24 * 60 * 60}),
        (time_part, {"H": 60 * 60, "M": 60, "S": 1}),
    )

    seconds = 0.0
    for part, factors in unit_tables:
        for number, unit in re.findall(r"([\d.]+)(.)", part):
            if unit not in factors:
                raise ValueError(f"Unsupported duration designator '{unit}' in {d!r}.")
            amount = float(number)
            factor = factors[unit]
            if factor is None:
                if amount:
                    raise ValueError(f"Year and month components cannot be converted to seconds: {d!r}.")
                continue
            seconds += amount * factor

    return seconds
|
|
|
|
@staticmethod
def replace_fields(url: str, **kwargs: Any) -> str:
    """
    Substitute `$Field$` and `$Field%fmt$` identifiers in a segment URL template.

    Parameters:
        url: The URL template.
        kwargs: Identifier names mapped to their values, e.g. `Number=5`.

    Plain `$Field$` identifiers are replaced with `str(value)`. Formatted
    identifiers such as `$Number%05d$` apply the part after `%` as a format
    spec. Values read from manifest attributes are strings, so when the spec
    cannot be applied to the value as given, the value is converted to an
    integer and formatted again. Formatted identifiers are matched
    case-insensitively, and every occurrence is replaced, including
    occurrences that use different specs.
    """
    def apply_spec(value: Any, spec: str) -> str:
        """Format `value` with `spec`, retrying as an integer if that fails."""
        try:
            return format(value, spec)
        except (TypeError, ValueError):
            # e.g. "128000" with "08d": numeric specs do not accept strings
            return format(int(value), spec)

    for field, value in kwargs.items():
        url = url.replace(f"${field}$", str(value))
        url = re.sub(
            fr"\${re.escape(field)}%([a-z0-9]+)\$",
            lambda m, value=value: apply_spec(value, m.group(1)),
            url,
            flags=re.I
        )
    return url
|
|
|
|
|
|
__all__ = ("DASH",)
|