diff --git a/pywidevine/cdm.py b/pywidevine/cdm.py new file mode 100644 index 0000000..3fa9207 --- /dev/null +++ b/pywidevine/cdm.py @@ -0,0 +1,368 @@ +import base64 +import random +import subprocess +import sys +import time +from pathlib import Path +from typing import Union, Optional +from uuid import UUID + +from Crypto.Cipher import AES, PKCS1_OAEP +from Crypto.Hash import SHA1, HMAC, SHA256, CMAC +from Crypto.PublicKey import RSA +from Crypto.Random import get_random_bytes +from Crypto.Signature import pss +from Crypto.Util import Padding +from construct import Container +from google.protobuf.message import DecodeError + +from pywidevine.utils import get_binary_path +from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, LicenseRequest, ProtocolVersion, \ + SignedDrmCertificate, DrmCertificate, EncryptedClientIdentification, ClientIdentification, License +from pywidevine.device import Device +from pywidevine.key import Key +from pywidevine.pssh import PSSH + + +class Cdm: + system_id = b"\xed\xef\x8b\xa9\x79\xd6\x4a\xce\xa3\xc8\x27\xdc\xd5\x1d\x21\xed" + uuid = UUID(bytes=system_id) + urn = f"urn:uuid:{uuid}" + key_format = urn + service_certificate_challenge = b"\x08\x04" + common_privacy_cert = ("CAUSxwUKwQIIAxIQFwW5F8wSBIaLBjM6L3cqjBiCtIKSBSKOAjCCAQoCggEBAJntWzsyfateJO/DtiqVtZhSCtW8y" + "zdQPgZFuBTYdrjfQFEEQa2M462xG7iMTnJaXkqeB5UpHVhYQCOn4a8OOKkSeTkwCGELbxWMh4x+Ib/7/up34QGeHl" + "eB6KRfRiY9FOYOgFioYHrc4E+shFexN6jWfM3rM3BdmDoh+07svUoQykdJDKR+ql1DghjduvHK3jOS8T1v+2RC/TH" + "hv0CwxgTRxLpMlSCkv5fuvWCSmvzu9Vu69WTi0Ods18Vcc6CCuZYSC4NZ7c4kcHCCaA1vZ8bYLErF8xNEkKdO7Dev" + "Sy8BDFnoKEPiWC8La59dsPxebt9k+9MItHEbzxJQAZyfWgkCAwEAAToUbGljZW5zZS53aWRldmluZS5jb20SgAOuN" + "HMUtag1KX8nE4j7e7jLUnfSSYI83dHaMLkzOVEes8y96gS5RLknwSE0bv296snUE5F+bsF2oQQ4RgpQO8GVK5uk5M" + "4PxL/CCpgIqq9L/NGcHc/N9XTMrCjRtBBBbPneiAQwHL2zNMr80NQJeEI6ZC5UYT3wr8+WykqSSdhV5Cs6cD7xdn9" + "qm9Nta/gr52u/DLpP3lnSq8x2/rZCR7hcQx+8pSJmthn8NpeVQ/ypy727+voOGlXnVaPHvOZV+WRvWCq5z3CqCLl5" + "+Gf2Ogsrf9s2LFvE7NVV2FvKqcWTw4PIV9Sdqrd+QLeFHd/SSZiAjjWyWOddeOrAyhb3BHMEwg2T7eTo/xxvF+YkP" + "j89qPwXCYcOxF+6gjomPwzvofcJOxkJkoMmMzcFBDopvab5tDQsyN9UPLGhGC98X/8z8QSQ+spbJTYLdgFenFoGq4" + "7gLwDS6NWYYQSqzE3Udf2W7pzk4ybyG4PHBYV3s4cyzdq8amvtE/sNSdOKReuHpfQ=") + + NUM_OF_SESSIONS = 0 + MAX_NUM_OF_SESSIONS = 50 # most common limit + + def __init__( + self, + device: Device, + pssh: Union[Container, bytes, str], + license_type: LicenseType = LicenseType.STREAMING, + raw: bool = False + ): + """ + Open a Widevine Content Decryption Module (CDM) session. + + Parameters: + device: Widevine Device containing the Client ID, Device Private Key, and + more device-specific information. + pssh: Protection System Specific Header Box or Init Data. This should be a + compliant mp4 pssh box, or just the init data (Widevine Cenc Header). + license_type: Type of License you wish to exchange, often `STREAMING`. + The `OFFLINE` Licenses are for Offline licensing of Downloaded content. + raw: This should be set to True if the PSSH data provided is arbitrary data. + E.g., a PSSH Box where the init data is not a Widevine Cenc Header, or + is simply arbitrary data. + + Devices have a limit on how many sessions can be open and active concurrently. + The limit is different for each device and security level, most commonly 50. + This limit is handled by the OEM Crypto API. Multiple sessions can be open at + a time and sessions should be closed when no longer needed. + + If an API or System requests a Widevine Session ID, it is best to provide it + the real Session ID created here (self.session_id) instead of an arbitrary or + random value. + """ + if not device: + raise ValueError("A Widevine Device must be provided.") + if not pssh: + raise ValueError("A PSSH Box must be provided.") + + if self.NUM_OF_SESSIONS >= self.MAX_NUM_OF_SESSIONS: + raise ValueError( + f"Too many Sessions open {self.NUM_OF_SESSIONS}/{self.MAX_NUM_OF_SESSIONS}. " + f"Close some Sessions to be able to open more." + ) + + self.NUM_OF_SESSIONS += 1 + + self.device = device + self.init_data = pssh + self.license_type = license_type + self.raw = raw + + if not self.raw: + # we only want the init_data of the pssh box + self.init_data = PSSH.get_as_box(pssh).init_data + + self.session_id = self.create_session_id(self.device) + self.service_certificate: Optional[SignedMessage] = None + self.license_request: Optional[SignedMessage] = None + + def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage: + """ + Set a Service Privacy Certificate for Privacy Mode. (optional but recommended) + + Parameters: + certificate: Signed Message in Base64 or Bytes form obtained from the Service. + Some services have their own, but most use the common privacy cert, + (common_privacy_cert). + + Returns the parsed Signed Message if successful, otherwise raises a DecodeError. + + The Service Certificate is used to encrypt Client IDs in Licenses. This is also + known as Privacy Mode and may be required for some services or for some devices. + Chrome CDM requires it as of the enforcement of VMP (Verified Media Path). + """ + if isinstance(certificate, str): + certificate = base64.b64decode(certificate) # assuming base64 + + signed_message = SignedMessage() + try: + signed_message.ParseFromString(certificate) + except DecodeError: + raise ValueError("Could not parse certificate as a Signed Message.") + + self.service_certificate = signed_message + return signed_message + + def get_license_challenge(self, privacy_mode: bool = True) -> bytes: + """ + Get a License Challenge to send to a License Server. + + Parameters: + privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the + privacy certificate is not set yet, this does nothing. + + Returns a SignedMessage containing a LicenseRequest message. It's signed with + the Private Key of the device provision. + """ + license_request = LicenseRequest() + license_request.type = LicenseRequest.RequestType.Value("NEW") + license_request.request_time = int(time.time()) + license_request.protocol_version = ProtocolVersion.Value("VERSION_2_1") + license_request.key_control_nonce = random.randrange(1, 2 ** 31) + + license_request.content_id.widevine_pssh_data.pssh_data.append(self.init_data) + license_request.content_id.widevine_pssh_data.license_type = self.license_type + license_request.content_id.widevine_pssh_data.request_id = self.session_id + + if self.service_certificate and privacy_mode: + # encrypt the client id for privacy mode + license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id( + client_id=self.device.client_id, + service_certificate=self.service_certificate + )) + else: + license_request.client_id.CopyFrom(self.device.client_id) + + license_message = SignedMessage() + license_message.type = SignedMessage.MessageType.Value("LICENSE_REQUEST") + license_message.msg = license_request.SerializeToString() + + license_message.signature = pss. \ + new(self.device.private_key). \ + sign(SHA1.new(license_message.msg)) + + # store it for later, we need it for deriving keys when parsing a license + self.license_request = license_message + + return license_message.SerializeToString() + + def parse_license(self, license_message: Union[bytes, str]) -> list[Key]: + # TODO: What if the CDM generates a 2nd challenge, overwriting the previous making it mismatch + # for deriving keys? We need to make self.license_request always match the license_message + if not self.license_request: + raise ValueError("Cannot parse a license message without first making a license request") + + if not license_message: + raise ValueError("Cannot parse an empty license_message as a SignedMessage") + + if isinstance(license_message, str): + license_message = base64.b64decode(license_message) + if isinstance(license_message, bytes): + signed_message = SignedMessage() + try: + signed_message.ParseFromString(license_message) + except DecodeError: + raise ValueError("Failed to parse license_message as a SignedMessage") + license_message = signed_message + if not isinstance(license_message, SignedMessage): + raise ValueError(f"Expecting license_response to be a SignedMessage, got {license_message!r}") + + licence = License() + licence.ParseFromString(license_message.msg) + + session_key = PKCS1_OAEP. \ + new(self.device.private_key). \ + decrypt(license_message.session_key) + + enc_key, mac_key_server, mac_key_client = self.derive_keys(self.license_request.msg, session_key) + + license_signature = HMAC. \ + new(mac_key_server, digestmod=SHA256). \ + update(licence.SerializeToString()). \ + digest() + + if license_message.signature != license_signature: + raise ValueError("The License Signature doesn't match the Signature listed in the Message") + + return [ + Key.from_key_container(key, enc_key) + for key in licence.key + ] + + @staticmethod + def decrypt(content_keys: dict[UUID, str], input_: Path, output: Path, temp: Optional[Path] = None): + """ + Decrypt a Widevine-encrypted file using Shaka-packager. + Shaka-packager is much more stable than mp4decrypt. + + Raises: + EnvironmentError if the Shaka Packager executable could not be found. + ValueError if the track has not yet been downloaded. + SubprocessError if Shaka Packager returned a non-zero exit code. + """ + if not content_keys: + raise ValueError("Cannot decrypt without any Content Keys") + if not input_: + raise ValueError("Cannot decrypt nothing, specify an input path") + if not output: + raise ValueError("Cannot decrypt nowhere, specify an output path") + + platform = {"win32": "win", "darwin": "osx"}.get(sys.platform, sys.platform) + executable = get_binary_path("shaka-packager", f"packager-{platform}", f"packager-{platform}-x64") + if not executable: + raise EnvironmentError("Shaka Packager executable not found but is required") + + args = [ + f"input={input_},stream=0,output={output}", + "--enable_raw_key_decryption", "--keys", + ",".join([ + *[ + "label={}:key_id={}:key={}".format(i, kid.hex, key.lower()) + for i, (kid, key) in enumerate(content_keys.items()) + ], + *[ + # Apple TV+ needs this as their files do not use the KID supplied in the manifest + "label={}:key_id={}:key={}".format(i, "00" * 16, key.lower()) + for i, (kid, key) in enumerate(content_keys.items(), len(content_keys)) + ] + ]), + ] + + if temp: + temp.mkdir(parents=True, exist_ok=True) + args.extend(["--temp_dir", temp]) + + try: + subprocess.check_call([executable, *args]) + except subprocess.CalledProcessError as e: + raise subprocess.SubprocessError(f"Failed to Decrypt! Shaka Packager Error: {e}") + + def create_session_id(self, device: Device) -> bytes: + """Create a Session ID based on OEM Crypto API session values.""" + if device.type == device.Types.ANDROID: + session_id = "{hex:16X}{counter}".format( + hex=random.getrandbits(64), + counter=f"{self.NUM_OF_SESSIONS:02}" + ) + session_id.ljust(32, "0") + return session_id.encode("ascii") + + if device.type == device.Types.CHROME: + return get_random_bytes(16) + + raise ValueError(f"Device Type {device.type.name} is not implemented") + + @staticmethod + def encrypt_client_id( + client_id: ClientIdentification, + service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate], + key: bytes = None, + iv: bytes = None + ) -> EncryptedClientIdentification: + """Encrypt the Client ID with the Service's Privacy Certificate.""" + privacy_key = key or get_random_bytes(16) + privacy_iv = iv or get_random_bytes(16) + + if isinstance(service_certificate, SignedMessage): + signed_service_certificate = SignedDrmCertificate() + signed_service_certificate.ParseFromString(service_certificate.msg) + service_certificate = signed_service_certificate + + if isinstance(service_certificate, SignedDrmCertificate): + service_service_drm_certificate = DrmCertificate() + service_service_drm_certificate.ParseFromString(service_certificate.drm_certificate) + service_certificate = service_service_drm_certificate + + if not isinstance(service_certificate, DrmCertificate): + raise ValueError(f"Service Certificate is in an unexpected type {service_certificate!r}") + + enc_client_id = EncryptedClientIdentification() + enc_client_id.provider_id = service_certificate.provider_id + enc_client_id.service_certificate_serial_number = service_certificate.serial_number + + enc_client_id.encrypted_client_id = AES. \ + new(privacy_key, AES.MODE_CBC, privacy_iv). \ + encrypt(Padding.pad(client_id.SerializeToString(), 16)) + + enc_client_id.encrypted_privacy_key = PKCS1_OAEP. \ + new(RSA.importKey(service_certificate.public_key)). \ + encrypt(privacy_key) + enc_client_id.encrypted_client_id_iv = privacy_iv + + return enc_client_id + + @staticmethod + def derive_keys(msg: bytes, key: bytes) -> tuple[bytes, bytes, bytes]: + """ + Returns 3 keys derived from the input message. + Key can either be a pre-provision device aes key, provision key, or a session key. + + For provisioning: + - enc: aes key used for unwrapping RSA key out of response + - mac_key_server: hmac-sha256 key used for verifying provisioning response + - mac_key_client: hmac-sha256 key used for signing provisioning request + + When used with a session key: + - enc: decrypting content and other keys + - mac_key_server: verifying response + - mac_key_client: renewals + + With key as pre-provision device key, it can be used to provision and get an + RSA device key and token/cert with key as session key (OAEP wrapped with the + post-provision RSA device key), it can be used to decrypt content and signing + keys and verify licenses. + """ + + def get_enc_context(message: bytes) -> bytes: + label = b"ENCRYPTION" + key_size = 16 * 8 # 128-bit + return label + b"\x00" + message + key_size.to_bytes(4, "big") + + def get_mac_context(message: bytes) -> bytes: + label = b"AUTHENTICATION" + key_size = 32 * 8 * 2 # 512-bit + return label + b"\x00" + message + key_size.to_bytes(4, "big") + + def _derive(session_key: bytes, context: bytes, counter: int) -> bytes: + return CMAC.new(session_key, ciphermod=AES). \ + update(counter.to_bytes(1, "big") + context). \ + digest() + + enc_context = get_enc_context(msg) + mac_context = get_mac_context(msg) + + enc_key = _derive(key, enc_context, 1) + mac_key_server = _derive(key, mac_context, 1) + mac_key_server += _derive(key, mac_context, 2) + mac_key_client = _derive(key, mac_context, 3) + mac_key_client += _derive(key, mac_context, 4) + + return enc_key, mac_key_server, mac_key_client + + +__ALL__ = (Cdm,) diff --git a/pywidevine/utils.py b/pywidevine/utils.py new file mode 100644 index 0000000..6556d3e --- /dev/null +++ b/pywidevine/utils.py @@ -0,0 +1,12 @@ +import shutil +from pathlib import Path +from typing import Optional + + +def get_binary_path(*names: str) -> Optional[Path]: + """Get the path of the first found binary name.""" + for name in names: + path = shutil.which(name) + if path: + return Path(path) + return None