diff --git a/mp4decrypt-for-devine/binaries.py b/mp4decrypt-for-devine/binaries.py new file mode 100644 index 0000000..fbff914 --- /dev/null +++ b/mp4decrypt-for-devine/binaries.py @@ -0,0 +1,46 @@ +import shutil +import sys +from pathlib import Path +from typing import Optional + +__shaka_platform = { + "win32": "win", + "darwin": "osx" +}.get(sys.platform, sys.platform) + + +def find(*names: str) -> Optional[Path]: + """Find the path of the first found binary name.""" + for name in names: + path = shutil.which(name) + if path: + return Path(path) + return None + + +FFMPEG = find("ffmpeg") +FFProbe = find("ffprobe") +FFPlay = find("ffplay") +SubtitleEdit = find("SubtitleEdit") +ShakaPackager = find( + "shaka-packager", + "packager", + f"packager-{__shaka_platform}", + f"packager-{__shaka_platform}-arm64", + f"packager-{__shaka_platform}-x64" +) +Aria2 = find("aria2c", "aria2") +CCExtractor = find( + "ccextractor", + "ccextractorwin", + "ccextractorwinfull" +) +HolaProxy = find("hola-proxy") +MPV = find("mpv") +Caddy = find("caddy") +MP4Decrypt = find("mp4decrypt") + +__all__ = ( + "FFMPEG", "FFProbe", "FFPlay", "SubtitleEdit", "ShakaPackager", + "Aria2", "CCExtractor", "HolaProxy", "MPV", "Caddy", "MP4Decrypt", "find" +) diff --git a/mp4decrypt-for-devine/mp4decrypt.exe b/mp4decrypt-for-devine/mp4decrypt.exe new file mode 100644 index 0000000..46e2afe Binary files /dev/null and b/mp4decrypt-for-devine/mp4decrypt.exe differ diff --git a/mp4decrypt-for-devine/widevine.py b/mp4decrypt-for-devine/widevine.py new file mode 100644 index 0000000..f9b60f0 --- /dev/null +++ b/mp4decrypt-for-devine/widevine.py @@ -0,0 +1,275 @@ +from __future__ import annotations + +import base64 +import shutil +import subprocess +import textwrap +from pathlib import Path +from typing import Any, Callable, Optional, Union +from uuid import UUID + +import m3u8 +from construct import Container +from pymp4.parser import Box +from pywidevine.cdm import Cdm as WidevineCdm +from pywidevine.pssh import PSSH +from requests import Session +from rich.text import Text + +from devine.core import binaries +from devine.core.config import config +from devine.core.console import console +from devine.core.constants import AnyTrack +from devine.core.utilities import get_boxes +from devine.core.utils.subprocess import ffprobe + +import logging + +class Widevine: + """Widevine DRM System.""" + def __init__(self, pssh: PSSH, kid: Union[UUID, str, bytes, None] = None, **kwargs: Any): + if not pssh: + raise ValueError("Provided PSSH is empty.") + if not isinstance(pssh, PSSH): + raise TypeError(f"Expected pssh to be a {PSSH}, not {pssh!r}") + + if pssh.system_id == PSSH.SystemId.PlayReady: + pssh.to_widevine() + + if kid: + if isinstance(kid, str): + kid = UUID(hex=kid) + elif isinstance(kid, bytes): + kid = UUID(bytes=kid) + if not isinstance(kid, UUID): + raise ValueError(f"Expected kid to be a {UUID}, str, or bytes, not {kid!r}") + pssh.set_key_ids([kid]) + + self._pssh = pssh + + if not self.kids: + raise Widevine.Exceptions.KIDNotFound("No Key ID was found within PSSH and none were provided.") + + self.content_keys: dict[UUID, str] = {} + self.data: dict = kwargs or {} + + @classmethod + def from_track(cls, track: AnyTrack, session: Optional[Session] = None) -> Widevine: + """ + Get PSSH and KID from within the Initiation Segment of the Track Data. + It also tries to get PSSH and KID from other track data like M3U8 data + as well as through ffprobe. + + Create a Widevine DRM System object from a track's information. + This should only be used if a PSSH could not be provided directly. + It is *rare* to need to use this. + + You may provide your own requests session to be able to use custom + headers and more. + + Raises: + PSSHNotFound - If the PSSH was not found within the data. + KIDNotFound - If the KID was not found within the data or PSSH. + """ + if not session: + session = Session() + session.headers.update(config.headers) + + kid: Optional[UUID] = None + pssh_boxes: list[Container] = [] + tenc_boxes: list[Container] = [] + + if track.descriptor == track.Descriptor.HLS: + m3u_url = track.url + master = m3u8.loads(session.get(m3u_url).text, uri=m3u_url) + pssh_boxes.extend( + Box.parse(base64.b64decode(x.uri.split(",")[-1])) + for x in (master.session_keys or master.keys) + if x and x.keyformat and x.keyformat.lower() == WidevineCdm.urn + ) + + init_data = track.get_init_segment(session=session) + if init_data: + # try get via ffprobe, needed for non mp4 data e.g. WEBM from Google Play + probe = ffprobe(init_data) + if probe: + for stream in probe.get("streams") or []: + enc_key_id = stream.get("tags", {}).get("enc_key_id") + if enc_key_id: + kid = UUID(bytes=base64.b64decode(enc_key_id)) + pssh_boxes.extend(list(get_boxes(init_data, b"pssh"))) + tenc_boxes.extend(list(get_boxes(init_data, b"tenc"))) + + pssh_boxes.sort(key=lambda b: { + PSSH.SystemId.Widevine: 0, + PSSH.SystemId.PlayReady: 1 + }[b.system_ID]) + + pssh = next(iter(pssh_boxes), None) + if not pssh: + raise Widevine.Exceptions.PSSHNotFound("PSSH was not found in track data.") + + tenc = next(iter(tenc_boxes), None) + if not kid and tenc and tenc.key_ID.int != 0: + kid = tenc.key_ID + + return cls(pssh=PSSH(pssh), kid=kid) + + @classmethod + def from_init_data(cls, init_data: bytes) -> Widevine: + """ + Get PSSH and KID from within Initialization Segment Data. + + This should only be used if a PSSH could not be provided directly. + It is *rare* to need to use this. + + Raises: + PSSHNotFound - If the PSSH was not found within the data. + KIDNotFound - If the KID was not found within the data or PSSH. + """ + if not init_data: + raise ValueError("Init data should be provided.") + if not isinstance(init_data, bytes): + raise TypeError(f"Expected init data to be bytes, not {init_data!r}") + + kid: Optional[UUID] = None + pssh_boxes: list[Container] = list(get_boxes(init_data, b"pssh")) + tenc_boxes: list[Container] = list(get_boxes(init_data, b"tenc")) + + # try get via ffprobe, needed for non mp4 data e.g. WEBM from Google Play + probe = ffprobe(init_data) + if probe: + for stream in probe.get("streams") or []: + enc_key_id = stream.get("tags", {}).get("enc_key_id") + if enc_key_id: + kid = UUID(bytes=base64.b64decode(enc_key_id)) + + pssh_boxes.sort(key=lambda b: { + PSSH.SystemId.Widevine: 0, + PSSH.SystemId.PlayReady: 1 + }[b.system_ID]) + + pssh = next(iter(pssh_boxes), None) + if not pssh: + raise Widevine.Exceptions.PSSHNotFound("PSSH was not found in track data.") + + tenc = next(iter(tenc_boxes), None) + if not kid and tenc and tenc.key_ID.int != 0: + kid = tenc.key_ID + + return cls(pssh=PSSH(pssh), kid=kid) + + @property + def pssh(self) -> PSSH: + """Get Protection System Specific Header Box.""" + return self._pssh + + @property + def kid(self) -> Optional[UUID]: + """Get first Key ID, if any.""" + return next(iter(self.kids), None) + + @property + def kids(self) -> list[UUID]: + """Get all Key IDs.""" + return self._pssh.key_ids + + def get_content_keys(self, cdm: WidevineCdm, certificate: Callable, licence: Callable) -> None: + """ + Create a CDM Session and obtain Content Keys for this DRM Instance. + The certificate and license params are expected to be a function and will + be provided with the challenge and session ID. + """ + for kid in self.kids: + if kid in self.content_keys: + continue + + session_id = cdm.open() + + try: + cdm.set_service_certificate( + session_id, + certificate( + challenge=cdm.service_certificate_challenge + ) + ) + + cdm.parse_license( + session_id, + licence( + challenge=cdm.get_license_challenge(session_id, self.pssh) + ) + ) + + self.content_keys = { + key.kid: key.key.hex() + for key in cdm.get_keys(session_id, "CONTENT") + } + if not self.content_keys: + raise Widevine.Exceptions.EmptyLicense("No Content Keys were within the License") + + if kid not in self.content_keys: + raise Widevine.Exceptions.CEKNotFound(f"No Content Key for KID {kid.hex} within the License") + finally: + cdm.close(session_id) + + def decrypt(self, path: Path) -> None: + """ + Decrypt a Track with Widevine DRM. + Raises: + EnvironmentError if the mp4decrypt executable could not be found. + ValueError if the track has not yet been downloaded. + SubprocessError if mp4decrypt returned a non-zero exit code. + """ + if not self.content_keys: + raise ValueError("Cannot decrypt a Track without any Content Keys...") + + if not binaries.MP4Decrypt: + raise EnvironmentError("mp4decrypt executable not found but is required.") + if not path or not path.exists(): + raise ValueError("Tried to decrypt a file that does not exist.") + + output_path = path.with_stem(f"{path.stem}_decrypted") + config.directories.temp.mkdir(parents=True, exist_ok=True) + + try: + keys = [f"--key \"{kid.hex}:{key.lower()}\"" for i, (kid, key) in enumerate(self.content_keys.items())] + + p = subprocess.Popen( + f'{binaries.MP4Decrypt} --show-progress {" ".join(keys)} "{path}" "{output_path}"', + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + universal_newlines=True + ) + + stream_skipped = False + had_error = False + + p.wait() + + if p.returncode != 0 or had_error: + raise subprocess.CalledProcessError(p.returncode, arguments) + + path.unlink() + if not stream_skipped: + shutil.move(output_path, path) + except subprocess.CalledProcessError as e: + if e.returncode == 0xC000013A: # STATUS_CONTROL_C_EXIT + raise KeyboardInterrupt() + raise + + class Exceptions: + class PSSHNotFound(Exception): + """PSSH (Protection System Specific Header) was not found.""" + + class KIDNotFound(Exception): + """KID (Encryption Key ID) was not found.""" + + class CEKNotFound(Exception): + """CEK (Content Encryption Key) for KID was not found in License.""" + + class EmptyLicense(Exception): + """License returned no Content Encryption Keys.""" + + +__all__ = ("Widevine",)