from __future__ import annotations import base64 import html import logging import math import re import sys from copy import copy from functools import partial from pathlib import Path from typing import Any, Callable, Optional, Union from urllib.parse import urljoin, urlparse from uuid import UUID from zlib import crc32 import requests from langcodes import Language, tag_is_valid from lxml.etree import Element, ElementTree from pywidevine.cdm import Cdm as WidevineCdm from pywidevine.pssh import PSSH from requests import Session from devine.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack from devine.core.downloaders import requests as requests_downloader from devine.core.drm import Widevine from devine.core.events import events from devine.core.tracks import Audio, Subtitle, Tracks, Video from devine.core.utilities import is_close_match, try_ensure_utf8 from devine.core.utils.xml import load_xml class DASH: def __init__(self, manifest, url: str): if manifest is None: raise ValueError("DASH manifest must be provided.") if manifest.tag != "MPD": raise TypeError(f"Expected 'MPD' document, but received a '{manifest.tag}' document instead.") if not url: raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.") if not isinstance(url, str): raise TypeError(f"Expected url to be a {str}, not {url!r}") self.manifest = manifest self.url = url @classmethod def from_url(cls, url: str, session: Optional[Session] = None, **args: Any) -> DASH: if not url: raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.") if not isinstance(url, str): raise TypeError(f"Expected url to be a {str}, not {url!r}") if not session: session = Session() elif not isinstance(session, Session): raise TypeError(f"Expected session to be a {Session}, not {session!r}") res = session.get(url, **args) if res.url != url: url = res.url if not res.ok: raise requests.ConnectionError( "Failed to request the MPD document.", response=res ) return DASH.from_text(res.text, url) @classmethod def from_text(cls, text: str, url: str) -> DASH: if not text: raise ValueError("DASH manifest Text must be provided.") if not isinstance(text, str): raise TypeError(f"Expected text to be a {str}, not {text!r}") if not url: raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.") if not isinstance(url, str): raise TypeError(f"Expected url to be a {str}, not {url!r}") manifest = load_xml(text) return cls(manifest, url) def to_tracks( self, language: Optional[Union[str, Language]] = None, period_filter: Optional[Callable] = None ) -> Tracks: """ Convert an MPEG-DASH document to Video, Audio and Subtitle Track objects. Parameters: language: The Title's Original Recorded Language. It will also be used as a fallback track language value if the manifest does not list language information. period_filter: Filter out period's within the manifest. All Track URLs will be a list of segment URLs. """ tracks = Tracks() for period in self.manifest.findall("Period"): if callable(period_filter) and period_filter(period): continue for adaptation_set in period.findall("AdaptationSet"): if self.is_trick_mode(adaptation_set): # we don't want trick mode streams (they are only used for fast-forward/rewind) continue for rep in adaptation_set.findall("Representation"): get = partial(self._get, adaptation_set=adaptation_set, representation=rep) findall = partial(self._findall, adaptation_set=adaptation_set, representation=rep, both=True) segment_base = rep.find("SegmentBase") codecs = get("codecs") content_type = get("contentType") mime_type = get("mimeType") if not content_type and mime_type: content_type = mime_type.split("/")[0] if not content_type and not mime_type: raise ValueError("Unable to determine the format of a Representation, cannot continue...") if mime_type == "application/mp4" or content_type == "application": # likely mp4-boxed subtitles # TODO: It may not actually be subtitles try: real_codec = Subtitle.Codec.from_mime(codecs) content_type = "text" mime_type = f"application/mp4; codecs='{real_codec.value.lower()}'" except ValueError: raise ValueError(f"Unsupported content type '{content_type}' with codecs of '{codecs}'") if content_type == "text" and mime_type and "/mp4" not in mime_type: # mimeType likely specifies the subtitle codec better than `codecs` codecs = mime_type.split("/")[1] if content_type == "video": track_type = Video track_codec = Video.Codec.from_codecs(codecs) track_fps = get("frameRate") if not track_fps and segment_base is not None: track_fps = segment_base.get("timescale") track_args = dict( range_=self.get_video_range( codecs, findall("SupplementalProperty"), findall("EssentialProperty") ), bitrate=get("bandwidth") or None, width=get("width") or 0, height=get("height") or 0, fps=track_fps or None ) elif content_type == "audio": track_type = Audio track_codec = Audio.Codec.from_codecs(codecs) track_args = dict( bitrate=get("bandwidth") or None, channels=next(iter( rep.xpath("AudioChannelConfiguration/@value") or adaptation_set.xpath("AudioChannelConfiguration/@value") ), None), joc=self.get_ddp_complexity_index(adaptation_set, rep), descriptive=self.is_descriptive(adaptation_set) ) elif content_type == "text": track_type = Subtitle track_codec = Subtitle.Codec.from_codecs(codecs or "vtt") track_args = dict( cc=self.is_closed_caption(adaptation_set), sdh=self.is_sdh(adaptation_set), forced=self.is_forced(adaptation_set) ) elif content_type == "image": # we don't want what's likely thumbnails for the seekbar continue else: raise ValueError(f"Unknown Track Type '{content_type}'") track_lang = self.get_language(adaptation_set, rep, fallback=language) if not track_lang: msg = "Language information could not be derived from a Representation." if language is None: msg += " No fallback language was provided when calling DASH.to_tracks()." elif not tag_is_valid((str(language) or "").strip()) or str(language).startswith("und"): msg += f" The fallback language provided is also invalid: {language}" raise ValueError(msg) # for some reason it's incredibly common for services to not provide # a good and actually unique track ID, sometimes because of the lang # dialect not being represented in the id, or the bitrate, or such. # this combines all of them as one and hashes it to keep it small(ish). track_id = hex(crc32("{codec}-{lang}-{bitrate}-{base_url}-{ids}-{track_args}".format( codec=codecs, lang=track_lang, bitrate=get("bitrate"), base_url=(rep.findtext("BaseURL") or "").split("?")[0], ids=[get("audioTrackId"), get("id"), period.get("id")], track_args=track_args ).encode()))[2:] tracks.add(track_type( id_=track_id, url=self.url, codec=track_codec, language=track_lang, is_original_lang=language and is_close_match(track_lang, [language]), descriptor=Video.Descriptor.DASH, data={ "dash": { "manifest": self.manifest, "period": period, "adaptation_set": adaptation_set, "representation": rep } }, **track_args )) # only get tracks from the first main-content period break return tracks @staticmethod def download_track( track: AnyTrack, save_path: Path, save_dir: Path, progress: partial, session: Optional[Session] = None, proxy: Optional[str] = None, max_workers: Optional[int] = None, license_widevine: Optional[Callable] = None ): if not session: session = Session() elif not isinstance(session, Session): raise TypeError(f"Expected session to be a {Session}, not {session!r}") if proxy: session.proxies.update({ "all": proxy }) log = logging.getLogger("DASH") manifest: ElementTree = track.data["dash"]["manifest"] period: Element = track.data["dash"]["period"] adaptation_set: Element = track.data["dash"]["adaptation_set"] representation: Element = track.data["dash"]["representation"] track.drm = DASH.get_drm( representation.findall("ContentProtection") + adaptation_set.findall("ContentProtection") ) manifest_base_url = manifest.findtext("BaseURL") if not manifest_base_url: manifest_base_url = track.url elif not re.match("^https?://", manifest_base_url, re.IGNORECASE): manifest_base_url = urljoin(track.url, f"./{manifest_base_url}") period_base_url = urljoin(manifest_base_url, period.findtext("BaseURL")) rep_base_url = urljoin(period_base_url, representation.findtext("BaseURL")) period_duration = period.get("duration") or manifest.get("mediaPresentationDuration") init_data: Optional[bytes] = None segment_template = representation.find("SegmentTemplate") if segment_template is None: segment_template = adaptation_set.find("SegmentTemplate") segment_list = representation.find("SegmentList") if segment_list is None: segment_list = adaptation_set.find("SegmentList") segment_base = representation.find("SegmentBase") if segment_base is None: segment_base = adaptation_set.find("SegmentBase") segments: list[tuple[str, Optional[str]]] = [] segment_timescale: float = 0 segment_durations: list[int] = [] track_kid: Optional[UUID] = None if segment_template is not None: segment_template = copy(segment_template) start_number = int(segment_template.get("startNumber") or 1) end_number = int(segment_template.get("endNumber") or 0) or None segment_timeline = segment_template.find("SegmentTimeline") segment_timescale = float(segment_template.get("timescale") or 1) for item in ("initialization", "media"): value = segment_template.get(item) if not value: continue if not re.match("^https?://", value, re.IGNORECASE): if not rep_base_url: raise ValueError("Resolved Segment URL is not absolute, and no Base URL is available.") value = urljoin(rep_base_url, value) if not urlparse(value).query: manifest_url_query = urlparse(track.url).query if manifest_url_query: value += f"?{manifest_url_query}" segment_template.set(item, value) init_url = segment_template.get("initialization") if init_url: res = session.get(DASH.replace_fields( init_url, Bandwidth=representation.get("bandwidth"), RepresentationID=representation.get("id") )) res.raise_for_status() init_data = res.content track_kid = track.get_key_id(init_data) if segment_timeline is not None: current_time = 0 for s in segment_timeline.findall("S"): if s.get("t"): current_time = int(s.get("t")) for _ in range(1 + (int(s.get("r") or 0))): segment_durations.append(current_time) current_time += int(s.get("d")) if not end_number: end_number = len(segment_durations) for t, n in zip(segment_durations, range(start_number, end_number + 1)): segments.append(( DASH.replace_fields( segment_template.get("media"), Bandwidth=representation.get("bandwidth"), Number=n, RepresentationID=representation.get("id"), Time=t ), None )) else: if not period_duration: raise ValueError("Duration of the Period was unable to be determined.") period_duration = DASH.pt_to_sec(period_duration) segment_duration = float(segment_template.get("duration")) or 1 if not end_number: end_number = math.ceil(period_duration / (segment_duration / segment_timescale)) for s in range(start_number, end_number + 1): segments.append(( DASH.replace_fields( segment_template.get("media"), Bandwidth=representation.get("bandwidth"), Number=s, RepresentationID=representation.get("id"), Time=s ), None )) # TODO: Should we floor/ceil/round, or is int() ok? segment_durations.append(int(segment_duration)) elif segment_list is not None: segment_timescale = float(segment_list.get("timescale") or 1) init_data = None initialization = segment_list.find("Initialization") if initialization is not None: source_url = initialization.get("sourceURL") if not source_url: source_url = rep_base_url elif not re.match("^https?://", source_url, re.IGNORECASE): source_url = urljoin(rep_base_url, f"./{source_url}") if initialization.get("range"): init_range_header = {"Range": f"bytes={initialization.get('range')}"} else: init_range_header = None res = session.get(url=source_url, headers=init_range_header) res.raise_for_status() init_data = res.content track_kid = track.get_key_id(init_data) segment_urls = segment_list.findall("SegmentURL") for segment_url in segment_urls: media_url = segment_url.get("media") if not media_url: media_url = rep_base_url elif not re.match("^https?://", media_url, re.IGNORECASE): media_url = urljoin(rep_base_url, f"./{media_url}") segments.append(( media_url, segment_url.get("mediaRange") )) segment_durations.append(int(segment_url.get("duration") or 1)) elif segment_base is not None: media_range = None init_data = None initialization = segment_base.find("Initialization") if initialization is not None: if initialization.get("range"): init_range_header = {"Range": f"bytes={initialization.get('range')}"} else: init_range_header = None res = session.get(url=rep_base_url, headers=init_range_header) res.raise_for_status() init_data = res.content track_kid = track.get_key_id(init_data) total_size = res.headers.get("Content-Range", "").split("/")[-1] if total_size: media_range = f"{len(init_data)}-{total_size}" segments.append(( rep_base_url, media_range )) elif rep_base_url: segments.append(( rep_base_url, None )) else: log.error("Could not find a way to get segments from this MPD manifest.") log.debug(track.url) sys.exit(1) # TODO: Should we floor/ceil/round, or is int() ok? track.data["dash"]["timescale"] = int(segment_timescale) track.data["dash"]["segment_durations"] = segment_durations if not track.drm and isinstance(track, (Video, Audio)): try: track.drm = [Widevine.from_init_data(init_data)] except Widevine.Exceptions.PSSHNotFound: # it might not have Widevine DRM, or might not have found the PSSH log.warning("No Widevine PSSH was found for this track, is it DRM free?") if track.drm: # last chance to find the KID, assumes first segment will hold the init data track_kid = track_kid or track.get_key_id(url=segments[0][0], session=session) # TODO: What if we don't want to use the first DRM system? drm = track.drm[0] if isinstance(drm, Widevine): # license and grab content keys try: if not license_widevine: raise ValueError("license_widevine func must be supplied to use Widevine DRM") progress(downloaded="LICENSING") license_widevine(drm, track_kid=track_kid) progress(downloaded="[yellow]LICENSED") except Exception: # noqa DOWNLOAD_CANCELLED.set() # skip pending track downloads progress(downloaded="[red]FAILED") raise else: drm = None if DOWNLOAD_LICENCE_ONLY.is_set(): progress(downloaded="[yellow]SKIPPED") return progress(total=len(segments)) downloader = track.downloader if downloader.__name__ == "aria2c" and any(bytes_range is not None for url, bytes_range in segments): # aria2(c) is shit and doesn't support the Range header, fallback to the requests downloader downloader = requests_downloader log.warning("Falling back to the requests downloader as aria2(c) doesn't support the Range header") for status_update in downloader( urls=[ { "url": url, "headers": { "Range": f"bytes={bytes_range}" } if bytes_range else {} } for url, bytes_range in segments ], output_dir=save_dir, filename="{i:0%d}.mp4" % (len(str(len(segments)))), headers=session.headers, cookies=session.cookies, proxy=proxy, max_workers=max_workers ): file_downloaded = status_update.get("file_downloaded") if file_downloaded: events.emit(events.Types.SEGMENT_DOWNLOADED, track=track, segment=file_downloaded) else: downloaded = status_update.get("downloaded") if downloaded and downloaded.endswith("/s"): status_update["downloaded"] = f"DASH {downloaded}" progress(**status_update) # see https://github.com/devine-dl/devine/issues/71 for control_file in save_dir.glob("*.aria2__temp"): control_file.unlink() segments_to_merge = [ x for x in sorted(save_dir.iterdir()) if x.is_file() ] with open(save_path, "wb") as f: if init_data: f.write(init_data) if len(segments_to_merge) > 1: progress(downloaded="Merging", completed=0, total=len(segments_to_merge)) for segment_file in segments_to_merge: segment_data = segment_file.read_bytes() # TODO: fix encoding after decryption? if ( not drm and isinstance(track, Subtitle) and track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML) ): segment_data = try_ensure_utf8(segment_data) segment_data = segment_data.decode("utf8"). \ replace("‎", html.unescape("‎")). \ replace("‏", html.unescape("‏")). \ encode("utf8") f.write(segment_data) f.flush() segment_file.unlink() progress(advance=1) track.path = save_path events.emit(events.Types.TRACK_DOWNLOADED, track=track) if drm: progress(downloaded="Decrypting", completed=0, total=100) drm.decrypt(save_path) track.drm = None events.emit( events.Types.TRACK_DECRYPTED, track=track, drm=drm, segment=None ) progress(downloaded="Decrypting", advance=100) save_dir.rmdir() progress(downloaded="Downloaded") @staticmethod def _get( item: str, adaptation_set: Element, representation: Optional[Element] = None ) -> Optional[Any]: """Helper to get a requested item from the Representation, otherwise from the AdaptationSet.""" adaptation_set_item = adaptation_set.get(item) if representation is None: return adaptation_set_item representation_item = representation.get(item) if representation_item is not None: return representation_item return adaptation_set_item @staticmethod def _findall( item: str, adaptation_set: Element, representation: Optional[Element] = None, both: bool = False ) -> list[Any]: """ Helper to get all requested items from the Representation, otherwise from the AdaptationSet. Optionally, you may pass both=True to keep both values (where available). """ adaptation_set_items = adaptation_set.findall(item) if representation is None: return adaptation_set_items representation_items = representation.findall(item) if both: return representation_items + adaptation_set_items if representation_items: return representation_items return adaptation_set_items @staticmethod def get_language( adaptation_set: Element, representation: Optional[Element] = None, fallback: Optional[Union[str, Language]] = None ) -> Optional[Language]: """ Get Language (if any) from the AdaptationSet or Representation. A fallback language may be provided if no language information could be retrieved. """ options = [] if representation is not None: options.append(representation.get("lang")) # derive language from somewhat common id string format # the format is typically "{rep_id}_{lang}={bitrate}" or similar rep_id = representation.get("id") if rep_id: m = re.match(r"\w+_(\w+)=\d+", rep_id) if m: options.append(m.group(1)) options.append(adaptation_set.get("lang")) if fallback: options.append(fallback) for option in options: option = (str(option) or "").strip() if not tag_is_valid(option) or option.startswith("und"): continue return Language.get(option) @staticmethod def get_video_range( codecs: str, all_supplemental_props: list[Element], all_essential_props: list[Element] ) -> Video.Range: if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")): return Video.Range.DV return Video.Range.from_cicp( primaries=next(( int(x.get("value")) for x in all_supplemental_props + all_essential_props if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:ColourPrimaries" ), 0), transfer=next(( int(x.get("value")) for x in all_supplemental_props + all_essential_props if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:TransferCharacteristics" ), 0), matrix=next(( int(x.get("value")) for x in all_supplemental_props + all_essential_props if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:MatrixCoefficients" ), 0) ) @staticmethod def is_trick_mode(adaptation_set: Element) -> bool: """Check if contents of Adaptation Set is a Trick-Mode stream.""" essential_props = adaptation_set.findall("EssentialProperty") supplemental_props = adaptation_set.findall("SupplementalProperty") return any( prop.get("schemeIdUri") == "http://dashif.org/guidelines/trickmode" for prop in essential_props + supplemental_props ) @staticmethod def is_descriptive(adaptation_set: Element) -> bool: """Check if contents of Adaptation Set is Descriptive.""" return any( (x.get("schemeIdUri"), x.get("value")) in ( ("urn:mpeg:dash:role:2011", "descriptive"), ("urn:tva:metadata:cs:AudioPurposeCS:2007", "1") ) for x in adaptation_set.findall("Accessibility") ) @staticmethod def is_forced(adaptation_set: Element) -> bool: """Check if contents of Adaptation Set is a Forced Subtitle.""" return any( x.get("schemeIdUri") == "urn:mpeg:dash:role:2011" and x.get("value") in ("forced-subtitle", "forced_subtitle") for x in adaptation_set.findall("Role") ) @staticmethod def is_sdh(adaptation_set: Element) -> bool: """Check if contents of Adaptation Set is for the Hearing Impaired.""" return any( (x.get("schemeIdUri"), x.get("value")) == ("urn:tva:metadata:cs:AudioPurposeCS:2007", "2") for x in adaptation_set.findall("Accessibility") ) @staticmethod def is_closed_caption(adaptation_set: Element) -> bool: """Check if contents of Adaptation Set is a Closed Caption Subtitle.""" return any( (x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "caption") for x in adaptation_set.findall("Role") ) @staticmethod def get_ddp_complexity_index(adaptation_set: Element, representation: Optional[Element]) -> Optional[int]: """Get the DD+ Complexity Index (if any) from the AdaptationSet or Representation.""" return next(( int(x.get("value")) for x in DASH._findall("SupplementalProperty", adaptation_set, representation, both=True) if x.get("schemeIdUri") == "tag:dolby.com,2018:dash:EC3_ExtensionComplexityIndex:2018" ), None) @staticmethod def get_drm(protections: list[Element]) -> list[Widevine]: drm = [] for protection in protections: # TODO: Add checks for PlayReady, FairPlay, maybe more urn = (protection.get("schemeIdUri") or "").lower() if urn != WidevineCdm.urn: continue pssh = protection.findtext("pssh") if not pssh: continue pssh = PSSH(pssh) kid = protection.get("kid") if kid: kid = UUID(bytes=base64.b64decode(kid)) default_kid = protection.get("default_KID") if default_kid: kid = UUID(default_kid) if not pssh.key_ids and not kid: # weird manifest, look across all protections for a default_KID kid = next(( UUID(protection.get("default_KID")) for protection in protections if protection.get("default_KID") ), None) drm.append(Widevine( pssh=pssh, kid=kid )) return drm @staticmethod def pt_to_sec(d: Union[str, float]) -> float: if isinstance(d, float): return d has_ymd = d[0:8] == "P0Y0M0DT" if d[0:2] != "PT" and not has_ymd: raise ValueError("Input data is not a valid time string.") if has_ymd: d = d[6:].upper() # skip `P0Y0M0DT` else: d = d[2:].upper() # skip `PT` m = re.findall(r"([\d.]+.)", d) return sum( float(x[0:-1]) * {"H": 60 * 60, "M": 60, "S": 1}[x[-1].upper()] for x in m ) @staticmethod def replace_fields(url: str, **kwargs: Any) -> str: for field, value in kwargs.items(): url = url.replace(f"${field}$", str(value)) m = re.search(fr"\${re.escape(field)}%([a-z0-9]+)\$", url, flags=re.I) if m: url = url.replace(m.group(), f"{value:{m.group(1)}}") return url __all__ = ("DASH",)