+ Improved certificate chain validation

+ Added CLI function for creating v2 devices
+ Added CLI function for inspecting certificate chains
This commit is contained in:
larley 2025-08-20 21:32:47 +02:00
parent c5c2b56f49
commit fa01639834
16 changed files with 430 additions and 84 deletions

View File

@ -9,6 +9,8 @@ from pyplayready.remote.remotecdm import *
from pyplayready.system.bcert import *
from pyplayready.system.pssh import *
from pyplayready.system.session import *
from pyplayready.misc.drmresults import *
from pyplayready.misc.exceptions import *
__version__ = "0.6.0"
__version__ = "0.6.3"

View File

@ -4,7 +4,7 @@ import base64
import time
from typing import List, Union, Optional
from uuid import UUID
import xml.etree.ElementTree as ET
from lxml import etree as ET
import xmltodict
from Crypto.Cipher import AES
@ -16,12 +16,12 @@ from Crypto.Util.strxor import strxor
from ecpy.curves import Point, Curve
from pyplayready.crypto import Crypto
from pyplayready.drmresults import DRMResult
from pyplayready.misc.drmresults import DrmResult
from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.license.key import Key
from pyplayready.license.xmrlicense import XMRLicense, XMRObjectTypes
from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense, ServerException)
from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, ServerException)
from pyplayready.system.session import Session
from pyplayready.system.wrmheader import WRMHeader
@ -29,7 +29,10 @@ from pyplayready.system.wrmheader import WRMHeader
class Cdm:
MAX_NUM_OF_SESSIONS = 16
rgbMagicConstantZero = bytes.fromhex("7ee9ed4af773224f00b8ea7efb027cbb")
MagicConstantZero = bytes([
0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f,
0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb
])
def __init__(
self,
@ -288,13 +291,14 @@ class Cdm:
raise InvalidSession("Cannot parse a license message without first making a license request")
try:
root = ET.fromstring(licence)
parser = ET.XMLParser(remove_blank_text=True)
root = ET.XML(licence.encode(), parser)
faults = root.findall(".//{http://schemas.xmlsoap.org/soap/envelope/}Fault")
for fault in faults:
status_codes = fault.findall(".//StatusCode")
for status_code in status_codes:
code = DRMResult.from_code(status_code.text)
code = DrmResult.from_code(status_code.text)
raise ServerException(f"[{status_code.text}] ({code.name}) {code.message}")
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
@ -328,7 +332,7 @@ class Cdm:
embedded_root_license = content_key.encrypted_key[:144]
embedded_leaf_license = content_key.encrypted_key[144:]
rgb_key = strxor(ck, self.rgbMagicConstantZero)
rgb_key = strxor(ck, self.MagicConstantZero)
content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key)
aux_key = next(parsed_licence.get_object(XMRObjectTypes.AUX_KEY_OBJECT))["auxiliary_keys"][0]["key"]
@ -352,6 +356,7 @@ class Cdm:
key=ck
))
except Exception as e:
raise
raise InvalidLicense(f"Unable to parse license: {e}")
def get_keys(self, session_id: bytes) -> List[Key]:

View File

@ -29,7 +29,7 @@ class ECCKey:
if not isinstance(private_key, int):
raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}")
# The public is always derived from the private key; loading the other stuff won't work
# The public key is always derived from the private key; loading the other stuff won't work
key = ECC.construct(
curve='P-256',
d=private_key,

View File

@ -0,0 +1,52 @@
from Crypto.Cipher import AES
from Crypto.Hash import CMAC
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap
def derive_wrapping_key() -> bytes:
"""
NIST SP 800-108 (Rev. 1)
"Recommendation for Key Derivation Using Pseudorandom Functions"
https://doi.org/10.6028/NIST.SP.800-108r1
"""
KeyDerivationCertificatePrivateKeysWrap = bytes([
0x9c, 0xe9, 0x34, 0x32, 0xc7, 0xd7, 0x40, 0x16,
0xba, 0x68, 0x47, 0x63, 0xf8, 0x01, 0xe1, 0x36
])
CTK_TEST = bytes([
0x8B, 0x22, 0x2F, 0xFD, 0x1E, 0x76, 0x19, 0x56,
0x59, 0xCF, 0x27, 0x03, 0x89, 0x8C, 0x42, 0x7F
])
cmac = CMAC.new(CTK_TEST, ciphermod=AES)
cmac.update(bytes([
1, # Iterations
*KeyDerivationCertificatePrivateKeysWrap,
0, # Separator
*bytes(16), # Context
0, 128 # Length in bits of return value
]))
derived_wrapping_key = cmac.digest()
return derived_wrapping_key
def unwrap_wrapped_key(wrapped_key: bytes) -> bytes:
"""
IETF RFC 3394
"Advanced Encryption Standard (AES) Key Wrap Algorithm"
https://www.rfc-editor.org/rfc/rfc3394
"""
wrapping_key = derive_wrapping_key()
unwrapped_key = aes_key_unwrap(wrapping_key, wrapped_key)
# bytes 0 -32: unwrapped key
# bytes 32-48: random bytes
return unwrapped_key[:32]

View File

@ -5,10 +5,10 @@ from enum import IntEnum
from pathlib import Path
from typing import Union, Any, Optional
from pyplayready.device.structs import DeviceStructs
from pyplayready.exceptions import OutdatedDevice
from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.device.structs import DeviceStructs
from pyplayready.misc.exceptions import OutdatedDevice
from pyplayready.system.bcert import CertificateChain
class Device:
@ -73,13 +73,13 @@ class Device:
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes:
if not self.group_key:
raise OutdatedDevice("Cannot dump a v2 device, re-create it or use a Device with a version of 3 or higher")
def dumps(self, version: int = CURRENT_VERSION) -> bytes:
if self.group_key is None and version == self.CURRENT_VERSION:
raise OutdatedDevice("Cannot dump device as version 3 without having a group key. Either provide a group key or set argument version=2")
return DeviceStructs.prd.build(dict(
version=self.CURRENT_VERSION,
group_key=self.group_key.dumps(),
version=version,
group_key=self.group_key.dumps() if self.group_key else None,
encryption_key=self.encryption_key.dumps(),
signing_key=self.signing_key.dumps(),
group_certificate_length=len(self.group_certificate.dumps()),

View File

@ -49,11 +49,6 @@ class Key:
@staticmethod
def kid_to_uuid(kid: Union[str, bytes]) -> UUID:
"""
Convert a Key ID from a string or bytes to a UUID object.
At first, this may seem very simple, but some types of Key IDs
may not be 16 bytes and some may be decimal vs. hex.
"""
if isinstance(kid, str):
kid = base64.b64decode(kid)
if not kid:

View File

@ -8,11 +8,13 @@ import requests
from Crypto.Random import get_random_bytes
from pyplayready import __version__, InvalidCertificateChain, InvalidLicense
from pyplayready.system.bcert import CertificateChain, Certificate
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.exceptions import OutdatedDevice
from pyplayready.crypto.key_wrap import unwrap_wrapped_key
from pyplayready.device import Device
from pyplayready.misc.exceptions import OutdatedDevice
from pyplayready.system.bcert import CertificateChain, Certificate, BCertCertType, BCertObjType, BCertFeatures, \
BCertKeyType, BCertKeyUsage
from pyplayready.system.pssh import PSSH
@ -68,6 +70,10 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
data=challenge
)
if license_response.status_code != 200:
log.error("Failed to send Challenge: [%s] %s", license_response.status_code, license_response.text)
return
licence = license_response.text
log.debug(licence)
@ -126,37 +132,62 @@ def test(ctx: click.Context, device: Path, ckt: str, security_level: str) -> Non
@main.command()
@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key")
@click.option("-e", "--encryption_key", type=Path, required=False, help="Optional Device ECC private encryption key")
@click.option("-s", "--signing_key", type=Path, required=False, help="Optional Device ECC private signing key")
@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain")
@click.option("-k", "--group_key", type=Path, help="Device ECC private group key (zgpriv.dat)")
@click.option("-pk", "--protected_group_key", type=Path, help="Protected Device ECC private group key (zgpriv_protected.dat)")
@click.option("-e", "--encryption_key", type=Path, help="Optional Device ECC private encryption key (zprivencr.dat)")
@click.option("-s", "--signing_key", type=Path, help="Optional Device ECC private signing key (zprivsig.dat)")
@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain (bgroupcert.dat)")
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def create_device(
ctx: click.Context,
group_key: Path,
protected_group_key: Path,
encryption_key: Optional[Path],
signing_key: Optional[Path],
group_certificate: Path,
output: Optional[Path] = None
) -> None:
"""Create a Playready Device (.prd) file from an ECC private group key and group certificate chain"""
if not group_key.is_file():
raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
if bool(group_key) == bool(protected_group_key):
raise click.UsageError("You must provide exactly one of group_key or protected_group_key.", ctx)
if not group_certificate.is_file():
raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("create-device")
encryption_key = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate()
signing_key = ECCKey.load(signing_key) if signing_key else ECCKey.generate()
if group_key:
if not group_key.is_file():
raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
group_key = ECCKey.load(group_key)
elif protected_group_key:
if not protected_group_key.is_file():
raise click.UsageError("protected_group_key: Not a path to a file, or it doesn't exist.", ctx)
wrapped_key = protected_group_key.read_bytes()
unwrapped_key = unwrap_wrapped_key(wrapped_key)
group_key = ECCKey.loads(unwrapped_key)
group_key = ECCKey.load(group_key)
certificate_chain = CertificateChain.load(group_certificate)
if certificate_chain.get(0).get_issuer_key() != group_key.public_bytes():
if certificate_chain.get(0).get_type() == BCertCertType.DEVICE:
raise InvalidCertificateChain("Device has already been provisioned")
if certificate_chain.get(0).get_type() != BCertCertType.ISSUER:
raise InvalidCertificateChain("Leaf-most certificate must be of type ISSUER to issue certificate if type DEVICE")
if not certificate_chain.get(0).contains_public_key(group_key):
raise InvalidCertificateChain("Group key does not match this certificate")
certificate_chain.verify_chain(
check_expiry=True,
cert_type=BCertCertType.ISSUER
)
encryption_key = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate()
signing_key = ECCKey.load(signing_key) if signing_key else ECCKey.generate()
new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16),
security_level=certificate_chain.get_security_level(),
@ -168,7 +199,10 @@ def create_device(
)
certificate_chain.prepend(new_certificate)
certificate_chain.verify()
certificate_chain.verify_chain(
check_expiry=True,
cert_type=BCertCertType.DEVICE
)
device = Device(
group_key=group_key.dumps(),
@ -201,10 +235,83 @@ def create_device(
log.info(" + Saved to: %s", out_path.absolute())
@main.command()
@click.option("-e", "--encryption_key", type=Path, required=True, help="Optional Device ECC private encryption key (zprivencr.dat)")
@click.option("-s", "--signing_key", type=Path, required=True, help="Optional Device ECC private signing key (zprivsig.dat)")
@click.option("-c", "--group_certificate", type=Path, required=True, help="Provisioned device group certificate chain")
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def build_device(
ctx: click.Context,
encryption_key: Optional[Path],
signing_key: Optional[Path],
group_certificate: Path,
output: Optional[Path] = None
) -> None:
"""
Build a V2 Playready Device (.prd) file from encryption/signing ECC private keys and a group certificate chain.
Your group certificate chain's leaf certificate must be of type DEVICE (be already provisioned) for this to work.
"""
if not encryption_key.is_file():
raise click.UsageError("encryption_key: Not a path to a file, or it doesn't exist.", ctx)
if not signing_key.is_file():
raise click.UsageError("signing_key: Not a path to a file, or it doesn't exist.", ctx)
if not group_certificate.is_file():
raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("build-device")
encryption_key = ECCKey.load(encryption_key)
signing_key = ECCKey.load(signing_key)
certificate_chain = CertificateChain.load(group_certificate)
leaf_certificate = certificate_chain.get(0)
if not leaf_certificate.contains_public_key(encryption_key.public_bytes()):
raise InvalidCertificateChain("Leaf certificate does not contain encryption public key")
if not leaf_certificate.contains_public_key(signing_key.public_bytes()):
raise InvalidCertificateChain("Leaf certificate does not contain signing public key")
certificate_chain.verify_chain(
check_expiry=True,
cert_type=BCertCertType.DEVICE
)
device = Device(
group_key=None,
encryption_key=encryption_key.dumps(),
signing_key=signing_key.dumps(),
group_certificate=certificate_chain.dumps(),
)
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
out_path = output
else:
out_dir = output or Path.cwd()
out_path = out_dir / f"{device.get_name()}.prd"
if out_path.exists():
log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")
return
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(device.dumps(version=2))
log.info("Built Playready Device (.prd) file, %s", out_path.name)
log.info(" + Security Level: %s", device.security_level)
log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps()))
log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps()))
log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps()))
log.info(" + Saved to: %s", out_path.absolute())
@main.command()
@click.argument("prd_path", type=Path)
@click.option("-e", "--encryption_key", type=Path, required=False, help="Optional Device ECC private encryption key")
@click.option("-s", "--signing_key", type=Path, required=False, help="Optional Device ECC private signing key")
@click.option("-e", "--encryption_key", type=Path, help="Optional Device ECC private encryption key")
@click.option("-s", "--signing_key", type=Path, help="Optional Device ECC private signing key")
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def reprovision_device(
@ -231,6 +338,9 @@ def reprovision_device(
if device.group_key is None:
raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher")
if device.group_certificate.get(0).get_type() != BCertCertType.DEVICE:
raise InvalidCertificateChain("Device is not provisioned")
device.group_certificate.remove(0)
encryption_key = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate()
@ -250,6 +360,11 @@ def reprovision_device(
)
device.group_certificate.prepend(new_certificate)
device.group_certificate.verify_chain(
check_expiry=True,
cert_type=BCertCertType.DEVICE
)
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
@ -262,6 +377,83 @@ def reprovision_device(
log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name)
@main.command()
@click.option("-d", "--device", type=Path, default=None, help="PRD Device")
@click.option("-c", "--chain", type=Path, default=None, help="BCert Chain (bgroupcert.dat, bdevcert.dat)")
@click.pass_context
def inspect(ctx: click.Context, device: Optional[Path], chain: Optional[Path]) -> None:
"""
Inspect a (device's) Certificate Chain to display information about each of its Certificates.
"""
if bool(device) == bool(chain):
raise click.UsageError("You must provide exactly one of device or chain.", ctx)
if device:
if not device.is_file():
raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx)
device = Device.load(device)
chai = device.group_certificate
elif chain:
if not chain.is_file():
raise click.UsageError("chain: Not a path to a file, or it doesn't exist.", ctx)
chai = CertificateChain.load(chain)
else:
return None # suppress warning
log = logging.getLogger("inspect")
log.info("Certificate Chain Inspection:")
log.info(f" + Version: {chai.parsed.version}")
log.info(f" + Certificate Count: {chai.parsed.certificate_count}")
for i in range(chai.count()):
cert = chai.get(i)
log.info(f" Certificate {i}:")
basic_info = cert.get_attribute(BCertObjType.BASIC)
if basic_info:
log.info(f" + Cert Type: {BCertCertType(basic_info.attribute.cert_type).name}")
log.info(f" + Security Level: SL{basic_info.attribute.security_level}")
log.info(f" + Expiration Date: {datetime.fromtimestamp(basic_info.attribute.expiration_date)}")
log.info(f" + Client ID: {basic_info.attribute.client_id.hex()}")
manufacturer_info = cert.get_attribute(BCertObjType.MANUFACTURER)
if manufacturer_info:
manu_attr = manufacturer_info.attribute
def un_pad(name: bytes):
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
log.info( f" + Name: {un_pad(manu_attr.manufacturer_name)} {un_pad(manu_attr.model_name)} {un_pad(manu_attr.model_number)}")
feature_info = cert.get_attribute(BCertObjType.FEATURE)
if feature_info and feature_info.attribute.feature_count > 0:
features = list(map(
lambda x: BCertFeatures(x).name,
feature_info.attribute.features
))
log.info(f" + Features: {', '.join(features)}")
key_info = cert.get_attribute(BCertObjType.KEY)
if key_info and key_info.attribute.key_count > 0:
key_attr = key_info.attribute
log.info(f" + Cert Keys:")
for idx, key in enumerate(key_attr.cert_keys):
log.info(f" + Key {idx}:")
log.info(f" + Type: {BCertKeyType(key.type).name}")
log.info(f" + Key Length: {key.length} bits")
usages = list(map(
lambda x: BCertKeyUsage(x).name,
key.usages
))
if len(usages) > 0:
log.info(f" + Usages: {', '.join(usages)}")
return None
@main.command()
@click.argument("prd_path", type=Path)

View File

@ -1,7 +1,7 @@
from enum import Enum
class DRMResult(Enum):
class DrmResult(Enum):
"""Holds Playready DRM results (error codes and their messages)"""
DRM_SUCCESS = (0x00000000, "Operation was successful.")
@ -902,7 +902,7 @@ class DRMResult(Enum):
@staticmethod
def from_code(code: str):
"""Get the error message for a given error code."""
for error in DRMResult:
for error in DrmResult:
if error.value[0] == int(code, 16):
return error
raise ValueError("Invalid DRMResult")

View File

@ -1,3 +1,5 @@
from pyplayready.misc.drmresults import DrmResult
class PyPlayreadyException(Exception):
"""Exceptions used by pyplayready."""
@ -39,4 +41,4 @@ class OutdatedDevice(PyPlayreadyException):
class ServerException(PyPlayreadyException):
"""Recasted on the client if found in license response."""
"""Recasted on the client if found in license response."""

View File

@ -10,7 +10,7 @@ from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.license.key import Key
from pyplayready.exceptions import (DeviceMismatch, InvalidInitData)
from pyplayready.misc.exceptions import (DeviceMismatch, InvalidInitData)
from pyplayready.system.wrmheader import WRMHeader

View File

@ -8,7 +8,7 @@ from pyplayready import __version__, PSSH
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense, InvalidPssh)
from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, InvalidPssh)
routes = web.RouteTableDef()

View File

@ -1,10 +1,12 @@
from __future__ import annotations
import collections.abc
# monkey patch for construct 2.8.8 compatibility
if not hasattr(collections, 'Sequence'):
collections.Sequence = collections.abc.Sequence
import time
import base64
from pathlib import Path
from typing import Union, Optional
@ -17,7 +19,7 @@ from construct import Int16ub, Array
from construct import Struct, this
from pyplayready.crypto import Crypto
from pyplayready.exceptions import InvalidCertificateChain, InvalidCertificate
from pyplayready.misc.exceptions import InvalidCertificateChain, InvalidCertificate
from pyplayready.crypto.ecc_key import ECCKey
@ -445,46 +447,91 @@ class Certificate(_BCertStructs):
bcert_obj=cert
)
def get_attribute(self, type_: int):
def get_attribute(self, type_: int) -> Optional[Container]:
for attribute in self.parsed.attributes:
if attribute.tag == type_:
return attribute
def get_security_level(self) -> int:
basic_info_attribute = self.get_attribute(BCertObjType.BASIC).attribute
if basic_info_attribute:
return basic_info_attribute.security_level
return None
@staticmethod
def _unpad(name: bytes):
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
def get_security_level(self) -> Optional[int]:
basic_info = self.get_attribute(BCertObjType.BASIC)
if basic_info:
return basic_info.attribute.security_level
return None
def get_name(self) -> Optional[str]:
manufacturer_info = self.get_attribute(BCertObjType.MANUFACTURER)
def get_name(self):
manufacturer_info = self.get_attribute(BCertObjType.MANUFACTURER).attribute
if manufacturer_info:
return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}"
manufacturer_info_attr = manufacturer_info.attribute
def un_pad(name: bytes):
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
return f"{un_pad(manufacturer_info_attr.manufacturer_name)} {un_pad(manufacturer_info_attr.model_name)} {un_pad(manufacturer_info_attr.model_number)}"
return None
def get_type(self) -> Optional[int]:
basic_info = self.get_attribute(BCertObjType.BASIC)
if basic_info:
return basic_info.attribute.cert_type
return None
def get_expiration_date(self) -> Optional[int]:
basic_info = self.get_attribute(BCertObjType.BASIC)
if basic_info:
return basic_info.attribute.expiration_date
return None
def get_issuer_key(self) -> Optional[bytes]:
signature_object = self.get_attribute(BCertObjType.SIGNATURE)
if not signature_object:
return None
return signature_object.attribute.signature_key
def get_key_by_usage(self, key_usage: BCertKeyUsage) -> Optional[bytes]:
key_info_object = self.get_attribute(BCertObjType.KEY)
if not key_info_object:
return
return None
key_info_attribute = key_info_object.attribute
return next(map(lambda key: key.key, filter(lambda key: 6 in key.usages, key_info_attribute.cert_keys)), None)
for key in key_info_object.attribute.cert_keys:
for usage in key.usages:
if usage == key_usage:
return key.key
return None
def contains_public_key(self, public_key: Union[ECCKey, bytes]) -> bool:
if isinstance(public_key, ECCKey):
public_key = public_key.public_bytes()
key_info_object = self.get_attribute(BCertObjType.KEY)
if not key_info_object:
return False
for key in key_info_object.attribute.cert_keys:
if key.key == public_key:
return True
return False
def dumps(self) -> bytes:
return self._BCERT.build(self.parsed)
def verify(self, public_key: bytes, index: int):
def verify_signature(self) -> None:
signature_object = self.get_attribute(BCertObjType.SIGNATURE)
if not signature_object:
raise InvalidCertificate(f"No signature object found in certificate {index}")
raise InvalidCertificate(f"No signature object found in certificate")
signature_attribute = signature_object.attribute
raw_signature_key = signature_attribute.signature_key
if public_key != raw_signature_key:
raise InvalidCertificate(f"Signature keys of certificate {index} do not match")
signature_key = ECC.construct(
curve='P-256',
@ -492,22 +539,25 @@ class Certificate(_BCertStructs):
point_y=int.from_bytes(raw_signature_key[32:], 'big')
)
sign_payload = self.dumps()[:-signature_object.length]
sign_payload = self.dumps()[:self.parsed.certificate_length]
if not Crypto.ecc256_verify(
public_key=signature_key,
data=sign_payload,
signature=signature_attribute.signature
):
raise InvalidCertificate(f"Signature of certificate {index} is not authentic")
return self.get_issuer_key()
raise InvalidCertificate(f"Signature of certificate is not authentic")
class CertificateChain(_BCertStructs):
"""Represents a BCertChain"""
ECC256MSBCertRootIssuerPubKey = bytes.fromhex("864d61cff2256e422c568b3c28001cfb3e1527658584ba0521b79b1828d936de1d826a8fc3e6e7fa7a90d5ca2946f1f64a2efb9f5dcffe7e434eb44293fac5ab")
MSPlayReadyRootIssuerPubKey = bytes([
0x86, 0x4D, 0x61, 0xCF, 0xF2, 0x25, 0x6E, 0x42, 0x2C, 0x56, 0x8B, 0x3C, 0x28, 0x00, 0x1C, 0xFB,
0x3E, 0x15, 0x27, 0x65, 0x85, 0x84, 0xBA, 0x05, 0x21, 0xB7, 0x9B, 0x18, 0x28, 0xD9, 0x36, 0xDE,
0x1D, 0x82, 0x6A, 0x8F, 0xC3, 0xE6, 0xE7, 0xFA, 0x7A, 0x90, 0xD5, 0xCA, 0x29, 0x46, 0xF1, 0xF6,
0x4A, 0x2E, 0xFB, 0x9F, 0x5D, 0xCF, 0xFE, 0x7E, 0x43, 0x4E, 0xB4, 0x42, 0x93, 0xFA, 0xC5, 0xAB
])
def __init__(
self,
@ -541,24 +591,66 @@ class CertificateChain(_BCertStructs):
return self._BCERT_CHAIN.build(self.parsed)
def get_security_level(self) -> int:
# not sure if there's a better way than this
return self.get(0).get_security_level()
def get_name(self) -> str:
return self.get(0).get_name()
def verify(self) -> bool:
issuer_key = self.ECC256MSBCertRootIssuerPubKey
def verify_chain(
self,
check_expiry: bool = False,
cert_type: Optional[BCertCertType] = None
) -> bool:
# There should be 1-6 certificates in a chain
if not (1 <= self.count() <= 6):
raise InvalidCertificateChain("An invalid maximum license chain depth")
try:
for i in reversed(range(self.count())):
certificate = self.get(i)
issuer_key = certificate.verify(issuer_key, i)
for i in range(self.count()):
if i == 0 and cert_type:
if self.get(i).get_type() != cert_type:
raise InvalidCertificateChain("Invalid certificate type")
if not issuer_key and i != 0:
raise InvalidCertificate(f"Certificate {i} is not valid")
except InvalidCertificate as e:
raise InvalidCertificateChain(e)
self.get(i).verify_signature()
if check_expiry:
if time.time() >= self.get(i).get_expiration_date():
raise InvalidCertificateChain(f"Certificate {i} has expired")
if i > 0:
if not self._verify_adjacent_certs(self.get(i - 1), self.get(i)):
raise InvalidCertificateChain("Adjacent certificate validation failed")
if i == (self.count() - 1):
if self.get(i).get_issuer_key() != self.MSPlayReadyRootIssuerPubKey:
raise InvalidCertificateChain("Root certificate issuer missmatch")
return True
@staticmethod
def _verify_adjacent_certs(child_cert: Certificate, parent_cert: Certificate) -> bool:
if parent_cert.get_type() != BCertCertType.ISSUER:
return False
if child_cert.get_security_level() > parent_cert.get_expiration_date():
return False
key_info = parent_cert.get_attribute(BCertObjType.KEY)
if not key_info:
return False
issuer_key = child_cert.get_issuer_key()
issuer_key_match = False
for key in key_info.attribute.cert_keys:
if key.key == issuer_key:
issuer_key_match = True
if not issuer_key_match:
return False
# TODO:
# check issuer rights
# check issuer features/key usages
return True

View File

@ -5,12 +5,12 @@ from uuid import UUID
from construct import Struct, Int32ul, Int16ul, this, Bytes, Switch, Int8ub, Int24ub, Int32ub, Const, Container, \
ConstructError, Rebuild, Default, If, PrefixedArray, Prefixed, GreedyBytes
from pyplayready.exceptions import InvalidPssh
from pyplayready.misc.exceptions import InvalidPssh
from pyplayready.system.wrmheader import WRMHeader
class _PlayreadyPSSHStructs:
PSSHBox = Struct(
PsshBox = Struct(
"length" / Int32ub,
"pssh" / Const(b"pssh"),
"version" / Rebuild(Int8ub, lambda ctx: 1 if (hasattr(ctx, "key_ids") and ctx.key_ids) else 0),
@ -58,7 +58,7 @@ class PSSH(_PlayreadyPSSHStructs):
self.wrm_headers: List[WRMHeader]
try:
# PSSH Box -> PlayReady Header
box = self.PSSHBox.parse(data)
box = self.PsshBox.parse(data)
if self._is_utf_16_le(box.data):
self.wrm_headers = [WRMHeader(box.data)]
else:

View File

@ -33,7 +33,7 @@ class WRMHeader:
def _missing_(cls, value):
return cls.UNKNOWN
_RETURN_STRUCTURE = Tuple[List[SignedKeyID], Optional[str], Optional[str], Optional[str]]
_RETURN_STRUCTURE = Optional[Tuple[List[SignedKeyID], Optional[str], Optional[str], Optional[str]]]
def __init__(self, data: Union[str, bytes]):
"""Load a WRM Header from either a string, base64 encoded data or bytes"""
@ -141,5 +141,7 @@ class WRMHeader:
elif self.version == self.Version.VERSION_4_3_0_0:
return self._read_v4_3_0_0(data)
return None
def dumps(self) -> str:
return self._raw_data.decode("utf-16-le")

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pyplayready"
version = "0.6.0"
version = "0.6.3"
description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "CC BY-NC-ND 4.0"
authors = ["DevLARLEY, Erevoc", "DevataDev"]
@ -39,6 +39,8 @@ click = "^8.1.7"
xmltodict = "^0.14.2"
PyYAML = "^6.0.1"
aiohttp = {version = "^3.9.1", optional = true}
cryptography = "^45.0.6"
lxml = "^6.0.0"
[tool.poetry.scripts]
pyplayready = "pyplayready.main:main"

View File

@ -5,4 +5,6 @@ construct==2.8.8
click
PyYAML
aiohttp
xmltodict
xmltodict
cryptography
lxml