diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py index 21ba9cf..49c7884 100644 --- a/pyplayready/__init__.py +++ b/pyplayready/__init__.py @@ -9,6 +9,8 @@ from pyplayready.remote.remotecdm import * from pyplayready.system.bcert import * from pyplayready.system.pssh import * from pyplayready.system.session import * +from pyplayready.misc.drmresults import * +from pyplayready.misc.exceptions import * -__version__ = "0.6.0" +__version__ = "0.6.3" diff --git a/pyplayready/cdm.py b/pyplayready/cdm.py index 7b7d7a8..bfe59b1 100644 --- a/pyplayready/cdm.py +++ b/pyplayready/cdm.py @@ -4,7 +4,7 @@ import base64 import time from typing import List, Union, Optional from uuid import UUID -import xml.etree.ElementTree as ET +from lxml import etree as ET import xmltodict from Crypto.Cipher import AES @@ -16,12 +16,12 @@ from Crypto.Util.strxor import strxor from ecpy.curves import Point, Curve from pyplayready.crypto import Crypto -from pyplayready.drmresults import DRMResult +from pyplayready.misc.drmresults import DrmResult from pyplayready.system.bcert import CertificateChain from pyplayready.crypto.ecc_key import ECCKey from pyplayready.license.key import Key from pyplayready.license.xmrlicense import XMRLicense, XMRObjectTypes -from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense, ServerException) +from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, ServerException) from pyplayready.system.session import Session from pyplayready.system.wrmheader import WRMHeader @@ -29,7 +29,10 @@ from pyplayready.system.wrmheader import WRMHeader class Cdm: MAX_NUM_OF_SESSIONS = 16 - rgbMagicConstantZero = bytes.fromhex("7ee9ed4af773224f00b8ea7efb027cbb") + MagicConstantZero = bytes([ + 0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f, + 0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb + ]) def __init__( self, @@ -288,13 +291,14 @@ class Cdm: raise InvalidSession("Cannot parse a license message without first making a license request") try: - root = ET.fromstring(licence) + parser = ET.XMLParser(remove_blank_text=True) + root = ET.XML(licence.encode(), parser) faults = root.findall(".//{http://schemas.xmlsoap.org/soap/envelope/}Fault") for fault in faults: status_codes = fault.findall(".//StatusCode") for status_code in status_codes: - code = DRMResult.from_code(status_code.text) + code = DrmResult.from_code(status_code.text) raise ServerException(f"[{status_code.text}] ({code.name}) {code.message}") license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License") @@ -328,7 +332,7 @@ class Cdm: embedded_root_license = content_key.encrypted_key[:144] embedded_leaf_license = content_key.encrypted_key[144:] - rgb_key = strxor(ck, self.rgbMagicConstantZero) + rgb_key = strxor(ck, self.MagicConstantZero) content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key) aux_key = next(parsed_licence.get_object(XMRObjectTypes.AUX_KEY_OBJECT))["auxiliary_keys"][0]["key"] @@ -352,6 +356,7 @@ class Cdm: key=ck )) except Exception as e: + raise raise InvalidLicense(f"Unable to parse license: {e}") def get_keys(self, session_id: bytes) -> List[Key]: diff --git a/pyplayready/crypto/ecc_key.py b/pyplayready/crypto/ecc_key.py index 2baab46..55996b1 100644 --- a/pyplayready/crypto/ecc_key.py +++ b/pyplayready/crypto/ecc_key.py @@ -29,7 +29,7 @@ class ECCKey: if not isinstance(private_key, int): raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}") - # The public is always derived from the private key; loading the other stuff won't work + # The public key is always derived from the private key; loading the other stuff won't work key = ECC.construct( curve='P-256', d=private_key, diff --git a/pyplayready/crypto/key_wrap.py b/pyplayready/crypto/key_wrap.py new file mode 100644 index 0000000..db3de43 --- /dev/null +++ b/pyplayready/crypto/key_wrap.py @@ -0,0 +1,52 @@ +from Crypto.Cipher import AES +from Crypto.Hash import CMAC + +from cryptography.hazmat.primitives.keywrap import aes_key_unwrap + +def derive_wrapping_key() -> bytes: + """ + NIST SP 800-108 (Rev. 1) + "Recommendation for Key Derivation Using Pseudorandom Functions" + + https://doi.org/10.6028/NIST.SP.800-108r1 + """ + + KeyDerivationCertificatePrivateKeysWrap = bytes([ + 0x9c, 0xe9, 0x34, 0x32, 0xc7, 0xd7, 0x40, 0x16, + 0xba, 0x68, 0x47, 0x63, 0xf8, 0x01, 0xe1, 0x36 + ]) + + CTK_TEST = bytes([ + 0x8B, 0x22, 0x2F, 0xFD, 0x1E, 0x76, 0x19, 0x56, + 0x59, 0xCF, 0x27, 0x03, 0x89, 0x8C, 0x42, 0x7F + ]) + + cmac = CMAC.new(CTK_TEST, ciphermod=AES) + + cmac.update(bytes([ + 1, # Iterations + *KeyDerivationCertificatePrivateKeysWrap, + 0, # Separator + *bytes(16), # Context + 0, 128 # Length in bits of return value + ])) + + derived_wrapping_key = cmac.digest() + + return derived_wrapping_key + +def unwrap_wrapped_key(wrapped_key: bytes) -> bytes: + """ + IETF RFC 3394 + "Advanced Encryption Standard (AES) Key Wrap Algorithm" + + https://www.rfc-editor.org/rfc/rfc3394 + """ + + wrapping_key = derive_wrapping_key() + unwrapped_key = aes_key_unwrap(wrapping_key, wrapped_key) + + # bytes 0 -32: unwrapped key + # bytes 32-48: random bytes + + return unwrapped_key[:32] \ No newline at end of file diff --git a/pyplayready/device/__init__.py b/pyplayready/device/__init__.py index c4aaaf2..84f9cba 100644 --- a/pyplayready/device/__init__.py +++ b/pyplayready/device/__init__.py @@ -5,10 +5,10 @@ from enum import IntEnum from pathlib import Path from typing import Union, Any, Optional -from pyplayready.device.structs import DeviceStructs -from pyplayready.exceptions import OutdatedDevice -from pyplayready.system.bcert import CertificateChain from pyplayready.crypto.ecc_key import ECCKey +from pyplayready.device.structs import DeviceStructs +from pyplayready.misc.exceptions import OutdatedDevice +from pyplayready.system.bcert import CertificateChain class Device: @@ -73,13 +73,13 @@ class Device: with Path(path).open(mode="rb") as f: return cls.loads(f.read()) - def dumps(self) -> bytes: - if not self.group_key: - raise OutdatedDevice("Cannot dump a v2 device, re-create it or use a Device with a version of 3 or higher") + def dumps(self, version: int = CURRENT_VERSION) -> bytes: + if self.group_key is None and version == self.CURRENT_VERSION: + raise OutdatedDevice("Cannot dump device as version 3 without having a group key. Either provide a group key or set argument version=2") return DeviceStructs.prd.build(dict( - version=self.CURRENT_VERSION, - group_key=self.group_key.dumps(), + version=version, + group_key=self.group_key.dumps() if self.group_key else None, encryption_key=self.encryption_key.dumps(), signing_key=self.signing_key.dumps(), group_certificate_length=len(self.group_certificate.dumps()), diff --git a/pyplayready/license/key.py b/pyplayready/license/key.py index 54ecca0..8ad60c5 100644 --- a/pyplayready/license/key.py +++ b/pyplayready/license/key.py @@ -49,11 +49,6 @@ class Key: @staticmethod def kid_to_uuid(kid: Union[str, bytes]) -> UUID: - """ - Convert a Key ID from a string or bytes to a UUID object. - At first, this may seem very simple, but some types of Key IDs - may not be 16 bytes and some may be decimal vs. hex. - """ if isinstance(kid, str): kid = base64.b64decode(kid) if not kid: diff --git a/pyplayready/main.py b/pyplayready/main.py index 7362500..8898aa6 100644 --- a/pyplayready/main.py +++ b/pyplayready/main.py @@ -8,11 +8,13 @@ import requests from Crypto.Random import get_random_bytes from pyplayready import __version__, InvalidCertificateChain, InvalidLicense -from pyplayready.system.bcert import CertificateChain, Certificate from pyplayready.cdm import Cdm -from pyplayready.device import Device from pyplayready.crypto.ecc_key import ECCKey -from pyplayready.exceptions import OutdatedDevice +from pyplayready.crypto.key_wrap import unwrap_wrapped_key +from pyplayready.device import Device +from pyplayready.misc.exceptions import OutdatedDevice +from pyplayready.system.bcert import CertificateChain, Certificate, BCertCertType, BCertObjType, BCertFeatures, \ + BCertKeyType, BCertKeyUsage from pyplayready.system.pssh import PSSH @@ -68,6 +70,10 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None: data=challenge ) + if license_response.status_code != 200: + log.error("Failed to send Challenge: [%s] %s", license_response.status_code, license_response.text) + return + licence = license_response.text log.debug(licence) @@ -126,37 +132,62 @@ def test(ctx: click.Context, device: Path, ckt: str, security_level: str) -> Non @main.command() -@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key") -@click.option("-e", "--encryption_key", type=Path, required=False, help="Optional Device ECC private encryption key") -@click.option("-s", "--signing_key", type=Path, required=False, help="Optional Device ECC private signing key") -@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain") +@click.option("-k", "--group_key", type=Path, help="Device ECC private group key (zgpriv.dat)") +@click.option("-pk", "--protected_group_key", type=Path, help="Protected Device ECC private group key (zgpriv_protected.dat)") +@click.option("-e", "--encryption_key", type=Path, help="Optional Device ECC private encryption key (zprivencr.dat)") +@click.option("-s", "--signing_key", type=Path, help="Optional Device ECC private signing key (zprivsig.dat)") +@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain (bgroupcert.dat)") @click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") @click.pass_context def create_device( ctx: click.Context, group_key: Path, + protected_group_key: Path, encryption_key: Optional[Path], signing_key: Optional[Path], group_certificate: Path, output: Optional[Path] = None ) -> None: """Create a Playready Device (.prd) file from an ECC private group key and group certificate chain""" - if not group_key.is_file(): - raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx) + if bool(group_key) == bool(protected_group_key): + raise click.UsageError("You must provide exactly one of group_key or protected_group_key.", ctx) if not group_certificate.is_file(): raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx) log = logging.getLogger("create-device") - encryption_key = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate() - signing_key = ECCKey.load(signing_key) if signing_key else ECCKey.generate() + if group_key: + if not group_key.is_file(): + raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx) + + group_key = ECCKey.load(group_key) + elif protected_group_key: + if not protected_group_key.is_file(): + raise click.UsageError("protected_group_key: Not a path to a file, or it doesn't exist.", ctx) + + wrapped_key = protected_group_key.read_bytes() + unwrapped_key = unwrap_wrapped_key(wrapped_key) + group_key = ECCKey.loads(unwrapped_key) - group_key = ECCKey.load(group_key) certificate_chain = CertificateChain.load(group_certificate) - if certificate_chain.get(0).get_issuer_key() != group_key.public_bytes(): + if certificate_chain.get(0).get_type() == BCertCertType.DEVICE: + raise InvalidCertificateChain("Device has already been provisioned") + + if certificate_chain.get(0).get_type() != BCertCertType.ISSUER: + raise InvalidCertificateChain("Leaf-most certificate must be of type ISSUER to issue certificate if type DEVICE") + + if not certificate_chain.get(0).contains_public_key(group_key): raise InvalidCertificateChain("Group key does not match this certificate") + certificate_chain.verify_chain( + check_expiry=True, + cert_type=BCertCertType.ISSUER + ) + + encryption_key = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate() + signing_key = ECCKey.load(signing_key) if signing_key else ECCKey.generate() + new_certificate = Certificate.new_leaf_cert( cert_id=get_random_bytes(16), security_level=certificate_chain.get_security_level(), @@ -168,7 +199,10 @@ def create_device( ) certificate_chain.prepend(new_certificate) - certificate_chain.verify() + certificate_chain.verify_chain( + check_expiry=True, + cert_type=BCertCertType.DEVICE + ) device = Device( group_key=group_key.dumps(), @@ -201,10 +235,83 @@ def create_device( log.info(" + Saved to: %s", out_path.absolute()) +@main.command() +@click.option("-e", "--encryption_key", type=Path, required=True, help="Optional Device ECC private encryption key (zprivencr.dat)") +@click.option("-s", "--signing_key", type=Path, required=True, help="Optional Device ECC private signing key (zprivsig.dat)") +@click.option("-c", "--group_certificate", type=Path, required=True, help="Provisioned device group certificate chain") +@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") +@click.pass_context +def build_device( + ctx: click.Context, + encryption_key: Optional[Path], + signing_key: Optional[Path], + group_certificate: Path, + output: Optional[Path] = None +) -> None: + """ + Build a V2 Playready Device (.prd) file from encryption/signing ECC private keys and a group certificate chain. + Your group certificate chain's leaf certificate must be of type DEVICE (be already provisioned) for this to work. + """ + if not encryption_key.is_file(): + raise click.UsageError("encryption_key: Not a path to a file, or it doesn't exist.", ctx) + if not signing_key.is_file(): + raise click.UsageError("signing_key: Not a path to a file, or it doesn't exist.", ctx) + if not group_certificate.is_file(): + raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx) + + log = logging.getLogger("build-device") + + encryption_key = ECCKey.load(encryption_key) + signing_key = ECCKey.load(signing_key) + + certificate_chain = CertificateChain.load(group_certificate) + leaf_certificate = certificate_chain.get(0) + + if not leaf_certificate.contains_public_key(encryption_key.public_bytes()): + raise InvalidCertificateChain("Leaf certificate does not contain encryption public key") + + if not leaf_certificate.contains_public_key(signing_key.public_bytes()): + raise InvalidCertificateChain("Leaf certificate does not contain signing public key") + + certificate_chain.verify_chain( + check_expiry=True, + cert_type=BCertCertType.DEVICE + ) + + device = Device( + group_key=None, + encryption_key=encryption_key.dumps(), + signing_key=signing_key.dumps(), + group_certificate=certificate_chain.dumps(), + ) + + if output and output.suffix: + if output.suffix.lower() != ".prd": + log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") + out_path = output + else: + out_dir = output or Path.cwd() + out_path = out_dir / f"{device.get_name()}.prd" + + if out_path.exists(): + log.error(f"A file already exists at the path '{out_path}', cannot overwrite.") + return + + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_bytes(device.dumps(version=2)) + + log.info("Built Playready Device (.prd) file, %s", out_path.name) + log.info(" + Security Level: %s", device.security_level) + log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps())) + log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps())) + log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps())) + log.info(" + Saved to: %s", out_path.absolute()) + + @main.command() @click.argument("prd_path", type=Path) -@click.option("-e", "--encryption_key", type=Path, required=False, help="Optional Device ECC private encryption key") -@click.option("-s", "--signing_key", type=Path, required=False, help="Optional Device ECC private signing key") +@click.option("-e", "--encryption_key", type=Path, help="Optional Device ECC private encryption key") +@click.option("-s", "--signing_key", type=Path, help="Optional Device ECC private signing key") @click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") @click.pass_context def reprovision_device( @@ -231,6 +338,9 @@ def reprovision_device( if device.group_key is None: raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher") + if device.group_certificate.get(0).get_type() != BCertCertType.DEVICE: + raise InvalidCertificateChain("Device is not provisioned") + device.group_certificate.remove(0) encryption_key = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate() @@ -250,6 +360,11 @@ def reprovision_device( ) device.group_certificate.prepend(new_certificate) + device.group_certificate.verify_chain( + check_expiry=True, + cert_type=BCertCertType.DEVICE + ) + if output and output.suffix: if output.suffix.lower() != ".prd": log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") @@ -262,6 +377,83 @@ def reprovision_device( log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name) +@main.command() +@click.option("-d", "--device", type=Path, default=None, help="PRD Device") +@click.option("-c", "--chain", type=Path, default=None, help="BCert Chain (bgroupcert.dat, bdevcert.dat)") +@click.pass_context +def inspect(ctx: click.Context, device: Optional[Path], chain: Optional[Path]) -> None: + """ + Inspect a (device's) Certificate Chain to display information about each of its Certificates. + """ + if bool(device) == bool(chain): + raise click.UsageError("You must provide exactly one of device or chain.", ctx) + + if device: + if not device.is_file(): + raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx) + + device = Device.load(device) + chai = device.group_certificate + elif chain: + if not chain.is_file(): + raise click.UsageError("chain: Not a path to a file, or it doesn't exist.", ctx) + + chai = CertificateChain.load(chain) + else: + return None # suppress warning + + log = logging.getLogger("inspect") + log.info("Certificate Chain Inspection:") + + log.info(f" + Version: {chai.parsed.version}") + log.info(f" + Certificate Count: {chai.parsed.certificate_count}") + + for i in range(chai.count()): + cert = chai.get(i) + + log.info(f" Certificate {i}:") + + basic_info = cert.get_attribute(BCertObjType.BASIC) + if basic_info: + log.info(f" + Cert Type: {BCertCertType(basic_info.attribute.cert_type).name}") + log.info(f" + Security Level: SL{basic_info.attribute.security_level}") + log.info(f" + Expiration Date: {datetime.fromtimestamp(basic_info.attribute.expiration_date)}") + log.info(f" + Client ID: {basic_info.attribute.client_id.hex()}") + + manufacturer_info = cert.get_attribute(BCertObjType.MANUFACTURER) + if manufacturer_info: + manu_attr = manufacturer_info.attribute + + def un_pad(name: bytes): + return name.rstrip(b'\x00').decode("utf-8", errors="ignore") + + log.info( f" + Name: {un_pad(manu_attr.manufacturer_name)} {un_pad(manu_attr.model_name)} {un_pad(manu_attr.model_number)}") + + feature_info = cert.get_attribute(BCertObjType.FEATURE) + if feature_info and feature_info.attribute.feature_count > 0: + features = list(map( + lambda x: BCertFeatures(x).name, + feature_info.attribute.features + )) + log.info(f" + Features: {', '.join(features)}") + + key_info = cert.get_attribute(BCertObjType.KEY) + if key_info and key_info.attribute.key_count > 0: + key_attr = key_info.attribute + log.info(f" + Cert Keys:") + for idx, key in enumerate(key_attr.cert_keys): + log.info(f" + Key {idx}:") + log.info(f" + Type: {BCertKeyType(key.type).name}") + log.info(f" + Key Length: {key.length} bits") + usages = list(map( + lambda x: BCertKeyUsage(x).name, + key.usages + )) + if len(usages) > 0: + log.info(f" + Usages: {', '.join(usages)}") + + return None + @main.command() @click.argument("prd_path", type=Path) diff --git a/pyplayready/drmresults.py b/pyplayready/misc/drmresults.py similarity index 99% rename from pyplayready/drmresults.py rename to pyplayready/misc/drmresults.py index 2a4feca..248c2d3 100644 --- a/pyplayready/drmresults.py +++ b/pyplayready/misc/drmresults.py @@ -1,7 +1,7 @@ from enum import Enum -class DRMResult(Enum): +class DrmResult(Enum): """Holds Playready DRM results (error codes and their messages)""" DRM_SUCCESS = (0x00000000, "Operation was successful.") @@ -902,7 +902,7 @@ class DRMResult(Enum): @staticmethod def from_code(code: str): """Get the error message for a given error code.""" - for error in DRMResult: + for error in DrmResult: if error.value[0] == int(code, 16): return error raise ValueError("Invalid DRMResult") diff --git a/pyplayready/exceptions.py b/pyplayready/misc/exceptions.py similarity index 90% rename from pyplayready/exceptions.py rename to pyplayready/misc/exceptions.py index a388cb2..dd63c01 100644 --- a/pyplayready/exceptions.py +++ b/pyplayready/misc/exceptions.py @@ -1,3 +1,5 @@ +from pyplayready.misc.drmresults import DrmResult + class PyPlayreadyException(Exception): """Exceptions used by pyplayready.""" @@ -39,4 +41,4 @@ class OutdatedDevice(PyPlayreadyException): class ServerException(PyPlayreadyException): - """Recasted on the client if found in license response.""" \ No newline at end of file + """Recasted on the client if found in license response.""" diff --git a/pyplayready/remote/remotecdm.py b/pyplayready/remote/remotecdm.py index 404858f..bb76e34 100644 --- a/pyplayready/remote/remotecdm.py +++ b/pyplayready/remote/remotecdm.py @@ -10,7 +10,7 @@ from pyplayready.cdm import Cdm from pyplayready.device import Device from pyplayready.license.key import Key -from pyplayready.exceptions import (DeviceMismatch, InvalidInitData) +from pyplayready.misc.exceptions import (DeviceMismatch, InvalidInitData) from pyplayready.system.wrmheader import WRMHeader diff --git a/pyplayready/remote/serve.py b/pyplayready/remote/serve.py index c83695b..1480e43 100644 --- a/pyplayready/remote/serve.py +++ b/pyplayready/remote/serve.py @@ -8,7 +8,7 @@ from pyplayready import __version__, PSSH from pyplayready.cdm import Cdm from pyplayready.device import Device -from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense, InvalidPssh) +from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, InvalidPssh) routes = web.RouteTableDef() diff --git a/pyplayready/system/bcert.py b/pyplayready/system/bcert.py index 293cce7..e2496c6 100644 --- a/pyplayready/system/bcert.py +++ b/pyplayready/system/bcert.py @@ -1,10 +1,12 @@ from __future__ import annotations + import collections.abc # monkey patch for construct 2.8.8 compatibility if not hasattr(collections, 'Sequence'): collections.Sequence = collections.abc.Sequence +import time import base64 from pathlib import Path from typing import Union, Optional @@ -17,7 +19,7 @@ from construct import Int16ub, Array from construct import Struct, this from pyplayready.crypto import Crypto -from pyplayready.exceptions import InvalidCertificateChain, InvalidCertificate +from pyplayready.misc.exceptions import InvalidCertificateChain, InvalidCertificate from pyplayready.crypto.ecc_key import ECCKey @@ -445,46 +447,91 @@ class Certificate(_BCertStructs): bcert_obj=cert ) - def get_attribute(self, type_: int): + def get_attribute(self, type_: int) -> Optional[Container]: for attribute in self.parsed.attributes: if attribute.tag == type_: return attribute - def get_security_level(self) -> int: - basic_info_attribute = self.get_attribute(BCertObjType.BASIC).attribute - if basic_info_attribute: - return basic_info_attribute.security_level + return None - @staticmethod - def _unpad(name: bytes): - return name.rstrip(b'\x00').decode("utf-8", errors="ignore") + def get_security_level(self) -> Optional[int]: + basic_info = self.get_attribute(BCertObjType.BASIC) + if basic_info: + return basic_info.attribute.security_level + + return None + + def get_name(self) -> Optional[str]: + manufacturer_info = self.get_attribute(BCertObjType.MANUFACTURER) - def get_name(self): - manufacturer_info = self.get_attribute(BCertObjType.MANUFACTURER).attribute if manufacturer_info: - return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}" + manufacturer_info_attr = manufacturer_info.attribute + + def un_pad(name: bytes): + return name.rstrip(b'\x00').decode("utf-8", errors="ignore") + + return f"{un_pad(manufacturer_info_attr.manufacturer_name)} {un_pad(manufacturer_info_attr.model_name)} {un_pad(manufacturer_info_attr.model_number)}" + + return None + + def get_type(self) -> Optional[int]: + basic_info = self.get_attribute(BCertObjType.BASIC) + if basic_info: + return basic_info.attribute.cert_type + + return None + + def get_expiration_date(self) -> Optional[int]: + basic_info = self.get_attribute(BCertObjType.BASIC) + if basic_info: + return basic_info.attribute.expiration_date + + return None def get_issuer_key(self) -> Optional[bytes]: + signature_object = self.get_attribute(BCertObjType.SIGNATURE) + if not signature_object: + return None + + return signature_object.attribute.signature_key + + def get_key_by_usage(self, key_usage: BCertKeyUsage) -> Optional[bytes]: key_info_object = self.get_attribute(BCertObjType.KEY) if not key_info_object: - return + return None - key_info_attribute = key_info_object.attribute - return next(map(lambda key: key.key, filter(lambda key: 6 in key.usages, key_info_attribute.cert_keys)), None) + for key in key_info_object.attribute.cert_keys: + for usage in key.usages: + if usage == key_usage: + return key.key + + return None + + def contains_public_key(self, public_key: Union[ECCKey, bytes]) -> bool: + if isinstance(public_key, ECCKey): + public_key = public_key.public_bytes() + + key_info_object = self.get_attribute(BCertObjType.KEY) + if not key_info_object: + return False + + for key in key_info_object.attribute.cert_keys: + if key.key == public_key: + return True + + return False def dumps(self) -> bytes: return self._BCERT.build(self.parsed) - def verify(self, public_key: bytes, index: int): + def verify_signature(self) -> None: signature_object = self.get_attribute(BCertObjType.SIGNATURE) if not signature_object: - raise InvalidCertificate(f"No signature object found in certificate {index}") + raise InvalidCertificate(f"No signature object found in certificate") signature_attribute = signature_object.attribute raw_signature_key = signature_attribute.signature_key - if public_key != raw_signature_key: - raise InvalidCertificate(f"Signature keys of certificate {index} do not match") signature_key = ECC.construct( curve='P-256', @@ -492,22 +539,25 @@ class Certificate(_BCertStructs): point_y=int.from_bytes(raw_signature_key[32:], 'big') ) - sign_payload = self.dumps()[:-signature_object.length] + sign_payload = self.dumps()[:self.parsed.certificate_length] if not Crypto.ecc256_verify( public_key=signature_key, data=sign_payload, signature=signature_attribute.signature ): - raise InvalidCertificate(f"Signature of certificate {index} is not authentic") - - return self.get_issuer_key() + raise InvalidCertificate(f"Signature of certificate is not authentic") class CertificateChain(_BCertStructs): """Represents a BCertChain""" - ECC256MSBCertRootIssuerPubKey = bytes.fromhex("864d61cff2256e422c568b3c28001cfb3e1527658584ba0521b79b1828d936de1d826a8fc3e6e7fa7a90d5ca2946f1f64a2efb9f5dcffe7e434eb44293fac5ab") + MSPlayReadyRootIssuerPubKey = bytes([ + 0x86, 0x4D, 0x61, 0xCF, 0xF2, 0x25, 0x6E, 0x42, 0x2C, 0x56, 0x8B, 0x3C, 0x28, 0x00, 0x1C, 0xFB, + 0x3E, 0x15, 0x27, 0x65, 0x85, 0x84, 0xBA, 0x05, 0x21, 0xB7, 0x9B, 0x18, 0x28, 0xD9, 0x36, 0xDE, + 0x1D, 0x82, 0x6A, 0x8F, 0xC3, 0xE6, 0xE7, 0xFA, 0x7A, 0x90, 0xD5, 0xCA, 0x29, 0x46, 0xF1, 0xF6, + 0x4A, 0x2E, 0xFB, 0x9F, 0x5D, 0xCF, 0xFE, 0x7E, 0x43, 0x4E, 0xB4, 0x42, 0x93, 0xFA, 0xC5, 0xAB + ]) def __init__( self, @@ -541,24 +591,66 @@ class CertificateChain(_BCertStructs): return self._BCERT_CHAIN.build(self.parsed) def get_security_level(self) -> int: - # not sure if there's a better way than this return self.get(0).get_security_level() def get_name(self) -> str: return self.get(0).get_name() - def verify(self) -> bool: - issuer_key = self.ECC256MSBCertRootIssuerPubKey + def verify_chain( + self, + check_expiry: bool = False, + cert_type: Optional[BCertCertType] = None + ) -> bool: + # There should be 1-6 certificates in a chain + if not (1 <= self.count() <= 6): + raise InvalidCertificateChain("An invalid maximum license chain depth") - try: - for i in reversed(range(self.count())): - certificate = self.get(i) - issuer_key = certificate.verify(issuer_key, i) + for i in range(self.count()): + if i == 0 and cert_type: + if self.get(i).get_type() != cert_type: + raise InvalidCertificateChain("Invalid certificate type") - if not issuer_key and i != 0: - raise InvalidCertificate(f"Certificate {i} is not valid") - except InvalidCertificate as e: - raise InvalidCertificateChain(e) + self.get(i).verify_signature() + + if check_expiry: + if time.time() >= self.get(i).get_expiration_date(): + raise InvalidCertificateChain(f"Certificate {i} has expired") + + if i > 0: + if not self._verify_adjacent_certs(self.get(i - 1), self.get(i)): + raise InvalidCertificateChain("Adjacent certificate validation failed") + + if i == (self.count() - 1): + if self.get(i).get_issuer_key() != self.MSPlayReadyRootIssuerPubKey: + raise InvalidCertificateChain("Root certificate issuer missmatch") + + return True + + @staticmethod + def _verify_adjacent_certs(child_cert: Certificate, parent_cert: Certificate) -> bool: + if parent_cert.get_type() != BCertCertType.ISSUER: + return False + + if child_cert.get_security_level() > parent_cert.get_expiration_date(): + return False + + key_info = parent_cert.get_attribute(BCertObjType.KEY) + if not key_info: + return False + + issuer_key = child_cert.get_issuer_key() + + issuer_key_match = False + for key in key_info.attribute.cert_keys: + if key.key == issuer_key: + issuer_key_match = True + + if not issuer_key_match: + return False + + # TODO: + # check issuer rights + # check issuer features/key usages return True diff --git a/pyplayready/system/pssh.py b/pyplayready/system/pssh.py index 760c01b..669c8ca 100644 --- a/pyplayready/system/pssh.py +++ b/pyplayready/system/pssh.py @@ -5,12 +5,12 @@ from uuid import UUID from construct import Struct, Int32ul, Int16ul, this, Bytes, Switch, Int8ub, Int24ub, Int32ub, Const, Container, \ ConstructError, Rebuild, Default, If, PrefixedArray, Prefixed, GreedyBytes -from pyplayready.exceptions import InvalidPssh +from pyplayready.misc.exceptions import InvalidPssh from pyplayready.system.wrmheader import WRMHeader class _PlayreadyPSSHStructs: - PSSHBox = Struct( + PsshBox = Struct( "length" / Int32ub, "pssh" / Const(b"pssh"), "version" / Rebuild(Int8ub, lambda ctx: 1 if (hasattr(ctx, "key_ids") and ctx.key_ids) else 0), @@ -58,7 +58,7 @@ class PSSH(_PlayreadyPSSHStructs): self.wrm_headers: List[WRMHeader] try: # PSSH Box -> PlayReady Header - box = self.PSSHBox.parse(data) + box = self.PsshBox.parse(data) if self._is_utf_16_le(box.data): self.wrm_headers = [WRMHeader(box.data)] else: diff --git a/pyplayready/system/wrmheader.py b/pyplayready/system/wrmheader.py index 292c0a3..1b636d2 100644 --- a/pyplayready/system/wrmheader.py +++ b/pyplayready/system/wrmheader.py @@ -33,7 +33,7 @@ class WRMHeader: def _missing_(cls, value): return cls.UNKNOWN - _RETURN_STRUCTURE = Tuple[List[SignedKeyID], Optional[str], Optional[str], Optional[str]] + _RETURN_STRUCTURE = Optional[Tuple[List[SignedKeyID], Optional[str], Optional[str], Optional[str]]] def __init__(self, data: Union[str, bytes]): """Load a WRM Header from either a string, base64 encoded data or bytes""" @@ -141,5 +141,7 @@ class WRMHeader: elif self.version == self.Version.VERSION_4_3_0_0: return self._read_v4_3_0_0(data) + return None + def dumps(self) -> str: return self._raw_data.decode("utf-16-le") diff --git a/pyproject.toml b/pyproject.toml index eae9b53..61f76d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyplayready" -version = "0.6.0" +version = "0.6.3" description = "pyplayready CDM (Content Decryption Module) implementation in Python." license = "CC BY-NC-ND 4.0" authors = ["DevLARLEY, Erevoc", "DevataDev"] @@ -39,6 +39,8 @@ click = "^8.1.7" xmltodict = "^0.14.2" PyYAML = "^6.0.1" aiohttp = {version = "^3.9.1", optional = true} +cryptography = "^45.0.6" +lxml = "^6.0.0" [tool.poetry.scripts] pyplayready = "pyplayready.main:main" diff --git a/requirements.txt b/requirements.txt index c543673..4027d4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,6 @@ construct==2.8.8 click PyYAML aiohttp -xmltodict \ No newline at end of file +xmltodict +cryptography +lxml \ No newline at end of file