mirror of
https://github.com/devine-dl/pywidevine.git
synced 2025-04-30 01:59:44 +00:00
632 lines
28 KiB
Python
632 lines
28 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import binascii
|
|
import random
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Union, Optional
|
|
from uuid import UUID
|
|
|
|
from Crypto.Cipher import AES, PKCS1_OAEP
|
|
from Crypto.Hash import SHA1, HMAC, SHA256, CMAC
|
|
from Crypto.PublicKey import RSA
|
|
from Crypto.Random import get_random_bytes
|
|
from Crypto.Signature import pss
|
|
from Crypto.Util import Padding
|
|
from google.protobuf.message import DecodeError
|
|
|
|
from pywidevine.device import Device
|
|
from pywidevine.exceptions import TooManySessions, InvalidSession, InvalidLicenseType, SignatureMismatch, \
|
|
InvalidInitData, InvalidLicenseMessage, NoKeysLoaded, InvalidContext
|
|
from pywidevine.key import Key
|
|
from pywidevine.license_protocol_pb2 import DrmCertificate, SignedMessage, SignedDrmCertificate, LicenseType, \
|
|
LicenseRequest, ProtocolVersion, ClientIdentification, EncryptedClientIdentification, License
|
|
from pywidevine.pssh import PSSH
|
|
from pywidevine.session import Session
|
|
from pywidevine.utils import get_binary_path
|
|
|
|
|
|
class Cdm:
|
|
system_id = b"\xed\xef\x8b\xa9\x79\xd6\x4a\xce\xa3\xc8\x27\xdc\xd5\x1d\x21\xed"
|
|
uuid = UUID(bytes=system_id)
|
|
urn = f"urn:uuid:{uuid}"
|
|
key_format = urn
|
|
service_certificate_challenge = b"\x08\x04"
|
|
common_privacy_cert = ("CAUSxwUKwQIIAxIQFwW5F8wSBIaLBjM6L3cqjBiCtIKSBSKOAjCCAQoCggEBAJntWzsyfateJO/DtiqVtZhSCtW8y"
|
|
"zdQPgZFuBTYdrjfQFEEQa2M462xG7iMTnJaXkqeB5UpHVhYQCOn4a8OOKkSeTkwCGELbxWMh4x+Ib/7/up34QGeHl"
|
|
"eB6KRfRiY9FOYOgFioYHrc4E+shFexN6jWfM3rM3BdmDoh+07svUoQykdJDKR+ql1DghjduvHK3jOS8T1v+2RC/TH"
|
|
"hv0CwxgTRxLpMlSCkv5fuvWCSmvzu9Vu69WTi0Ods18Vcc6CCuZYSC4NZ7c4kcHCCaA1vZ8bYLErF8xNEkKdO7Dev"
|
|
"Sy8BDFnoKEPiWC8La59dsPxebt9k+9MItHEbzxJQAZyfWgkCAwEAAToUbGljZW5zZS53aWRldmluZS5jb20SgAOuN"
|
|
"HMUtag1KX8nE4j7e7jLUnfSSYI83dHaMLkzOVEes8y96gS5RLknwSE0bv296snUE5F+bsF2oQQ4RgpQO8GVK5uk5M"
|
|
"4PxL/CCpgIqq9L/NGcHc/N9XTMrCjRtBBBbPneiAQwHL2zNMr80NQJeEI6ZC5UYT3wr8+WykqSSdhV5Cs6cD7xdn9"
|
|
"qm9Nta/gr52u/DLpP3lnSq8x2/rZCR7hcQx+8pSJmthn8NpeVQ/ypy727+voOGlXnVaPHvOZV+WRvWCq5z3CqCLl5"
|
|
"+Gf2Ogsrf9s2LFvE7NVV2FvKqcWTw4PIV9Sdqrd+QLeFHd/SSZiAjjWyWOddeOrAyhb3BHMEwg2T7eTo/xxvF+YkP"
|
|
"j89qPwXCYcOxF+6gjomPwzvofcJOxkJkoMmMzcFBDopvab5tDQsyN9UPLGhGC98X/8z8QSQ+spbJTYLdgFenFoGq4"
|
|
"7gLwDS6NWYYQSqzE3Udf2W7pzk4ybyG4PHBYV3s4cyzdq8amvtE/sNSdOKReuHpfQ=")
|
|
root_signed_cert = SignedDrmCertificate()
|
|
root_signed_cert.ParseFromString(base64.b64decode(
|
|
"CpwDCAASAQAY3ZSIiwUijgMwggGKAoIBgQC0/jnDZZAD2zwRlwnoaM3yw16b8udNI7EQ24dl39z7nzWgVwNTTPZtNX2meNuzNtI/nECplSZy"
|
|
"f7i+Zt/FIZh4FRZoXS9GDkPLioQ5q/uwNYAivjQji6tTW3LsS7VIaVM+R1/9Cf2ndhOPD5LWTN+udqm62SIQqZ1xRdbX4RklhZxTmpfrhNfM"
|
|
"qIiCIHAmIP1+QFAn4iWTb7w+cqD6wb0ptE2CXMG0y5xyfrDpihc+GWP8/YJIK7eyM7l97Eu6iR8nuJuISISqGJIOZfXIbBH/azbkdDTKjDOx"
|
|
"+biOtOYS4AKYeVJeRTP/Edzrw1O6fGAaET0A+9K3qjD6T15Id1sX3HXvb9IZbdy+f7B4j9yCYEy/5CkGXmmMOROtFCXtGbLynwGCDVZEiMg1"
|
|
"7B8RsyTgWQ035Ec86kt/lzEcgXyUikx9aBWE/6UI/Rjn5yvkRycSEbgj7FiTPKwS0ohtQT3F/hzcufjUUT4H5QNvpxLoEve1zqaWVT94tGSC"
|
|
"UNIzX5ECAwEAARKAA1jx1k0ECXvf1+9dOwI5F/oUNnVKOGeFVxKnFO41FtU9v0KG9mkAds2T9Hyy355EzUzUrgkYU0Qy7OBhG+XaE9NVxd0a"
|
|
"y5AeflvG6Q8in76FAv6QMcxrA4S9IsRV+vXyCM1lQVjofSnaBFiC9TdpvPNaV4QXezKHcLKwdpyywxXRESYqI3WZPrl3IjINvBoZwdVlkHZV"
|
|
"dA8OaU1fTY8Zr9/WFjGUqJJfT7x6Mfiujq0zt+kw0IwKimyDNfiKgbL+HIisKmbF/73mF9BiC9yKRfewPlrIHkokL2yl4xyIFIPVxe9enz2F"
|
|
"RXPia1BSV0z7kmxmdYrWDRuu8+yvUSIDXQouY5OcCwEgqKmELhfKrnPsIht5rvagcizfB0fbiIYwFHghESKIrNdUdPnzJsKlVshWTwApHQh7"
|
|
"evuVicPumFSePGuUBRMS9nG5qxPDDJtGCHs9Mmpoyh6ckGLF7RC5HxclzpC5bc3ERvWjYhN0AqdipPpV2d7PouaAdFUGSdUCDA=="
|
|
))
|
|
root_cert = DrmCertificate()
|
|
root_cert.ParseFromString(root_signed_cert.drm_certificate)
|
|
|
|
MAX_NUM_OF_SESSIONS = 50 # most common limit
|
|
|
|
def __init__(
|
|
self,
|
|
device_type: Union[Device.Types, str],
|
|
system_id: int,
|
|
security_level: int,
|
|
client_id: ClientIdentification,
|
|
rsa_key: RSA.RsaKey
|
|
):
|
|
"""Initialize a Widevine Content Decryption Module (CDM)."""
|
|
if not device_type:
|
|
raise ValueError("Device Type must be provided")
|
|
if isinstance(device_type, str):
|
|
device_type = Device.Types[device_type]
|
|
if not isinstance(device_type, Device.Types):
|
|
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
|
|
|
|
if not system_id:
|
|
raise ValueError("System ID must be provided")
|
|
if not isinstance(system_id, int):
|
|
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
|
|
|
|
if not security_level:
|
|
raise ValueError("Security Level must be provided")
|
|
if not isinstance(security_level, int):
|
|
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
|
|
|
|
if not client_id:
|
|
raise ValueError("Client ID must be provided")
|
|
if not isinstance(client_id, ClientIdentification):
|
|
raise TypeError(f"Expected client_id to be a {ClientIdentification} not {client_id!r}")
|
|
|
|
if not rsa_key:
|
|
raise ValueError("RSA Key must be provided")
|
|
if not isinstance(rsa_key, RSA.RsaKey):
|
|
raise TypeError(f"Expected rsa_key to be a {RSA.RsaKey} not {rsa_key!r}")
|
|
|
|
self.device_type = device_type
|
|
self.system_id = system_id
|
|
self.security_level = security_level
|
|
self.__client_id = client_id
|
|
|
|
self.__signer = pss.new(rsa_key)
|
|
self.__decrypter = PKCS1_OAEP.new(rsa_key)
|
|
|
|
self.__sessions: dict[bytes, Session] = {}
|
|
|
|
@classmethod
|
|
def from_device(cls, device: Device) -> Cdm:
|
|
"""Initialize a Widevine CDM from a Widevine Device (.wvd) file."""
|
|
return cls(
|
|
device_type=device.type,
|
|
system_id=device.system_id,
|
|
security_level=device.security_level,
|
|
client_id=device.client_id,
|
|
rsa_key=device.private_key
|
|
)
|
|
|
|
def open(self) -> bytes:
|
|
"""
|
|
Open a Widevine Content Decryption Module (CDM) session.
|
|
|
|
Raises:
|
|
TooManySessions: If the session cannot be opened as limit has been reached.
|
|
"""
|
|
if len(self.__sessions) > self.MAX_NUM_OF_SESSIONS:
|
|
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
|
|
|
|
session = Session(len(self.__sessions) + 1)
|
|
self.__sessions[session.id] = session
|
|
|
|
return session.id
|
|
|
|
def close(self, session_id: bytes) -> None:
|
|
"""
|
|
Close a Widevine Content Decryption Module (CDM) session.
|
|
|
|
Parameters:
|
|
session_id: Session identifier.
|
|
|
|
Raises:
|
|
InvalidSession: If the Session identifier is invalid.
|
|
"""
|
|
session = self.__sessions.get(session_id)
|
|
if not session:
|
|
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
|
del self.__sessions[session_id]
|
|
|
|
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
|
|
"""
|
|
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
|
|
|
|
The Service Certificate is used to encrypt Client IDs in Licenses. This is also
|
|
known as Privacy Mode and may be required for some services or for some devices.
|
|
Chrome CDM requires it as of the enforcement of VMP (Verified Media Path).
|
|
|
|
We reject direct DrmCertificates as they do not have signature verification and
|
|
cannot be verified. You must provide a SignedDrmCertificate or a SignedMessage
|
|
containing a SignedDrmCertificate.
|
|
|
|
Parameters:
|
|
session_id: Session identifier.
|
|
certificate: SignedDrmCertificate (or SignedMessage containing one) in Base64
|
|
or Bytes form obtained from the Service. Some services have their own,
|
|
but most use the common privacy cert, (common_privacy_cert). If None, it
|
|
will remove the current certificate.
|
|
|
|
Raises:
|
|
InvalidSession: If the Session identifier is invalid.
|
|
DecodeError: If the certificate could not be parsed as a SignedDrmCertificate
|
|
nor a SignedMessage containing a SignedDrmCertificate.
|
|
SignatureMismatch: If the Signature of the SignedDrmCertificate does not
|
|
match the underlying DrmCertificate.
|
|
|
|
Returns the Service Provider ID of the verified DrmCertificate if successful.
|
|
If certificate is None, it will return the now unset certificate's Provider ID.
|
|
"""
|
|
session = self.__sessions.get(session_id)
|
|
if not session:
|
|
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
|
|
|
if certificate is None:
|
|
drm_certificate = DrmCertificate()
|
|
drm_certificate.ParseFromString(session.service_certificate.drm_certificate)
|
|
session.service_certificate = None
|
|
return drm_certificate.provider_id
|
|
|
|
if isinstance(certificate, str):
|
|
try:
|
|
certificate = base64.b64decode(certificate) # assuming base64
|
|
except binascii.Error:
|
|
raise DecodeError("Could not decode certificate string as Base64, expected bytes.")
|
|
elif not isinstance(certificate, bytes):
|
|
raise DecodeError(f"Expecting Certificate to be bytes, not {certificate!r}")
|
|
|
|
signed_message = SignedMessage()
|
|
signed_drm_certificate = SignedDrmCertificate()
|
|
|
|
try:
|
|
signed_message.ParseFromString(certificate)
|
|
if signed_message.SerializeToString() == certificate:
|
|
signed_drm_certificate.ParseFromString(signed_message.msg)
|
|
else:
|
|
signed_drm_certificate.ParseFromString(certificate)
|
|
if signed_drm_certificate.SerializeToString() != certificate:
|
|
raise DecodeError("partial parse")
|
|
# Craft a SignedMessage as it's stored as a SignedMessage
|
|
signed_message.Clear()
|
|
signed_message.msg = signed_drm_certificate.SerializeToString()
|
|
# we don't need to sign this message, this is normal
|
|
except DecodeError as e:
|
|
# could be a direct unsigned DrmCertificate, but reject those anyway
|
|
raise DecodeError(f"Could not parse certificate as a SignedDrmCertificate, {e}")
|
|
|
|
try:
|
|
pss. \
|
|
new(RSA.import_key(self.root_cert.public_key)). \
|
|
verify(
|
|
msg_hash=SHA1.new(signed_drm_certificate.drm_certificate),
|
|
signature=signed_drm_certificate.signature
|
|
)
|
|
except (ValueError, TypeError):
|
|
raise SignatureMismatch("Signature Mismatch on SignedDrmCertificate, rejecting certificate")
|
|
else:
|
|
session.service_certificate = signed_message
|
|
drm_certificate = DrmCertificate()
|
|
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
|
|
return drm_certificate.provider_id
|
|
|
|
def get_service_certificate(self, session_id: bytes) -> Optional[SignedMessage]:
|
|
"""
|
|
Get the currently set Service Privacy Certificate of the Session.
|
|
|
|
Parameters:
|
|
session_id: Session identifier.
|
|
|
|
Raises:
|
|
InvalidSession: If the Session identifier is invalid.
|
|
|
|
Returns the Service Certificate if one is set, otherwise None.
|
|
"""
|
|
session = self.__sessions.get(session_id)
|
|
if not session:
|
|
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
|
|
|
return session.service_certificate
|
|
|
|
def get_license_challenge(
|
|
self,
|
|
session_id: bytes,
|
|
pssh: PSSH,
|
|
type_: Union[int, str] = LicenseType.STREAMING,
|
|
privacy_mode: bool = True
|
|
) -> bytes:
|
|
"""
|
|
Get a License Request (Challenge) to send to a License Server.
|
|
|
|
Parameters:
|
|
session_id: Session identifier.
|
|
pssh: PSSH Object to get the init data from.
|
|
type_: Type of License you wish to exchange, often `STREAMING`. The `OFFLINE`
|
|
Licenses are for Offline licensing of Downloaded content.
|
|
privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the
|
|
privacy certificate is not set yet, this does nothing.
|
|
|
|
Raises:
|
|
InvalidSession: If the Session identifier is invalid.
|
|
InvalidInitData: If the Init Data (or PSSH box) provided is invalid.
|
|
InvalidLicenseType: If the type_ parameter value is not a License Type. It
|
|
must be a LicenseType enum, or a string/int representing the enum's keys
|
|
or values.
|
|
|
|
Returns a SignedMessage containing a LicenseRequest message. It's signed with
|
|
the Private Key of the device provision.
|
|
"""
|
|
session = self.__sessions.get(session_id)
|
|
if not session:
|
|
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
|
|
|
if not pssh:
|
|
raise InvalidInitData("A pssh must be provided.")
|
|
if not isinstance(pssh, PSSH):
|
|
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
|
|
|
|
try:
|
|
if isinstance(type_, int):
|
|
LicenseType.Name(int(type_))
|
|
elif isinstance(type_, str):
|
|
type_ = LicenseType.Value(type_)
|
|
elif not isinstance(type_, LicenseType):
|
|
raise InvalidLicenseType()
|
|
except ValueError:
|
|
raise InvalidLicenseType(f"License Type {type_!r} is invalid")
|
|
|
|
if self.device_type == Device.Types.ANDROID:
|
|
# OEMCrypto's request_id seems to be in AES CTR Counter block form with no suffix
|
|
# Bytes 5-8 does not seem random, in real tests they have been consecutive \x00 or \xFF
|
|
# Real example: A0DCE548000000000500000000000000
|
|
request_id = (get_random_bytes(4) + (b"\x00" * 4)) # (?)
|
|
request_id += session.number.to_bytes(8, "little") # counter
|
|
# as you can see in the real example, it is stored as uppercase hex and re-encoded
|
|
# it's really 16 bytes of data, but it's stored as a 32-char HEX string (32 bytes)
|
|
request_id = request_id.hex().upper().encode()
|
|
else:
|
|
request_id = get_random_bytes(16)
|
|
|
|
license_request = LicenseRequest()
|
|
license_request.type = LicenseRequest.RequestType.Value("NEW")
|
|
license_request.request_time = int(time.time())
|
|
license_request.protocol_version = ProtocolVersion.Value("VERSION_2_1")
|
|
license_request.key_control_nonce = random.randrange(1, 2 ** 31)
|
|
|
|
# pssh_data may be either a WidevineCencHeader or custom data
|
|
# we have to assume the pssh.init_data value is valid, we cannot test
|
|
license_request.content_id.widevine_pssh_data.pssh_data.append(pssh.init_data)
|
|
license_request.content_id.widevine_pssh_data.license_type = type_
|
|
license_request.content_id.widevine_pssh_data.request_id = request_id
|
|
|
|
if session.service_certificate and privacy_mode:
|
|
# encrypt the client id for privacy mode
|
|
license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id(
|
|
client_id=self.__client_id,
|
|
service_certificate=session.service_certificate
|
|
))
|
|
else:
|
|
license_request.client_id.CopyFrom(self.__client_id)
|
|
|
|
license_message = SignedMessage()
|
|
license_message.type = SignedMessage.MessageType.LICENSE_REQUEST
|
|
license_message.msg = license_request.SerializeToString()
|
|
license_message.signature = self.__signer.sign(SHA1.new(license_message.msg))
|
|
|
|
session.context[request_id] = self.derive_context(license_message.msg)
|
|
|
|
return license_message.SerializeToString()
|
|
|
|
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
|
|
"""
|
|
Load Keys from a License Message from a License Server Response.
|
|
|
|
License Messages can only be loaded a single time. An InvalidContext error will
|
|
be raised if you attempt to parse a License Message more than once.
|
|
|
|
Parameters:
|
|
session_id: Session identifier.
|
|
license_message: A SignedMessage containing a License message.
|
|
|
|
Raises:
|
|
InvalidSession: If the Session identifier is invalid.
|
|
InvalidLicenseMessage: The License message could not be decoded as a Signed
|
|
Message or License message.
|
|
InvalidContext: If the Session has no Context Data. This is likely to happen
|
|
if the License Challenge was not made by this CDM instance, or was not
|
|
by this CDM at all. It could also happen if the Session is closed after
|
|
calling parse_license but not before it got the context data.
|
|
SignatureMismatch: If the Signature of the License SignedMessage does not
|
|
match the underlying License.
|
|
"""
|
|
session = self.__sessions.get(session_id)
|
|
if not session:
|
|
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
|
|
|
if not license_message:
|
|
raise InvalidLicenseMessage("Cannot parse an empty license_message")
|
|
|
|
if isinstance(license_message, str):
|
|
try:
|
|
license_message = base64.b64decode(license_message)
|
|
except (binascii.Error, binascii.Incomplete) as e:
|
|
raise InvalidLicenseMessage(f"Could not decode license_message as Base64, {e}")
|
|
|
|
if isinstance(license_message, bytes):
|
|
signed_message = SignedMessage()
|
|
try:
|
|
signed_message.ParseFromString(license_message)
|
|
if signed_message.SerializeToString() != license_message:
|
|
raise DecodeError(license_message)
|
|
except DecodeError as e:
|
|
raise InvalidLicenseMessage(f"Could not parse license_message as a SignedMessage, {e}")
|
|
license_message = signed_message
|
|
|
|
if not isinstance(license_message, SignedMessage):
|
|
raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}")
|
|
|
|
if license_message.type != SignedMessage.MessageType.LICENSE:
|
|
raise InvalidLicenseMessage(
|
|
f"Expecting a LICENSE message, not a "
|
|
f"'{SignedMessage.MessageType.Name(license_message.type)}' message."
|
|
)
|
|
|
|
licence = License()
|
|
licence.ParseFromString(license_message.msg)
|
|
|
|
context = session.context.get(licence.id.request_id)
|
|
if not context:
|
|
raise InvalidContext("Cannot parse a license message without first making a license request")
|
|
|
|
enc_key, mac_key_server, _ = self.derive_keys(
|
|
*context,
|
|
key=self.__decrypter.decrypt(license_message.session_key)
|
|
)
|
|
|
|
computed_signature = HMAC. \
|
|
new(mac_key_server, digestmod=SHA256). \
|
|
update(licence.SerializeToString()). \
|
|
digest()
|
|
|
|
if license_message.signature != computed_signature:
|
|
raise SignatureMismatch("Signature Mismatch on License Message, rejecting license")
|
|
|
|
session.keys = [
|
|
Key.from_key_container(key, enc_key)
|
|
for key in licence.key
|
|
]
|
|
|
|
del session.context[licence.id.request_id]
|
|
|
|
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
|
|
"""
|
|
Get Keys from the loaded License message.
|
|
|
|
Parameters:
|
|
session_id: Session identifier.
|
|
type_: (optional) Key Type to filter by and return.
|
|
|
|
Raises:
|
|
InvalidSession: If the Session identifier is invalid.
|
|
TypeError: If the provided type_ is an unexpected value type.
|
|
ValueError: If the provided type_ is not a valid Key Type.
|
|
"""
|
|
session = self.__sessions.get(session_id)
|
|
if not session:
|
|
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
|
|
|
try:
|
|
if isinstance(type_, str):
|
|
type_ = License.KeyContainer.KeyType.Value(type_)
|
|
elif isinstance(type_, int):
|
|
License.KeyContainer.KeyType.Name(type_) # only test
|
|
elif type_ is not None:
|
|
raise TypeError(f"Expected type_ to be a {License.KeyContainer.KeyType} or int, not {type_!r}")
|
|
except ValueError as e:
|
|
raise ValueError(f"Could not parse type_ as a {License.KeyContainer.KeyType}, {e}")
|
|
|
|
return [
|
|
key
|
|
for key in session.keys
|
|
if not type_ or key.type == License.KeyContainer.KeyType.Name(type_)
|
|
]
|
|
|
|
def decrypt(
|
|
self,
|
|
session_id: bytes,
|
|
input_file: Union[Path, str],
|
|
output_file: Union[Path, str],
|
|
temp_dir: Optional[Union[Path, str]] = None,
|
|
exists_ok: bool = False
|
|
):
|
|
"""
|
|
Decrypt a Widevine-encrypted file using Shaka-packager.
|
|
Shaka-packager is much more stable than mp4decrypt.
|
|
|
|
Parameters:
|
|
session_id: Session identifier.
|
|
input_file: File to be decrypted with Session's currently loaded keys.
|
|
output_file: Location to save decrypted file.
|
|
temp_dir: Directory to store temporary data while decrypting.
|
|
exists_ok: Allow overwriting the output_file if it exists.
|
|
|
|
Raises:
|
|
ValueError: If the input or output paths have not been supplied or are
|
|
invalid.
|
|
FileNotFoundError: If the input file path does not exist.
|
|
FileExistsError: If the output file path already exists. Ignored if exists_ok
|
|
is set to True.
|
|
NoKeysLoaded: No License was parsed for this Session, No Keys available.
|
|
EnvironmentError: If the shaka-packager executable could not be found.
|
|
subprocess.CalledProcessError: If the shaka-packager call returned a non-zero
|
|
exit code.
|
|
"""
|
|
if not input_file:
|
|
raise ValueError("Cannot decrypt nothing, specify an input path")
|
|
if not output_file:
|
|
raise ValueError("Cannot decrypt nowhere, specify an output path")
|
|
|
|
if not isinstance(input_file, (Path, str)):
|
|
raise ValueError(f"Expecting input_file to be a Path or str, got {input_file!r}")
|
|
if not isinstance(output_file, (Path, str)):
|
|
raise ValueError(f"Expecting output_file to be a Path or str, got {output_file!r}")
|
|
if not isinstance(temp_dir, (Path, str)) and temp_dir is not None:
|
|
raise ValueError(f"Expecting temp_dir to be a Path or str, got {temp_dir!r}")
|
|
|
|
input_file = Path(input_file)
|
|
output_file = Path(output_file)
|
|
if temp_dir:
|
|
temp_dir = Path(temp_dir)
|
|
|
|
if not input_file.is_file():
|
|
raise FileNotFoundError(f"Input file does not exist, {input_file}")
|
|
if output_file.is_file() and not exists_ok:
|
|
raise FileExistsError(f"Output file already exists, {output_file}")
|
|
|
|
session = self.__sessions.get(session_id)
|
|
if not session:
|
|
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
|
|
|
if not session.keys:
|
|
raise NoKeysLoaded("No Keys are loaded yet, cannot decrypt")
|
|
|
|
platform = {"win32": "win", "darwin": "osx"}.get(sys.platform, sys.platform)
|
|
executable = get_binary_path("shaka-packager", f"packager-{platform}", f"packager-{platform}-x64")
|
|
if not executable:
|
|
raise EnvironmentError("Shaka Packager executable not found but is required")
|
|
|
|
args = [
|
|
f"input={input_file},stream=0,output={output_file}",
|
|
"--enable_raw_key_decryption",
|
|
"--keys", ",".join([
|
|
label
|
|
for i, key in enumerate(session.keys)
|
|
for label in [
|
|
f"label=1_{i}:key_id={key.kid.hex}:key={key.key.hex()}",
|
|
# some services need the KID blanked, e.g., Apple TV+
|
|
f"label=2_{i}:key_id={'0' * 32}:key={key.key.hex()}"
|
|
]
|
|
if key.type == "CONTENT"
|
|
])
|
|
]
|
|
|
|
if temp_dir:
|
|
temp_dir.mkdir(parents=True, exist_ok=True)
|
|
args.extend(["--temp_dir", temp_dir])
|
|
|
|
subprocess.check_call([executable, *args])
|
|
|
|
@staticmethod
|
|
def encrypt_client_id(
|
|
client_id: ClientIdentification,
|
|
service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate],
|
|
key: bytes = None,
|
|
iv: bytes = None
|
|
) -> EncryptedClientIdentification:
|
|
"""Encrypt the Client ID with the Service's Privacy Certificate."""
|
|
privacy_key = key or get_random_bytes(16)
|
|
privacy_iv = iv or get_random_bytes(16)
|
|
|
|
if isinstance(service_certificate, SignedMessage):
|
|
signed_drm_certificate = SignedDrmCertificate()
|
|
signed_drm_certificate.ParseFromString(service_certificate.msg)
|
|
service_certificate = signed_drm_certificate
|
|
if isinstance(service_certificate, SignedDrmCertificate):
|
|
drm_certificate = DrmCertificate()
|
|
drm_certificate.ParseFromString(service_certificate.drm_certificate)
|
|
service_certificate = drm_certificate
|
|
if not isinstance(service_certificate, DrmCertificate):
|
|
raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}")
|
|
|
|
enc_client_id = EncryptedClientIdentification()
|
|
enc_client_id.provider_id = service_certificate.provider_id
|
|
enc_client_id.service_certificate_serial_number = service_certificate.serial_number
|
|
|
|
enc_client_id.encrypted_client_id = AES. \
|
|
new(privacy_key, AES.MODE_CBC, privacy_iv). \
|
|
encrypt(Padding.pad(client_id.SerializeToString(), 16))
|
|
|
|
enc_client_id.encrypted_privacy_key = PKCS1_OAEP. \
|
|
new(RSA.importKey(service_certificate.public_key)). \
|
|
encrypt(privacy_key)
|
|
enc_client_id.encrypted_client_id_iv = privacy_iv
|
|
|
|
return enc_client_id
|
|
|
|
@staticmethod
|
|
def derive_context(message: bytes) -> tuple[bytes, bytes]:
|
|
"""Returns 2 Context Data used for computing the AES Encryption and HMAC Keys."""
|
|
|
|
def _get_enc_context(msg: bytes) -> bytes:
|
|
label = b"ENCRYPTION"
|
|
key_size = 16 * 8 # 128-bit
|
|
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
|
|
|
|
def _get_mac_context(msg: bytes) -> bytes:
|
|
label = b"AUTHENTICATION"
|
|
key_size = 32 * 8 * 2 # 512-bit
|
|
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
|
|
|
|
return _get_enc_context(message), _get_mac_context(message)
|
|
|
|
@staticmethod
|
|
def derive_keys(enc_context: bytes, mac_context: bytes, key: bytes) -> tuple[bytes, bytes, bytes]:
|
|
"""
|
|
Returns 3 keys derived from the input message.
|
|
Key can either be a pre-provision device aes key, provision key, or a session key.
|
|
|
|
For provisioning:
|
|
- enc: aes key used for unwrapping RSA key out of response
|
|
- mac_key_server: hmac-sha256 key used for verifying provisioning response
|
|
- mac_key_client: hmac-sha256 key used for signing provisioning request
|
|
|
|
When used with a session key:
|
|
- enc: decrypting content and other keys
|
|
- mac_key_server: verifying response
|
|
- mac_key_client: renewals
|
|
|
|
With key as pre-provision device key, it can be used to provision and get an
|
|
RSA device key and token/cert with key as session key (OAEP wrapped with the
|
|
post-provision RSA device key), it can be used to decrypt content and signing
|
|
keys and verify licenses.
|
|
"""
|
|
|
|
def _derive(session_key: bytes, context: bytes, counter: int) -> bytes:
|
|
return CMAC. \
|
|
new(session_key, ciphermod=AES). \
|
|
update(counter.to_bytes(1, "big") + context). \
|
|
digest()
|
|
|
|
enc_key = _derive(key, enc_context, 1)
|
|
mac_key_server = _derive(key, mac_context, 1)
|
|
mac_key_server += _derive(key, mac_context, 2)
|
|
mac_key_client = _derive(key, mac_context, 3)
|
|
mac_key_client += _derive(key, mac_context, 4)
|
|
|
|
return enc_key, mac_key_server, mac_key_client
|
|
|
|
|
|
__ALL__ = (Cdm,)
|