+ Added support for RevLists

+ Updated RemoteCdm/Serve to support RevLists
  + Added Storage class for RevList persistence
+ Added support for ExtData objects in BCerts + signature verification
+ Added an Xml Builder class (instead of xmltodict)
+ Added a SOAP Builder/Parser class (instead of xmltodict)
+ Refactored WRMHeader class to use ET (instead of xmltodict)
+ Upgraded ServerException detection
+ Removed dependencies: xmltodict, lxml
+ Added Util class
+ Minor Crypto/ElGamal class changes
This commit is contained in:
larley 2025-10-05 13:25:22 +02:00
parent fa01639834
commit 17c69027e0
23 changed files with 1435 additions and 459 deletions

View File

@ -31,6 +31,7 @@ An example code snippet:
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.system.pssh import PSSH
from pyplayready.misc.revocation_list import RevocationList
import requests
@ -52,7 +53,7 @@ pssh = PSSH(
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
)
request = cdm.get_license_challenge(session_id, pssh.wrm_headers[0])
request = cdm.get_license_challenge(session_id, pssh.wrm_headers[0], rev_lists=RevocationList.SupportedListIds)
response = requests.post(
url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)",

View File

@ -1,6 +1,7 @@
from pyplayready.cdm import *
from pyplayready.crypto.ecc_key import *
from pyplayready.crypto.elgamal import *
from pyplayready.crypto import *
from pyplayready.cdm import *
from pyplayready.device import *
from pyplayready.license.key import *
from pyplayready.license.xml_key import *
@ -11,6 +12,8 @@ from pyplayready.system.pssh import *
from pyplayready.system.session import *
from pyplayready.misc.drmresults import *
from pyplayready.misc.exceptions import *
from pyplayready.misc.revocation_list import *
from pyplayready.misc.storage import *
__version__ = "0.6.3"
__version__ = "0.8.0"

View File

@ -1,27 +1,23 @@
from __future__ import annotations
import base64
import time
import xml.etree.ElementTree as ET
from typing import List, Union, Optional
from uuid import UUID
from lxml import etree as ET
import xmltodict
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad
from Crypto.Util.strxor import strxor
from ecpy.curves import Point, Curve
from pyplayready.crypto import Crypto
from pyplayready.misc.drmresults import DrmResult
from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.license.key import Key
from pyplayready.license.xmrlicense import XMRLicense, XMRObjectTypes
from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, ServerException)
from pyplayready.license.license import License
from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidXmrLicense)
from pyplayready.misc.revocation_list import RevocationList
from pyplayready.misc.soap_message import SoapMessage
from pyplayready.misc.storage import Storage
from pyplayready.system.bcert import CertificateChain
from pyplayready.system.builder import XmlBuilder
from pyplayready.system.session import Session
from pyplayready.system.wrmheader import WRMHeader
@ -29,11 +25,6 @@ from pyplayready.system.wrmheader import WRMHeader
class Cdm:
MAX_NUM_OF_SESSIONS = 16
MagicConstantZero = bytes([
0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f,
0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb
])
def __init__(
self,
security_level: int,
@ -48,7 +39,6 @@ class Cdm:
self.signing_key = signing_key
self.client_version = client_version
self.__crypto = Crypto()
self._wmrm_key = Point(
x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
@ -84,30 +74,8 @@ class Cdm:
raise InvalidSession(f"Session identifier {session_id.hex()} is invalid.")
del self.__sessions[session_id]
def _get_key_data(self, session: Session) -> bytes:
return self.__crypto.ecc256_encrypt(
public_key=self._wmrm_key,
plaintext=session.xml_key.get_point()
)
def _get_cipher_data(self, session: Session) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = xmltodict.unparse({
'Data': {
'CertificateChains': {
'CertificateChain': b64_chain
},
'Features': {
'Feature': {
'@Name': 'AESCBC',
'#text': '""'
},
'REE': {
'AESCBCS': None
}
}
}
}, full_document=False)
body = XmlBuilder.ClientData([self.certificate_chain], ["AESCBCS"])
cipher = AES.new(
key=session.xml_key.aes_key,
@ -122,111 +90,12 @@ class Cdm:
return session.xml_key.aes_iv + ciphertext
@staticmethod
def _build_main_body(la_content: dict, signed_info: dict, signature: str, public_signing_key: str) -> dict:
return {
'soap:Envelope': {
'@xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'@xmlns:xsd': 'http://www.w3.org/2001/XMLSchema',
'@xmlns:soap': 'http://schemas.xmlsoap.org/soap/envelope/',
'soap:Body': {
'AcquireLicense': {
'@xmlns': 'http://schemas.microsoft.com/DRM/2007/03/protocols',
'challenge': {
'Challenge': {
'@xmlns': 'http://schemas.microsoft.com/DRM/2007/03/protocols/messages',
'LA': la_content["LA"],
'Signature': {
'SignedInfo': signed_info["SignedInfo"],
'@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
'SignatureValue': signature,
'KeyInfo': {
'@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
'KeyValue': {
'ECCKeyValue': {
'PublicKey': public_signing_key
}
}
}
}
}
}
}
}
}
}
def _build_digest_content(
def get_license_challenge(
self,
wrm_header: str,
nonce: str,
wmrm_cipher: str,
cert_cipher: str,
protocol_version: int
) -> dict:
return {
'LA': {
'@xmlns': 'http://schemas.microsoft.com/DRM/2007/03/protocols',
'@Id': 'SignedData',
'@xml:space': 'preserve',
'Version': protocol_version,
'ContentHeader': xmltodict.parse(wrm_header),
'CLIENTINFO': {
'CLIENTVERSION': self.client_version
},
'LicenseNonce': nonce,
'ClientTime': int(time.time()),
'EncryptedData': {
'@xmlns': 'http://www.w3.org/2001/04/xmlenc#',
'@Type': 'http://www.w3.org/2001/04/xmlenc#Element',
'EncryptionMethod': {
'@Algorithm': 'http://www.w3.org/2001/04/xmlenc#aes128-cbc'
},
'KeyInfo': {
'@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
'EncryptedKey': {
'@xmlns': 'http://www.w3.org/2001/04/xmlenc#',
'EncryptionMethod': {
'@Algorithm': 'http://schemas.microsoft.com/DRM/2007/03/protocols#ecc256'
},
'KeyInfo': {
'@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
'KeyName': 'WMRMServer'
},
'CipherData': {
'CipherValue': wmrm_cipher
}
}
},
'CipherData': {
'CipherValue': cert_cipher
}
}
}
}
@staticmethod
def _build_signed_info(digest_value: str) -> dict:
return {
'SignedInfo': {
'@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
'CanonicalizationMethod': {
'@Algorithm': 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315'
},
'SignatureMethod': {
'@Algorithm': 'http://schemas.microsoft.com/DRM/2007/03/protocols#ecdsa-sha256'
},
'Reference': {
'@URI': '#SignedData',
'DigestMethod': {
'@Algorithm': 'http://schemas.microsoft.com/DRM/2007/03/protocols#sha256'
},
'DigestValue': digest_value
}
}
}
def get_license_challenge(self, session_id: bytes, wrm_header: Union[WRMHeader, str]) -> str:
session_id: bytes,
wrm_header: Union[WRMHeader, str],
rev_lists: Optional[List[UUID]]=None # default: RevocationList.SupportedListIds
) -> str:
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id.hex()} is invalid.")
@ -234,7 +103,10 @@ class Cdm:
if isinstance(wrm_header, str):
wrm_header = WRMHeader(wrm_header)
if not isinstance(wrm_header, WRMHeader):
raise ValueError(f"Expected WRMHeader to be a {str} or {WRMHeader} not {wrm_header!r}")
raise ValueError(f"Expected wrm_header to be a {str} or {WRMHeader} not {wrm_header!r}")
if rev_lists and not isinstance(rev_lists, list):
raise ValueError(f"Expected rev_lists to be a {list} not {rev_lists!r}")
match wrm_header.version:
case WRMHeader.Version.VERSION_4_3_0_0:
@ -247,117 +119,51 @@ class Cdm:
session.signing_key = self.signing_key
session.encryption_key = self.encryption_key
la_content = self._build_digest_content(
wrm_header=wrm_header.dumps(),
nonce=base64.b64encode(get_random_bytes(16)).decode(),
wmrm_cipher=base64.b64encode(self._get_key_data(session)).decode(),
cert_cipher=base64.b64encode(self._get_cipher_data(session)).decode(),
protocol_version=protocol_version
acquire_license_message = XmlBuilder.AcquireLicenseMessage(
wrmheader=wrm_header.dumps(),
protocol_version=protocol_version,
wrmserver_data=Crypto.ecc256_encrypt(self._wmrm_key, session.xml_key.get_point()),
client_data=self._get_cipher_data(session),
signing_key=self.signing_key,
client_info=self.client_version,
revocation_lists=rev_lists
)
la_content_xml = xmltodict.unparse(la_content, full_document=False)
soap_message = SoapMessage.create(acquire_license_message)
la_hash_obj = SHA256.new()
la_hash_obj.update(la_content_xml.encode())
la_hash = la_hash_obj.digest()
return soap_message.dumps()
signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
signed_info_xml = xmltodict.unparse(signed_info, full_document=False)
signature = self.__crypto.ecc256_sign(session.signing_key, signed_info_xml.encode())
b64_signature = base64.b64encode(signature).decode()
b64_public_singing_key = base64.b64encode(session.signing_key.public_bytes()).decode()
return xmltodict.unparse(self._build_main_body(la_content, signed_info, b64_signature, b64_public_singing_key)).replace('\n', '')
@staticmethod
def _verify_encryption_key(session: Session, licence: XMRLicense) -> bool:
ecc_keys = list(licence.get_object(XMRObjectTypes.ECC_DEVICE_KEY_OBJECT))
if not ecc_keys:
raise InvalidLicense("No ECC public key in license")
return ecc_keys[0].key == session.encryption_key.public_bytes()
def parse_license(self, session_id: bytes, licence: str) -> None:
def parse_license(self, session_id: bytes, soap_message: str) -> None:
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id.hex()} is invalid.")
if not licence:
raise InvalidLicense("Cannot parse an empty licence message")
if not isinstance(licence, str):
raise InvalidLicense(f"Expected licence message to be a {str}, not {licence!r}")
if not soap_message:
raise InvalidXmrLicense("Cannot parse an empty licence message")
if not isinstance(soap_message, str):
raise InvalidXmrLicense(f"Expected licence message to be a {str}, not {soap_message!r}")
if not session.encryption_key or not session.signing_key:
raise InvalidSession("Cannot parse a license message without first making a license request")
try:
parser = ET.XMLParser(remove_blank_text=True)
root = ET.XML(licence.encode(), parser)
faults = root.findall(".//{http://schemas.xmlsoap.org/soap/envelope/}Fault")
soap_message = SoapMessage.loads(soap_message)
soap_message.raise_faults()
for fault in faults:
status_codes = fault.findall(".//StatusCode")
for status_code in status_codes:
code = DrmResult.from_code(status_code.text)
raise ServerException(f"[{status_code.text}] ({code.name}) {code.message}")
licence = License(soap_message.get_message())
if licence.is_verifiable():
licence.verify()
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
if licence.rev_info is not None:
current_rev_info_file = Storage.read_file(RevocationList.CurrentRevListStorageName)
for license_element in license_elements:
parsed_licence = XMRLicense.loads(license_element.text)
if current_rev_info_file:
new_rev_info = RevocationList.merge(ET.fromstring(current_rev_info_file), licence.rev_info)
else:
new_rev_info = licence.rev_info
if not self._verify_encryption_key(session, parsed_licence):
raise InvalidLicense("Public encryption key does not match")
Storage.write_file(RevocationList.CurrentRevListStorageName, new_rev_info)
Storage.write_file(RevocationList.loads(new_rev_info).get_storage_file_name(), new_rev_info)
is_scalable = bool(next(parsed_licence.get_object(XMRObjectTypes.AUX_KEY_OBJECT), None))
for content_key in parsed_licence.get_content_keys():
cipher_type = Key.CipherType(content_key.cipher_type)
if cipher_type not in (Key.CipherType.ECC_256, Key.CipherType.ECC_256_WITH_KZ, Key.CipherType.ECC_256_VIA_SYMMETRIC):
raise InvalidLicense(f"Invalid cipher type {cipher_type}")
via_symmetric = Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256_VIA_SYMMETRIC
decrypted = self.__crypto.ecc256_decrypt(
private_key=session.encryption_key,
ciphertext=content_key.encrypted_key
)
ci, ck = decrypted[:16], decrypted[16:]
if is_scalable:
ci, ck = decrypted[::2][:16], decrypted[1::2][:16]
if via_symmetric:
embedded_root_license = content_key.encrypted_key[:144]
embedded_leaf_license = content_key.encrypted_key[144:]
rgb_key = strxor(ck, self.MagicConstantZero)
content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key)
aux_key = next(parsed_licence.get_object(XMRObjectTypes.AUX_KEY_OBJECT))["auxiliary_keys"][0]["key"]
uplink_x_key = AES.new(content_key_prime, AES.MODE_ECB).encrypt(aux_key)
secondary_key = AES.new(ck, AES.MODE_ECB).encrypt(embedded_root_license[128:])
embedded_leaf_license = AES.new(uplink_x_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
embedded_leaf_license = AES.new(secondary_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
ci, ck = embedded_leaf_license[:16], embedded_leaf_license[16:]
if not parsed_licence.check_signature(ci):
raise InvalidLicense("License integrity signature does not match")
session.keys.append(Key(
key_id=UUID(bytes_le=content_key.key_id),
key_type=content_key.key_type,
cipher_type=content_key.cipher_type,
key_length=content_key.key_length,
key=ck
))
except Exception as e:
raise
raise InvalidLicense(f"Unable to parse license: {e}")
for xmr_license in licence.licenses:
session.keys.append(xmr_license.get_content_key(session.encryption_key))
def get_keys(self, session_id: bytes) -> List[Key]:
session = self.__sessions.get(session_id)

View File

@ -8,16 +8,16 @@ from ecpy.curves import Point, Curve
from pyplayready.crypto.elgamal import ElGamal
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.system.util import Util
class Crypto:
def __init__(self, curve: str = "secp256r1"):
self.curve = Curve.get_curve(curve)
self.elgamal = ElGamal(self.curve)
curve = Curve.get_curve("secp256r1")
def ecc256_encrypt(self, public_key: Union[ECCKey, Point], plaintext: Union[Point, bytes]) -> bytes:
@staticmethod
def ecc256_encrypt(public_key: Union[ECCKey, Point], plaintext: Union[Point, bytes]) -> bytes:
if isinstance(public_key, ECCKey):
public_key = public_key.get_point(self.curve)
public_key = public_key.get_point(Crypto.curve)
if not isinstance(public_key, Point):
raise ValueError(f"Expecting ECCKey or Point input, got {public_key!r}")
@ -25,41 +25,39 @@ class Crypto:
plaintext = Point(
x=int.from_bytes(plaintext[:32], 'big'),
y=int.from_bytes(plaintext[32:64], 'big'),
curve=self.curve
curve=Crypto.curve
)
if not isinstance(plaintext, Point):
raise ValueError(f"Expecting Point or Bytes input, got {plaintext!r}")
point1, point2 = self.elgamal.encrypt(
message_point=plaintext,
public_key=public_key
)
point1, point2 = ElGamal.encrypt(plaintext, public_key)
return b''.join([
self.elgamal.to_bytes(point1.x),
self.elgamal.to_bytes(point1.y),
self.elgamal.to_bytes(point2.x),
self.elgamal.to_bytes(point2.y)
Util.to_bytes(point1.x),
Util.to_bytes(point1.y),
Util.to_bytes(point2.x),
Util.to_bytes(point2.y)
])
def ecc256_decrypt(self, private_key: ECCKey, ciphertext: Union[Tuple[Point, Point], bytes]) -> bytes:
@staticmethod
def ecc256_decrypt(private_key: ECCKey, ciphertext: Union[Tuple[Point, Point], bytes]) -> bytes:
if isinstance(ciphertext, bytes):
ciphertext = (
Point(
x=int.from_bytes(ciphertext[:32], 'big'),
y=int.from_bytes(ciphertext[32:64], 'big'),
curve=self.curve
curve=Crypto.curve
),
Point(
x=int.from_bytes(ciphertext[64:96], 'big'),
y=int.from_bytes(ciphertext[96:128], 'big'),
curve=self.curve
curve=Crypto.curve
)
)
if not isinstance(ciphertext, Tuple):
raise ValueError(f"Expecting Tuple[Point, Point] or Bytes input, got {ciphertext!r}")
decrypted = self.elgamal.decrypt(ciphertext, int(private_key.key.d))
return self.elgamal.to_bytes(decrypted.x)
decrypted = ElGamal.decrypt(ciphertext, int(private_key.key.d))
return Util.to_bytes(decrypted.x)
@staticmethod
def ecc256_sign(private_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes]) -> bytes:

View File

@ -9,6 +9,8 @@ from Crypto.PublicKey import ECC
from Crypto.PublicKey.ECC import EccKey
from ecpy.curves import Curve, Point
from pyplayready.system.util import Util
class ECCKey:
"""Represents a PlayReady ECC key pair"""
@ -68,18 +70,11 @@ class ECCKey:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps(private_only))
@staticmethod
def _to_bytes(n: int) -> bytes:
byte_len = (n.bit_length() + 7) // 8
if byte_len % 2 != 0:
byte_len += 1
return n.to_bytes(byte_len, 'big')
def get_point(self, curve: Curve) -> Point:
return Point(self.key.pointQ.x, self.key.pointQ.y, curve)
def private_bytes(self) -> bytes:
return self._to_bytes(int(self.key.d))
return Util.to_bytes(int(self.key.d))
def private_sha256_digest(self) -> bytes:
hash_object = SHA256.new()
@ -87,7 +82,7 @@ class ECCKey:
return hash_object.digest()
def public_bytes(self) -> bytes:
return self._to_bytes(int(self.key.pointQ.x)) + self._to_bytes(int(self.key.pointQ.y))
return Util.to_bytes(int(self.key.pointQ.x)) + Util.to_bytes(int(self.key.pointQ.y))
def public_sha256_digest(self) -> bytes:
hash_object = SHA256.new()

View File

@ -7,25 +7,17 @@ import secrets
class ElGamal:
"""ElGamal ECC utility using ecpy"""
def __init__(self, curve: Curve):
"""Initialize the utility with a given curve type ('secp256r1' for PlayReady)"""
self.curve = curve
curve = Curve.get_curve("secp256r1")
@staticmethod
def to_bytes(n: int) -> bytes:
byte_len = (n.bit_length() + 7) // 8
if byte_len % 2 != 0:
byte_len += 1
return n.to_bytes(byte_len, 'big')
def encrypt(self, message_point: Point, public_key: Point) -> Tuple[Point, Point]:
def encrypt(message_point: Point, public_key: Point) -> Tuple[Point, Point]:
"""
Encrypt a single point with a given public key
Returns an encrypted point pair
"""
ephemeral_key = secrets.randbelow(self.curve.order)
point1 = ephemeral_key * self.curve.generator
ephemeral_key = secrets.randbelow(ElGamal.curve.order)
point1 = ephemeral_key * ElGamal.curve.generator
point2 = message_point + (ephemeral_key * public_key)
return point1, point2

View File

@ -0,0 +1,126 @@
from __future__ import annotations
import base64
import copy
import hashlib
import xml.etree.ElementTree as ET
from typing import Union, Iterator
from Crypto.PublicKey import ECC
from pyplayready import Crypto
from pyplayready.license.xmrlicense import XMRLicense
from pyplayready.misc.exceptions import InvalidLicense
from pyplayready.system.bcert import CertificateChain, BCertKeyUsage
from pyplayready.system.util import Util
class License:
def __init__(self, data: Union[str, bytes, ET.Element]):
if not data:
raise InvalidLicense("Data must not be empty")
if isinstance(data, str):
data = data.encode()
if isinstance(data, bytes):
self._root = ET.fromstring(data)
elif isinstance(data, ET.Element):
self._root = data
else:
raise InvalidLicense("Invalid data type")
self._original_root = copy.deepcopy(self._root)
Util.remove_namespaces(self._root)
if self._root.tag != "AcquireLicenseResponse":
raise InvalidLicense("License root must be AcquireLicenseResponse")
self._Response = self._root.find("AcquireLicenseResult/Response")
if self._Response is None:
raise InvalidLicense("Response not found in license")
self.rmsdk_version = self._Response.get("rmsdkVersion")
self._LicenseResponse = self._Response.find("LicenseResponse")
if self._Response is None:
raise InvalidLicense("LicenseResponse not found in license")
self.version = self._LicenseResponse.findtext("Version")
self.licenses = list(self._load_licenses())
self.rev_info = self._LicenseResponse.find("RevInfo")
self.transaction_id = self._LicenseResponse.find("Acknowledgement/TransactionID")
self.license_nonce = self._LicenseResponse.findtext("LicenseNonce")
self.response_id = self._LicenseResponse.findtext("ResponseID")
cert_chain_str = self._LicenseResponse.findtext("SigningCertificateChain")
self.signing_certificate_chain = CertificateChain.loads(cert_chain_str) if cert_chain_str else None
def _find_element_raw(self, name: str) -> ET.Element:
return self._original_root.find(f".//{name}", {
"": "http://www.w3.org/2000/09/xmldsig#",
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"proto": "http://schemas.microsoft.com/DRM/2007/03/protocols",
"msg": "http://schemas.microsoft.com/DRM/2007/03/protocols/messages"
})
def _load_licenses(self) -> Iterator[XMRLicense]:
Licenses = self._LicenseResponse.findall("Licenses/License")
if Licenses is None:
return iter([])
for license_ in Licenses:
yield XMRLicense.loads(license_.text)
def is_verifiable(self):
if self.signing_certificate_chain is None:
return False
signature = self._Response.find("Signature")
if signature is None:
return False
if signature.findtext("SignedInfo/Reference/DigestValue") is None:
return False
if signature.findtext("SignatureValue") is None:
return False
return True
def verify(self):
if not self.is_verifiable():
raise RuntimeError("Missing required information for license signature verification")
ET.register_namespace("", "http://schemas.microsoft.com/DRM/2007/03/protocols")
license_response_xml = ET.tostring(self._find_element_raw("proto:LicenseResponse"), short_empty_elements=False)
response_hash = hashlib.sha256(license_response_xml).digest()
Signature = self._Response.find("Signature")
digest_value = base64.b64decode(Signature.findtext("SignedInfo/Reference/DigestValue"))
if digest_value != response_hash:
raise InvalidLicense("Digest mismatch in license")
signing_leaf_cert = self.signing_certificate_chain.get(0)
signing_key_bytes = signing_leaf_cert.get_key_by_usage(BCertKeyUsage.SIGN_RESPONSE)
signing_key = ECC.construct(
point_x=int.from_bytes(signing_key_bytes[:32], "big"),
point_y=int.from_bytes(signing_key_bytes[32:], "big"),
curve="P-256"
)
ET.register_namespace("", "http://www.w3.org/2000/09/xmldsig#")
signed_info_xml = ET.tostring(self._find_element_raw("SignedInfo"), short_empty_elements=False)
signature_value = base64.b64decode(Signature.findtext("SignatureValue"))
if not Crypto.ecc256_verify(signing_key, signed_info_xml, signature_value):
raise InvalidLicense("Signature mismatch in license")
return True

View File

@ -1,7 +1,7 @@
from ecpy.curves import Point, Curve
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.crypto.elgamal import ElGamal
from pyplayready.system.util import Util
class XmlKey:
@ -14,7 +14,7 @@ class XmlKey:
self.shared_key_x = self._shared_point.key.pointQ.x
self.shared_key_y = self._shared_point.key.pointQ.y
self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x))
self._shared_key_x_bytes = Util.to_bytes(int(self.shared_key_x))
self.aes_iv = self._shared_key_x_bytes[:16]
self.aes_key = self._shared_key_x_bytes[16:]

View File

@ -2,12 +2,18 @@ from __future__ import annotations
import base64
from enum import IntEnum
from typing import Union
from typing import Union, Tuple
from uuid import UUID
from Crypto.Cipher import AES
from Crypto.Hash import CMAC
from Crypto.Util.strxor import strxor
from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
from pyplayready import ECCKey, Crypto
from pyplayready.license.key import Key
from pyplayready.misc.exceptions import InvalidXmrLicense
class XMRObjectTypes(IntEnum):
INVALID = 0x0000
@ -300,6 +306,11 @@ class _XMRLicenseStructs:
class XMRLicense(_XMRLicenseStructs):
"""Represents an XMRLicense"""
MagicConstantZero = bytes([
0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f,
0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb
])
def __init__(
self,
parsed_license: Container,
@ -336,8 +347,64 @@ class XMRLicense(_XMRLicenseStructs):
if container.type == type_:
yield container.data
def get_content_keys(self):
yield from self.get_object(XMRObjectTypes.CONTENT_KEY_OBJECT)
def get_device_key_obj(self) -> Container:
return next(self.get_object(XMRObjectTypes.ECC_DEVICE_KEY_OBJECT), None)
def get_content_key_obj(self) -> Container:
return next(self.get_object(XMRObjectTypes.CONTENT_KEY_OBJECT), None)
def is_scalable(self) -> bool:
return bool(next(self.get_object(XMRObjectTypes.AUX_KEY_OBJECT), None))
def get_content_key(self, encryption_key: ECCKey) -> Key:
ecc_key = self.get_device_key_obj()
if ecc_key is None:
raise InvalidXmrLicense("No ECC public key in license")
if ecc_key.key != encryption_key.public_bytes():
raise InvalidXmrLicense("Public encryption key does not match")
content_key = self.get_content_key_obj()
cipher_type = Key.CipherType(content_key.cipher_type)
if cipher_type not in (Key.CipherType.ECC_256, Key.CipherType.ECC_256_WITH_KZ, Key.CipherType.ECC_256_VIA_SYMMETRIC):
raise InvalidXmrLicense(f"Invalid cipher type {cipher_type}")
via_symmetric = Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256_VIA_SYMMETRIC
decrypted = Crypto.ecc256_decrypt(encryption_key, content_key.encrypted_key)
ci, ck = decrypted[:16], decrypted[16:]
if self.is_scalable():
ci, ck = decrypted[::2][:16], decrypted[1::2][:16]
if via_symmetric:
embedded_root_license = content_key.encrypted_key[:144]
embedded_leaf_license = content_key.encrypted_key[144:]
rgb_key = strxor(ck, self.MagicConstantZero)
content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key)
aux_key = next(self.get_object(XMRObjectTypes.AUX_KEY_OBJECT))["auxiliary_keys"][0]["key"]
uplink_x_key = AES.new(content_key_prime, AES.MODE_ECB).encrypt(aux_key)
secondary_key = AES.new(ck, AES.MODE_ECB).encrypt(embedded_root_license[128:])
embedded_leaf_license = AES.new(uplink_x_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
embedded_leaf_license = AES.new(secondary_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
ci, ck = embedded_leaf_license[:16], embedded_leaf_license[16:]
if not self.check_signature(ci):
raise InvalidXmrLicense("License integrity signature does not match")
return Key(
key_id=UUID(bytes_le=content_key.key_id),
key_type=content_key.key_type,
cipher_type=content_key.cipher_type,
key_length=content_key.key_length,
key=ck
)
def check_signature(self, integrity_key: bytes) -> bool:
cmac = CMAC.new(integrity_key, ciphermod=AES)

View File

@ -7,12 +7,13 @@ import click
import requests
from Crypto.Random import get_random_bytes
from pyplayready import __version__, InvalidCertificateChain, InvalidLicense
from pyplayready import __version__, InvalidCertificateChain, InvalidXmrLicense
from pyplayready.cdm import Cdm
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.crypto.key_wrap import unwrap_wrapped_key
from pyplayready.device import Device
from pyplayready.misc.exceptions import OutdatedDevice
from pyplayready.misc.revocation_list import RevocationList
from pyplayready.system.bcert import CertificateChain, Certificate, BCertCertType, BCertObjType, BCertFeatures, \
BCertKeyType, BCertKeyUsage
from pyplayready.system.pssh import PSSH
@ -58,7 +59,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
session_id = cdm.open()
log.info("Opened Session")
challenge = cdm.get_license_challenge(session_id, pssh.wrm_headers[0])
challenge = cdm.get_license_challenge(session_id, pssh.wrm_headers[0], rev_lists=RevocationList.SupportedListIds)
log.info("Created License Request (Challenge)")
log.debug(challenge)
@ -79,7 +80,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
try:
cdm.parse_license(session_id, licence)
except InvalidLicense as e:
except InvalidXmrLicense as e:
log.error(e)
return
@ -100,7 +101,8 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
def test(ctx: click.Context, device: Path, ckt: str, security_level: str) -> None:
"""
Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
https://testweb.playready.microsoft.com/Content/Content2X
https://learn.microsoft.com/en-us/playready/advanced/testcontent/playready-2x-test-content#tears-of-steel---4k-content
+ DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd
+ MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest
@ -175,7 +177,7 @@ def create_device(
raise InvalidCertificateChain("Device has already been provisioned")
if certificate_chain.get(0).get_type() != BCertCertType.ISSUER:
raise InvalidCertificateChain("Leaf-most certificate must be of type ISSUER to issue certificate if type DEVICE")
raise InvalidCertificateChain("Leaf-most certificate must be of type ISSUER to issue certificate of type DEVICE")
if not certificate_chain.get(0).contains_public_key(group_key):
raise InvalidCertificateChain("Group key does not match this certificate")
@ -411,23 +413,18 @@ def inspect(ctx: click.Context, device: Optional[Path], chain: Optional[Path]) -
for i in range(chai.count()):
cert = chai.get(i)
log.info(f" Certificate {i}:")
log.info(f" + Certificate {i}:")
basic_info = cert.get_attribute(BCertObjType.BASIC)
if basic_info:
log.info(f" + Cert Type: {BCertCertType(basic_info.attribute.cert_type).name}")
log.info(f" + Security Level: SL{basic_info.attribute.security_level}")
log.info(f" + Expiration Date: {datetime.fromtimestamp(basic_info.attribute.expiration_date)}")
log.info(f" + Client ID: {basic_info.attribute.client_id.hex()}")
log.info(f" + Cert Type: {BCertCertType(basic_info.attribute.cert_type).name}")
log.info(f" + Security Level: SL{basic_info.attribute.security_level}")
log.info(f" + Expiration Date: {datetime.fromtimestamp(basic_info.attribute.expiration_date)}")
log.info(f" + Client ID: {basic_info.attribute.client_id.hex()}")
manufacturer_info = cert.get_attribute(BCertObjType.MANUFACTURER)
if manufacturer_info:
manu_attr = manufacturer_info.attribute
def un_pad(name: bytes):
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
log.info( f" + Name: {un_pad(manu_attr.manufacturer_name)} {un_pad(manu_attr.model_name)} {un_pad(manu_attr.model_number)}")
model_name = cert.get_name()
if model_name:
log.info( f" + Name: {model_name}")
feature_info = cert.get_attribute(BCertObjType.FEATURE)
if feature_info and feature_info.attribute.feature_count > 0:
@ -435,22 +432,22 @@ def inspect(ctx: click.Context, device: Optional[Path], chain: Optional[Path]) -
lambda x: BCertFeatures(x).name,
feature_info.attribute.features
))
log.info(f" + Features: {', '.join(features)}")
log.info(f" + Features: {', '.join(features)}")
key_info = cert.get_attribute(BCertObjType.KEY)
if key_info and key_info.attribute.key_count > 0:
key_attr = key_info.attribute
log.info(f" + Cert Keys:")
log.info(f" + Cert Keys:")
for idx, key in enumerate(key_attr.cert_keys):
log.info(f" + Key {idx}:")
log.info(f" + Type: {BCertKeyType(key.type).name}")
log.info(f" + Key Length: {key.length} bits")
log.info(f" + Key {idx}:")
log.info(f" + Type: {BCertKeyType(key.type).name}")
log.info(f" + Key Length: {key.length} bits")
usages = list(map(
lambda x: BCertKeyUsage(x).name,
key.usages
))
if len(usages) > 0:
log.info(f" + Usages: {', '.join(usages)}")
log.info(f" + Usages: {', '.join(usages)}")
return None

View File

@ -1,5 +1,3 @@
from pyplayready.misc.drmresults import DrmResult
class PyPlayreadyException(Exception):
"""Exceptions used by pyplayready."""
@ -12,10 +10,22 @@ class InvalidSession(PyPlayreadyException):
"""No Session is open with the specified identifier."""
class InvalidSoapMessage(PyPlayreadyException):
"""The Soap Message is invalid or empty."""
class InvalidPssh(PyPlayreadyException):
"""The Playready PSSH is invalid or empty."""
class InvalidWrmHeader(PyPlayreadyException):
"""The Playready WRMHEADER is invalid or empty."""
class InvalidChecksum(PyPlayreadyException):
"""The Playready WRMHEADER key ID checksum is invalid or empty."""
class InvalidInitData(PyPlayreadyException):
"""The Playready Cenc Header Data is invalid or empty."""
@ -24,10 +34,14 @@ class DeviceMismatch(PyPlayreadyException):
"""The Remote CDMs Device information and the APIs Device information did not match."""
class InvalidLicense(PyPlayreadyException):
class InvalidXmrLicense(PyPlayreadyException):
"""Unable to parse XMR License."""
class InvalidLicense(PyPlayreadyException):
"""Unable to parse License XML."""
class InvalidCertificate(PyPlayreadyException):
"""The BCert is not correctly formatted."""
@ -41,4 +55,8 @@ class OutdatedDevice(PyPlayreadyException):
class ServerException(PyPlayreadyException):
"""Recasted on the client if found in license response."""
"""Re-casted on the client if found in license response."""
class InvalidRevocationList(PyPlayreadyException):
"""The RevocationList is not correctly formatted."""

View File

@ -0,0 +1,499 @@
from __future__ import annotations
import base64
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Union, Optional, Iterator
from uuid import UUID
from Crypto.PublicKey import ECC
from construct import Struct, Bytes, Switch, Int64ul, Int64ub, Int32ub, \
Int16ub, Int8ub, Array, this, Adapter, OneOf, If, Container, Select, GreedyBytes, String
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from ecpy import curve_defs
from ecpy.curve_defs import WEIERSTRASS
from ecpy.curves import Curve, Point
from ecpy.ecdsa import ECDSA
from ecpy.keys import ECPublicKey
import xml.etree.ElementTree as ET
from pyplayready import Crypto
from pyplayready.misc.exceptions import InvalidRevocationList
from pyplayready.system.bcert import CertificateChain, BCertCertType, BCertKeyUsage
from pyplayready.system.util import Util
class FileTime(Adapter):
EPOCH_AS_FILETIME = 116444736000000000
HUNDREDS_OF_NANOSECONDS = 10_000_000
def _decode(self, obj, context):
timestamp = (obj - self.EPOCH_AS_FILETIME) / self.HUNDREDS_OF_NANOSECONDS
return datetime.fromtimestamp(timestamp, timezone.utc)
def _encode(self, obj, context):
return self.EPOCH_AS_FILETIME + int(obj.timestamp() * self.HUNDREDS_OF_NANOSECONDS)
class UUIDLe(Adapter):
def _decode(self, obj, context):
return UUID(bytes_le=obj)
def _encode(self, obj, context):
return obj.bytes_le
class _RevocationStructs:
BRevInfoData = Struct(
"magic" / OneOf(Int32ub, [0x524C5649, 0x524C5632]), # RLVI / RLV2
"length" / Int32ub,
"format_version" / Int8ub,
"reserved" / Bytes(3),
"sequence_number" / Int32ub, # what's this?
"issued_time" / Switch(lambda ctx: ctx.magic, {
0x524C5649: FileTime(Int64ul),
0x524C5632: FileTime(Int64ub),
}),
"record_count" / Int32ub,
"records" / Array(this.record_count, Struct(
"list_id" / UUIDLe(Bytes(16)),
"version" / Int64ub
))
)
BRevInfoSigned = Struct(
"data" / BRevInfoData,
"signature_type" / Int8ub,
"signature_size" / Switch(lambda ctx: ctx.signature_type, {
1: 128,
2: Int16ub
}),
"signature" / Bytes(this.signature_size),
"certificate_chain_length" / If(this.signature_type == 1, Int32ub),
"certificate_chain" / Select(CertificateChain.BCertChain, GreedyBytes)
)
BPrRLData = Struct(
"id" / Bytes(16),
"version" / Int32ub,
"entry_count" / Int32ub,
"revocation_entries" / Array(this.entry_count, Bytes(32)),
)
BPrRLSigned = Struct(
"data" / BPrRLData,
"signature_type" / Int8ub,
"signature_length" / Int16ub,
"signature" / Bytes(this.signature_length),
"certificate_chain" / Select(CertificateChain.BCertChain, GreedyBytes)
)
WMDRMNETData = Struct(
"version" / Int32ub,
"entry_count" / Int32ub,
"revocation_entries" / Array(this.entry_count, Bytes(20)),
"certificate_chain_length" / Int32ub,
"certificate_chain" / String(this.certificate_chain_length),
)
WMDRMNETSigned = Struct(
"data" / WMDRMNETData,
"signature_type" / Int8ub,
"signature_length" / Int16ub,
"signature" / Bytes(this.signature_length)
)
class RevocationList(_RevocationStructs):
class ListID:
# Rev Info
REV_INFO = UUID("CCDE5A55-A688-4405-A88B-D13F90D5BA3E") # VVrezIimBUSoi9E/kNW6Pg==
REV_INFO_V2 = UUID("52D1FF11-D388-4EDD-82B7-68EA4C20A16C") # Ef/RUojT3U6Ct2jqTCChbA==
# PlayReady Revocation List
PLAYREADY_RUNTIME = UUID("4E9D8C8A-B652-45A7-9791-6925A6B4791F") # ioydTlK2p0WXkWklprR5Hw==
PLAYREADY_APPLICATION = UUID("28082E80-C7A3-40B1-8256-19E5B6D89B27") # gC4IKKPHsUCCVhnlttibJw==
# WMDRMNET Revocation List (deprecated: "LegacyXMLCert")
WMDRMNET = UUID("CD75E604-543D-4A9C-9F09-FE6D24E8BF90") # BOZ1zT1UnEqfCf5tJOi/kA==
# WMDRM Device Revocation List
DEVICE_REVOCATION = UUID("3129E375-CEB0-47D5-9CCA-9DB74CFD4332") # deMpMbDO1Uecyp23TP1DMg==
# App Revocation List
APP_REVOCATION = UUID("90A37313-0ECF-4CAA-A906-B188F6129300") # E3OjkM8OqkypBrGI9hKTAA==
SupportedListIds = [ListID.PLAYREADY_RUNTIME, ListID.PLAYREADY_APPLICATION, ListID.REV_INFO_V2, ListID.WMDRMNET]
RevocationDataPubKeyAllowList = [
bytes([
0x3F, 0x3C, 0x09, 0x41, 0xB3, 0xE2, 0x45, 0xC4, 0xF0, 0x55, 0x32, 0xF1, 0x00, 0x40, 0xAA, 0x48,
0xFD, 0x2A, 0xC8, 0x44, 0x23, 0x68, 0x2D, 0xBF, 0x45, 0xFE, 0x2A, 0x65, 0xFF, 0x4E, 0xFF, 0x3A,
0x60, 0xC4, 0x2A, 0x71, 0x38, 0x61, 0xA3, 0xA7, 0xBC, 0x89, 0xB3, 0xE7, 0xB9, 0xA4, 0xF4, 0xAA,
0xA2, 0x8B, 0xA8, 0xCE, 0xE6, 0x89, 0xBA, 0x8D, 0xF7, 0xB0, 0x1B, 0x6A, 0x79, 0xC7, 0xDC, 0x93,
])
]
pubkeyWMDRMNDRevocation = bytes([
0x17, 0xab, 0x8d, 0x43, 0xe6, 0x47, 0xef, 0xba, 0xbd, 0x23,
0x44, 0x66, 0x9f, 0x64, 0x04, 0x84, 0xf8, 0xe7, 0x71, 0x39,
0xc7, 0x07, 0x36, 0x25, 0x5d, 0xa6, 0x5f, 0xba, 0xb9, 0x00,
0xef, 0x9c, 0x89, 0x6b, 0xf2, 0xc4, 0x81, 0x1d, 0xa2, 0x12
])
CurrentRevListStorageName = "RevInfo_Current.xml"
def __init__(self, parsed): # List[Tuple[UUID, Container]]
self.parsed = parsed
@staticmethod
def _verify_crl_signatures(crl: Container, data_struct) -> None:
if isinstance(crl.certificate_chain, bytes) and len(crl.certificate_chain) == 64:
# TODO: untested, since RLVI is deprecated
if crl.certificate_chain not in RevocationList.RevocationDataPubKeyAllowList:
raise InvalidRevocationList("Unallowed revocation list signing public key")
signing_pub_key = crl.certificate_chain
else:
signing_cert = CertificateChain(crl.certificate_chain)
signing_cert.verify_chain(
check_expiry=True,
cert_type=BCertCertType.CRL_SIGNER
)
leaf_signing_cert = signing_cert.get(0)
signing_pub_key = leaf_signing_cert.get_key_by_usage(BCertKeyUsage.SIGN_CRL)
signing_key = ECC.construct(
curve='P-256',
point_x=int.from_bytes(signing_pub_key[:32]),
point_y=int.from_bytes(signing_pub_key[32:])
)
sign_payload = data_struct.build(crl.data)
if not Crypto.ecc256_verify(
public_key=signing_key,
data=sign_payload,
signature=crl.signature
):
raise InvalidRevocationList("Revocation List signature is not authentic")
@staticmethod
def _verify_wmdrmnet_wrap_signature(xml: str) -> bool:
self = RevocationList
# Microsoft's ECC1 curve
msdrm_ecc1_params = {
'name': "msdrm-ecc1",
'type': WEIERSTRASS,
'size': 160,
'field': 0x89abcdef012345672718281831415926141424f7, # q
'generator': (0x8723947fd6a3a1e53510c07dba38daf0109fa120, # gen x
0x445744911075522d8c3c5856d4ed7acda379936f), # gen y
'order': 0x89abcdef012345672716b26eec14904428c2a675, # n
'cofactor': 0x1,
'a': 0x37a5abccd277bce87632ff3d4780c009ebe41497, # a
'b': 0x0dd8dabf725e2f3228e85f1ad78fdedf9328239e, # b
}
curve_defs.curves.append(msdrm_ecc1_params)
msdrm_ecc1 = Curve.get_curve("msdrm-ecc1")
public_point = Point(
x=int.from_bytes(self.pubkeyWMDRMNDRevocation[:20], "little"),
y=int.from_bytes(self.pubkeyWMDRMNDRevocation[20:], "little"),
curve=msdrm_ecc1,
check=True
)
public_key = ECPublicKey(public_point)
root = ET.fromstring(f"<root>{xml}</root>")
signature_value_element = root.find("SIGNATURE/VALUE")
if signature_value_element is None:
raise InvalidRevocationList("No SIGNATURE VALUE found in WMDRMNET revocation wrap")
signature_value = base64.b64decode(signature_value_element.text)
if len(signature_value) != 40:
raise InvalidRevocationList("Invalid WMDRMNET revocation wrap SIGNATURE length")
r = int.from_bytes(signature_value[:20], "little")
s = int.from_bytes(signature_value[20:], "little")
if not r < msdrm_ecc1_params["order"] or not s < msdrm_ecc1_params["order"]:
raise InvalidRevocationList("Invalid WMDRMNET revocation wrap SIGNATURE")
data_element = root.find("DATA")
if data_element is None:
raise InvalidRevocationList("No DATA element found in WMDRMNET revocation wrap")
# <VALUE> contents = ["<DATA>" | <DATA> element contents | "</DATA>"]
data_bytes = ET.tostring(data_element, encoding="utf-8")
data_digest = hashlib.sha1(data_bytes).digest()
signer = ECDSA("ITUPLE")
authentic = signer.verify(
data_digest,
(r, s),
public_key
)
# Windows Media DRM (WMDRM) Signature verification:
# Signature values (r, s) and the public key are both little-endian and loaded directly as
# integers. (We're also not using Montgomery but that shouldn't change anything)
# Despite loading them correctly and all checks on the pubkey and (r, s) being valid,
# signature verification still fails and (TODO) I DON'T KNOW WHY
# Useful sources:
# http://bearcave.com/misl/misl_tech/msdrm/technical.html
# https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-drmcd/36aabf50-a6be-4eb2-8f36-e1879eb54585
# https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-drm/76e5839c-5395-447a-a48c-3ab724c972a3
return True
@staticmethod
def _unwrap_wmdrmnet_list(xml: str) -> bytes:
root = ET.fromstring(f"<root>{xml}</root>")
data_template = root.findtext("DATA/TEMPLATE")
if not data_template:
raise InvalidRevocationList("No DATA/TEMPLATE found in WMDRMNET revocation wrap")
return base64.b64decode(data_template)
@staticmethod
def _verify_prnd_certificate(data: str) -> None:
root = ET.fromstring(data)
ET.register_namespace("c", "http://schemas.microsoft.com/DRM/2004/02/cert")
ET.register_namespace("", "http://www.w3.org/2000/09/xmldsig#")
_ns = {"c": "http://schemas.microsoft.com/DRM/2004/02/cert"}
for cert in root.findall("c:Certificate", _ns):
data_elem = cert.find("c:Data", _ns)
if data_elem is None:
raise InvalidRevocationList("Missing Data")
data_xml = ET.tostring(data_elem)
Util.remove_namespaces(cert)
digest_val = cert.findtext("Signature/SignedInfo/Reference/DigestValue")
if not digest_val:
raise InvalidRevocationList("Missing DigestValue")
digest_calc = base64.b64encode(hashlib.sha1(data_xml).digest()).decode()
if digest_val != digest_calc:
raise InvalidRevocationList("Digest mismatch")
rsa_key_value = cert.find("Signature/KeyInfo/KeyValue/RSAKeyValue")
mod_b64 = rsa_key_value.findtext("Modulus")
exp_b64 = rsa_key_value.findtext("Exponent")
if not mod_b64 or not exp_b64:
raise InvalidRevocationList("Missing Modulus/Exponent")
modulus_int = int.from_bytes(base64.b64decode(mod_b64), "big")
exp_raw = bytearray(base64.b64decode(exp_b64))
exp_bytes = (b"\x00" + exp_raw[::-1])
exponent_int = int.from_bytes(exp_bytes, "big")
pub_key = rsa.RSAPublicNumbers(exponent_int, modulus_int).public_key(default_backend())
sig_b64 = cert.findtext("Signature/SignatureValue")
if not sig_b64:
raise InvalidRevocationList("Missing SignatureValue")
sig_bytes = base64.b64decode(sig_b64)
pub_key.verify(
signature=sig_bytes,
data=data_xml,
padding=padding.PSS(padding.MGF1(hashes.SHA1()), padding.PSS.MAX_LENGTH),
algorithm=hashes.SHA1()
)
@staticmethod
def _get_wmdrmnet_crl_keys(data: str) -> Iterator[rsa.RSAPublicKey]:
root = ET.fromstring(data.encode())
Util.remove_namespaces(root)
for cert in root.findall('Certificate'):
data_elem = cert.find("Data")
if data_elem is None:
continue
key_usage_elem = data_elem.findtext('KeyUsage/SignCRL')
if key_usage_elem != "1":
continue
rsa_elem = data_elem.find('PublicKey/KeyValue/RSAKeyValue')
if rsa_elem is None:
continue
modulus_b64 = rsa_elem.findtext('Modulus')
exponent_b64 = rsa_elem.findtext('Exponent')
if not modulus_b64 or not exponent_b64:
continue
modulus = int.from_bytes(base64.b64decode(modulus_b64), 'big')
exponent = int.from_bytes(base64.b64decode(exponent_b64), 'big')
yield rsa.RSAPublicNumbers(exponent, modulus).public_key()
return None
@staticmethod
def _parse_list(list_id: UUID, data: bytes):
self = RevocationList
if list_id in (self.ListID.REV_INFO, self.ListID.REV_INFO_V2):
rev_info = self.BRevInfoSigned.parse(data)
self._verify_crl_signatures(rev_info, self.BRevInfoData)
return list_id, rev_info
elif list_id in (self.ListID.PLAYREADY_RUNTIME, self.ListID.PLAYREADY_APPLICATION):
pr_rl = self.BPrRLSigned.parse(data)
self._verify_crl_signatures(pr_rl, self.BPrRLData)
return list_id, pr_rl
elif list_id == self.ListID.WMDRMNET:
try:
xml = data.decode("utf-16-le")
if "<DATA>" in xml:
if not self._verify_wmdrmnet_wrap_signature(xml):
raise InvalidRevocationList("WMDRMNET wrap signature is not authentic")
wmdrmnet_data = self._unwrap_wmdrmnet_list(xml)
else:
raise InvalidRevocationList("WMDRMNET revocation list cannot be valid UTF-16-LE and not be wrapped")
except UnicodeDecodeError:
wmdrmnet_data = base64.b64decode(data)
wmdrmnet_parsed = self.WMDRMNETSigned.parse(wmdrmnet_data)
certificate_chain = wmdrmnet_parsed.data.certificate_chain.decode()
self._verify_prnd_certificate(certificate_chain)
crl_pub_key = next(self._get_wmdrmnet_crl_keys(certificate_chain), None)
crl_pub_key.verify(
signature=wmdrmnet_parsed.signature,
data=self.WMDRMNETData.build(wmdrmnet_parsed.data),
padding=padding.PSS(padding.MGF1(hashes.SHA1()), padding.PSS.MAX_LENGTH),
algorithm=hashes.SHA1()
)
return list_id, wmdrmnet_parsed
# TODO: DEVICE_REVOCATION, APP_REVOCATION
return None
@staticmethod
def _remove_utf8_bom(data: bytes) -> bytes:
# https://en.wikipedia.org/wiki/Byte_order_mark#UTF-8
if data[:3] == b"\xEF\xBB\xBF":
return data[3:]
return data
@staticmethod
def _verify_and_parse(revocation):
list_id = revocation.find("ListID")
if list_id is None or not list_id.text:
raise InvalidRevocationList(f"<ListID> is either missing or empty")
list_id_uuid = UUID(bytes_le=base64.b64decode(list_id.text))
list_data = revocation.find("ListData")
if list_data is None or not list_data.text:
raise InvalidRevocationList(f"<ListData> is either missing or empty")
return RevocationList._parse_list(list_id_uuid, base64.b64decode(list_data.text))
@classmethod
def loads(cls, data: Union[str, bytes, ET.Element]) -> RevocationList:
if isinstance(data, str):
data = data.encode()
if isinstance(data, bytes):
root = ET.fromstring(cls._remove_utf8_bom(data))
else:
root = data
if root.tag != "RevInfo":
raise InvalidRevocationList("Root element is not <RevInfo>")
revocations = root.findall("Revocation")
return cls(list(map(
cls._verify_and_parse,
revocations
)))
@classmethod
def load(cls, path: Union[Path, str]) -> RevocationList:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
@staticmethod
def merge(root: ET.Element, root2: ET.Element) -> ET.Element:
if root.tag != "RevInfo" or root2.tag != "RevInfo":
raise InvalidRevocationList("Root element is not <RevInfo>")
revocation = root.findall("Revocation")
def _get_version(parsed):
if parsed[0] in (RevocationList.ListID.REV_INFO, RevocationList.ListID.REV_INFO_V2):
return parsed[1].data.sequence_number
return parsed[1].data.version
def find_in_revs(list_id: UUID):
for rev in revocation:
parsed_rev = RevocationList._verify_and_parse(rev)
if parsed_rev[0] == list_id:
return rev, _get_version(parsed_rev)
return None, None
for revocation2 in root2.findall("Revocation"):
parsed_rev2 = RevocationList._verify_and_parse(revocation2)
rev_find, version = find_in_revs(parsed_rev2[0])
if rev_find is None:
root.append(revocation2)
else:
if _get_version(parsed_rev2) > version:
rev_find.find("ListData").text = revocation2.find("ListData").text
return root
def get_by_id(self, uuid: UUID) -> Optional[Container]:
for rev_list in self.parsed:
if rev_list[0] == uuid:
return rev_list[1]
return None
def get_storage_file_name(self):
rev_list = self.get_by_id(self.ListID.REV_INFO_V2)
list_name = "RevInfo2"
if rev_list is None:
rev_list = self.get_by_id(self.ListID.REV_INFO)
list_name = "RevInfo"
if rev_list is None:
raise InvalidRevocationList("No RevInfo available")
list_version = rev_list.data.sequence_number
list_date = rev_list.data.issued_time.strftime("%Y%m%d")
return f"{list_name}v{list_version}_{list_date}.xml"

View File

@ -0,0 +1,92 @@
from __future__ import annotations
import html
import xml.etree.ElementTree as ET
from typing import Union, Optional
from pyplayready.misc.drmresults import DrmResult
from pyplayready.misc.exceptions import InvalidSoapMessage, ServerException
class SoapMessage:
XML_DECLARATION = '<?xml version="1.0" encoding="utf-8"?>'
_NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"envelope": "http://www.w3.org/2003/05/soap-envelope"
}
def __init__(self, root: ET.Element):
self.root = root
@classmethod
def create(cls, message: ET.Element) -> SoapMessage:
Envelope = ET.Element("soap:Envelope", {
"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
"xmlns:xsd": "http://www.w3.org/2001/XMLSchema",
"xmlns:soap": "http://schemas.xmlsoap.org/soap/envelope/"
})
Body = ET.SubElement(Envelope, "soap:Body")
Body.append(message)
return cls(Envelope)
@classmethod
def loads(cls, data: Union[str, bytes]) -> SoapMessage:
if not data:
raise InvalidSoapMessage("Data must not be empty")
if isinstance(data, str):
data = data.encode()
parser = ET.XMLParser(encoding="utf-8")
root = ET.fromstring(data, parser=parser)
if not root.tag.endswith("Envelope"):
raise InvalidSoapMessage("Soap Message root must be Envelope")
return cls(root)
def get_message(self) -> Optional[ET.Element]:
Body = self.root.find("soap:Body", self._NS) or self.root.find("envelope:Body", self._NS)
if Body is None:
return None
if len(list(Body)) == 0:
return None
return Body[0]
@staticmethod
def read_namespace(element) -> Optional[str]:
if element.tag.startswith("{"):
return element.tag.split("}")[0][1:]
return None
def raise_faults(self):
fault = self.get_message()
if not fault.tag.endswith("Fault"):
return
nsmap = {"soap": self.read_namespace(fault)}
status_code = fault.findtext("detail/Exception/StatusCode")
drm_result = DrmResult.from_code(status_code) if status_code is not None else None
fault_text = fault.findtext("faultstring") or fault.findtext("soap:Reason/soap:Text", namespaces=nsmap)
error_message = fault_text or getattr(drm_result, "message", "(No message)")
exception_message = (f"[{drm_result.name}] " if drm_result else "") + error_message
raise ServerException(exception_message)
def dumps(self) -> str:
xml_data = ET.tostring(
self.root,
short_empty_elements=False,
encoding="utf-8"
)
# this shouldn't exist
return self.XML_DECLARATION + html.unescape(xml_data.decode())

View File

@ -0,0 +1,35 @@
import os
from pathlib import Path
from typing import Optional
from platformdirs import user_data_dir
class Storage:
@staticmethod
def _get_initialized_path() -> Path:
storage_path = Path(user_data_dir("pyplayready", "DevLARLEY"))
storage_path.mkdir(parents=True, exist_ok=True)
return storage_path
@staticmethod
def write_file(file_name: str, data: bytes) -> bool:
storage_path = Storage._get_initialized_path()
storage_file = storage_path / file_name
new_file = not storage_file.exists()
storage_file.write_bytes(data)
return new_file
@staticmethod
def read_file(file_name: str) -> Optional[bytes]:
storage_path = Storage._get_initialized_path()
storage_file = storage_path / file_name
if not storage_file.exists():
return None
return storage_file.read_bytes()

View File

@ -1,15 +1,15 @@
from __future__ import annotations
import logging
from typing import Union
from typing import Union, Optional, List
from uuid import UUID
import requests
from pyplayready import InvalidLicense
from pyplayready import InvalidXmrLicense
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.license.key import Key
from pyplayready.misc.exceptions import (DeviceMismatch, InvalidInitData)
from pyplayready.system.wrmheader import WRMHeader
@ -95,19 +95,22 @@ class RemoteCdm(Cdm):
if response.status_code != 200:
raise ValueError(f"Cannot Close CDM Session, {response_json['message']} [{response.status_code}]")
def get_license_challenge(self, session_id: bytes, wrm_header: Union[WRMHeader, str]) -> str:
def get_license_challenge(self, session_id: bytes, wrm_header: Union[WRMHeader, str], rev_lists: Optional[List[UUID]]=None) -> str:
if not wrm_header:
raise InvalidInitData("A wrm_header must be provided.")
if isinstance(wrm_header, WRMHeader):
wrm_header = wrm_header.dumps()
if not isinstance(wrm_header, str):
raise ValueError(f"Expected WRMHeader to be a {str} or {WRMHeader} not {wrm_header!r}")
if rev_lists and not isinstance(rev_lists, list):
raise ValueError(f"Expected rev_lists to be a {list} not {rev_lists!r}")
response = self.__session.post(
url=f"{self.host}/{self.device_name}/get_license_challenge",
json={
"session_id": session_id.hex(),
"init_data": wrm_header
"init_data": wrm_header,
**({"rev_lists": list(map(str, rev_lists))} if rev_lists else {})
}
)
response_json = response.json()
@ -119,10 +122,10 @@ class RemoteCdm(Cdm):
def parse_license(self, session_id: bytes, license_message: str) -> None:
if not license_message:
raise InvalidLicense("Cannot parse an empty license_message")
raise InvalidXmrLicense("Cannot parse an empty license_message")
if not isinstance(license_message, str):
raise InvalidLicense(f"Expected license_message to be a {str}, not {license_message!r}")
raise InvalidXmrLicense(f"Expected license_message to be a {str}, not {license_message!r}")
response = self.__session.post(
url=f"{self.host}/{self.device_name}/parse_license",
@ -158,6 +161,3 @@ class RemoteCdm(Cdm):
)
for key in response_json["data"]["keys"]
]
__all__ = ("RemoteCdm",)

View File

@ -1,5 +1,6 @@
from pathlib import Path
from typing import Any, Optional, Union
from uuid import UUID
from aiohttp.typedefs import Handler
from aiohttp import web
@ -8,7 +9,7 @@ from pyplayready import __version__, PSSH
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, InvalidPssh)
from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidXmrLicense, InvalidPssh)
routes = web.RouteTableDef()
@ -103,6 +104,7 @@ async def get_license_challenge(request: web.Request) -> web.Response:
session_id = bytes.fromhex(body["session_id"])
init_data = body["init_data"]
rev_lists = body.get("rev_lists")
if not init_data.startswith("<WRMHEADER"):
try:
@ -116,6 +118,7 @@ async def get_license_challenge(request: web.Request) -> web.Response:
license_request = cdm.get_license_challenge(
session_id=session_id,
wrm_header=init_data,
rev_lists=list(map(UUID, rev_lists)) if rev_lists else None
)
except InvalidSession:
return web.json_response({"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."}, status=400)
@ -151,7 +154,7 @@ async def parse_license(request: web.Request) -> web.Response:
cdm.parse_license(session_id, license_message)
except InvalidSession:
return web.json_response({"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."}, status=400)
except InvalidLicense as e:
except InvalidXmrLicense as e:
return web.json_response({"message": f"Invalid License, {e}"}, status=400)
except Exception as e:
return web.json_response({"message": f"Error, {e}"}, status=500)

View File

@ -14,10 +14,11 @@ from enum import IntEnum
from Crypto.PublicKey import ECC
from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer, Embedded
from construct import Int16ub, Array
from construct import Struct, this
from pyplayready.system.util import Util
from pyplayready.crypto import Crypto
from pyplayready.misc.exceptions import InvalidCertificateChain, InvalidCertificate
from pyplayready.crypto.ecc_key import ECCKey
@ -127,6 +128,12 @@ class BCertFeatures(IntEnum):
class _BCertStructs:
Header = Struct(
"flags" / Int16ub,
"tag" / Int16ub,
"length" / Int32ub,
)
BasicInfo = Struct(
"cert_id" / Bytes(16),
"security_level" / Int32ub,
@ -224,10 +231,22 @@ class _BCertStructs:
"signature" / Bytes(this.signature_size)
)
ExtDataHwid = Struct(
"record_length" / Int32ub,
"record_data" / Bytes(this.record_length),
"padding" / Bytes((4 - (this.record_length % 4)) % 4)
)
# defined manually, since refactoring everything is not worth it
ExtDataContainer = Struct(
"record_count" / Int32ub, # always 1
"records" / Array(this.record_count, DataRecord),
"signature" / ExtDataSignature
"record" / Struct(
Embedded(Header),
Embedded(ExtDataHwid)
),
"signature" / Struct(
Embedded(Header),
Embedded(ExtDataSignature)
)
)
# TODO: untested
@ -242,9 +261,7 @@ class _BCertStructs:
)
Attribute = Struct(
"flags" / Int16ub,
"tag" / Int16ub,
"length" / Int32ub,
Embedded(Header),
"attribute" / Switch(
lambda this_: this_.tag,
{
@ -260,8 +277,8 @@ class _BCertStructs:
BCertObjType.METERING: MeteringInfo,
BCertObjType.EXTDATASIGNKEY: ExtDataSignKeyInfo,
BCertObjType.EXTDATACONTAINER: ExtDataContainer,
BCertObjType.EXTDATASIGNATURE: ExtDataSignature,
BCertObjType.EXTDATA_HWID: Bytes(this.length - 8),
# BCertObjType.EXTDATASIGNATURE: ExtDataSignature,
# BCertObjType.EXTDATA_HWID: ExtDataHwid,
BCertObjType.SERVER: ServerInfo,
BCertObjType.SECURITY_VERSION: SecurityVersion,
BCertObjType.SECURITY_VERSION_2: SecurityVersion
@ -462,15 +479,16 @@ class Certificate(_BCertStructs):
return None
def get_name(self) -> Optional[str]:
manufacturer_info = self.get_attribute(BCertObjType.MANUFACTURER)
manufacturer_info_attr = self.get_attribute(BCertObjType.MANUFACTURER)
if manufacturer_info:
manufacturer_info_attr = manufacturer_info.attribute
if manufacturer_info_attr:
manufacturer_info = manufacturer_info_attr.attribute
def un_pad(name: bytes):
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
manufacturer = Util.un_pad(manufacturer_info.manufacturer_name)
model_name = Util.un_pad(manufacturer_info.model_name)
model_number = Util.un_pad(manufacturer_info.model_number)
return f"{un_pad(manufacturer_info_attr.manufacturer_name)} {un_pad(manufacturer_info_attr.model_name)} {un_pad(manufacturer_info_attr.model_number)}"
return f"{manufacturer} {model_name} {model_number}"
return None
@ -524,13 +542,40 @@ class Certificate(_BCertStructs):
def dumps(self) -> bytes:
return self._BCERT.build(self.parsed)
def _verify_extdata_signature(self) -> None:
sign_key = self.get_attribute(BCertObjType.EXTDATASIGNKEY)
if not sign_key:
raise InvalidCertificate("No extdata sign key object found in certificate")
sign_key_bytes = sign_key.attribute.key
signing_key = ECC.construct(
point_x=int.from_bytes(sign_key_bytes[:32], "big"),
point_y=int.from_bytes(sign_key_bytes[32:], "big"),
curve="P-256"
)
extdata = self.get_attribute(BCertObjType.EXTDATACONTAINER)
if not extdata:
raise InvalidCertificate("No extdata container found in certificate")
signature = extdata.attribute.signature.signature
sign_data = _BCertStructs.ExtDataContainer.subcons[0].build(extdata.attribute.record)
if not Crypto.ecc256_verify(
public_key=signing_key,
data=sign_data,
signature=signature
):
raise InvalidCertificate("Signature of certificate extdata is not authentic")
def verify_signature(self) -> None:
signature_object = self.get_attribute(BCertObjType.SIGNATURE)
if not signature_object:
raise InvalidCertificate(f"No signature object found in certificate")
raise InvalidCertificate("No signature object found in certificate")
signature_attribute = signature_object.attribute
raw_signature_key = signature_attribute.signature_key
signature_key = ECC.construct(
@ -546,7 +591,14 @@ class Certificate(_BCertStructs):
data=sign_payload,
signature=signature_attribute.signature
):
raise InvalidCertificate(f"Signature of certificate is not authentic")
raise InvalidCertificate("Signature of certificate is not authentic")
basic_info_attribute = self.get_attribute(BCertObjType.BASIC)
if not basic_info_attribute:
raise InvalidCertificate("No basic info object found in certificate")
if basic_info_attribute.attribute.flags & BCertFlag.EXTDATA_PRESENT == BCertFlag.EXTDATA_PRESENT:
self._verify_extdata_signature()
class CertificateChain(_BCertStructs):

View File

@ -0,0 +1,238 @@
import base64
import hashlib
import html
import time
import xml.etree.ElementTree as ET
from typing import Optional, List
from uuid import UUID
from Crypto.Random import get_random_bytes
from pyplayready import ECCKey, Crypto
from pyplayready.misc.revocation_list import RevocationList
from pyplayready.misc.storage import Storage
from pyplayready.system.bcert import CertificateChain
class XmlBuilder:
@staticmethod
def _ClientInfo(parent: ET.Element, client_version: str) -> ET.Element:
ClientInfo = ET.SubElement(parent, "CLIENTINFO")
ClientVersion = ET.SubElement(ClientInfo, "CLIENTVERSION")
ClientVersion.text = client_version
return ClientVersion
@staticmethod
def _RevListInfo(parent: ET.Element, list_id: UUID, version: int) -> ET.Element:
RevListInfo = ET.SubElement(parent, "RevListInfo")
ListID = ET.SubElement(RevListInfo, "ListID")
ListID.text = base64.b64encode(list_id.bytes_le).decode()
Version = ET.SubElement(RevListInfo, "Version")
Version.text = str(version)
return RevListInfo
@staticmethod
def _RevocationLists(parent: ET.Element, rev_lists: List[UUID]) -> ET.Element:
RevocationLists = ET.SubElement(parent, "RevocationLists")
load_result = Storage.read_file(RevocationList.CurrentRevListStorageName)
if load_result is None:
for rev_list in rev_lists:
XmlBuilder._RevListInfo(RevocationLists, rev_list, 0)
return RevocationLists
loaded_list = RevocationList.loads(load_result)
for list_id, list_data in loaded_list.parsed:
if list_id not in rev_lists:
continue
if list_id == RevocationList.ListID.REV_INFO_V2:
version = list_data.data.sequence_number
else:
version = list_data.data.version
XmlBuilder._RevListInfo(RevocationLists, list_id, version)
return RevocationLists
@staticmethod
def _LicenseAcquisition(
parent: ET.Element,
wrmheader: str,
protocol_version: int,
wrmserver_data: bytes,
client_data: bytes,
client_info: Optional[str] = None,
revocation_lists: Optional[List[UUID]] = None
) -> ET.Element:
LA = ET.SubElement(parent, "LA", {
"xmlns": "http://schemas.microsoft.com/DRM/2007/03/protocols",
"Id": "SignedData",
"xml:space": "preserve"
})
Version = ET.SubElement(LA, "Version")
Version.text = str(protocol_version)
ContentHeader = ET.SubElement(LA, "ContentHeader")
ContentHeader.text = wrmheader
if client_info is not None:
XmlBuilder._ClientInfo(LA, client_info)
if revocation_lists is not None:
XmlBuilder._RevocationLists(LA, revocation_lists)
LicenseNonce = ET.SubElement(LA, "LicenseNonce")
LicenseNonce.text = base64.b64encode(get_random_bytes(16)).decode()
ClientTime = ET.SubElement(LA, "ClientTime")
ClientTime.text = str(int(time.time()))
EncryptedData = ET.SubElement(LA, "EncryptedData", {
"xmlns": "http://www.w3.org/2001/04/xmlenc#",
"Type": "http://www.w3.org/2001/04/xmlenc#Element"
})
ET.SubElement(EncryptedData, "EncryptionMethod", {
"Algorithm": "http://www.w3.org/2001/04/xmlenc#aes128-cbc"
})
KeyInfo = ET.SubElement(EncryptedData, "KeyInfo", {
"xmlns": "http://www.w3.org/2000/09/xmldsig#"
})
EncryptedKey = ET.SubElement(KeyInfo, "EncryptedKey", {
"xmlns": "http://www.w3.org/2001/04/xmlenc#"
})
ET.SubElement(EncryptedKey, "EncryptionMethod", {
"Algorithm": "http://schemas.microsoft.com/DRM/2007/03/protocols#ecc256"
})
KeyInfoInner = ET.SubElement(EncryptedKey, "KeyInfo", {
"xmlns": "http://www.w3.org/2000/09/xmldsig#"
})
KeyName = ET.SubElement(KeyInfoInner, "KeyName")
KeyName.text = "WMRMServer"
WRMServerData = ET.SubElement(ET.SubElement(EncryptedKey, "CipherData"), "CipherValue")
WRMServerData.text = base64.b64encode(wrmserver_data).decode()
ClientData = ET.SubElement(ET.SubElement(EncryptedData, "CipherData"), "CipherValue")
ClientData.text = base64.b64encode(client_data).decode()
return LA
@staticmethod
def _SignedInfo(parent: ET.Element, digest_value: bytes) -> ET.Element:
SignedInfo = ET.SubElement(parent, "SignedInfo", {
"xmlns": "http://www.w3.org/2000/09/xmldsig#"
})
ET.SubElement(SignedInfo, "CanonicalizationMethod", {
"Algorithm": "http://www.w3.org/TR/2001/REC-xml-c14n-20010315"
})
ET.SubElement(SignedInfo, "SignatureMethod", {
"Algorithm": "http://schemas.microsoft.com/DRM/2007/03/protocols#ecdsa-sha256"
})
Reference = ET.SubElement(SignedInfo, "Reference", {
"URI": "#SignedData"
})
ET.SubElement(Reference, "DigestMethod", {
"Algorithm": "http://schemas.microsoft.com/DRM/2007/03/protocols#sha256"
})
DigestValue = ET.SubElement(Reference, "DigestValue")
DigestValue.text = base64.b64encode(digest_value).decode()
return SignedInfo
@staticmethod
def AcquireLicenseMessage(
wrmheader: str,
protocol_version: int,
wrmserver_data: bytes,
client_data: bytes,
signing_key: ECCKey,
client_info: Optional[str] = None,
revocation_lists: Optional[List[UUID]] = None
) -> ET.Element:
AcquireLicense = ET.Element("AcquireLicense", {
"xmlns": "http://schemas.microsoft.com/DRM/2007/03/protocols"
})
Challenge = ET.SubElement(ET.SubElement(AcquireLicense, "challenge"), "Challenge", {
"xmlns": "http://schemas.microsoft.com/DRM/2007/03/protocols/messages"
})
LA = XmlBuilder._LicenseAcquisition(Challenge, wrmheader, protocol_version, wrmserver_data, client_data, client_info, revocation_lists)
Signature = ET.SubElement(Challenge, "Signature", {
"xmlns": "http://www.w3.org/2000/09/xmldsig#"
})
la_xml = ET.tostring(
LA,
encoding="utf-8",
short_empty_elements=False
)
# I don't like this but re-serializing the WRMHEADER XML could change it
unescaped_la_xml = html.unescape(la_xml.decode())
la_digest = hashlib.sha256(unescaped_la_xml.encode()).digest()
SignedInfo = XmlBuilder._SignedInfo(Signature, la_digest)
signed_info_xml = ET.tostring(
SignedInfo,
encoding="utf-8",
short_empty_elements=False
)
SignatureValue = ET.SubElement(Signature, "SignatureValue")
SignatureValue.text = base64.b64encode(
Crypto.ecc256_sign(signing_key, signed_info_xml)
).decode()
ECCKeyValue = ET.SubElement(
ET.SubElement(
ET.SubElement(
Signature, "KeyInfo", {
"xmlns": "http://www.w3.org/2000/09/xmldsig#"
}
),
"KeyValue"
), "ECCKeyValue"
)
PublicKey = ET.SubElement(ECCKeyValue, "PublicKey")
PublicKey.text = base64.b64encode(signing_key.public_bytes()).decode()
return AcquireLicense
@staticmethod
def ClientData(cert_chains: List[CertificateChain], ree_features: List[str]) -> str:
Data = ET.Element("Data")
CertificateChains = ET.SubElement(Data, "CertificateChains")
for cert_chain in cert_chains:
CertificateChainElement = ET.SubElement(CertificateChains, "CertificateChain")
CertificateChainElement.text = f" {base64.b64encode(cert_chain.dumps()).decode()} "
Features = ET.SubElement(Data, "Features")
ET.SubElement(Features, "Feature", {"Name": "AESCBC"})
REE = ET.SubElement(Features, "REE")
for ree_feature in ree_features:
ET.SubElement(REE, ree_feature)
return ET.tostring(
Data,
encoding="utf-8",
short_empty_elements=False
).decode()

View File

@ -15,6 +15,3 @@ class Session:
self.signing_key: Optional[ECCKey] = None
self.encryption_key: Optional[ECCKey] = None
self.keys: list[Key] = []
__all__ = ("Session",)

View File

@ -0,0 +1,20 @@
import xml.etree.ElementTree as ET
class Util:
@staticmethod
def remove_namespaces(element: ET.Element) -> None:
for elem in element.iter():
elem.tag = elem.tag.split('}')[-1]
@staticmethod
def un_pad(name: bytes) -> str:
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
@staticmethod
def to_bytes(n: int) -> bytes:
byte_len = (n.bit_length() + 7) // 8
if byte_len % 2 != 0:
byte_len += 1
return n.to_bytes(byte_len, 'big')

View File

@ -1,26 +1,65 @@
import base64
import hashlib
from enum import Enum
from typing import Optional, List, Union, Tuple
from typing import List, Optional, Union
from uuid import UUID
import xml.etree.ElementTree as ET
import xmltodict
from Crypto.Cipher import AES
from pyplayready.misc.exceptions import InvalidWrmHeader, InvalidChecksum
from pyplayready.system.util import Util
class WRMHeader:
"""Represents a PlayReady WRM Header"""
class SignedKeyID:
def __init__(
self,
alg_id: str,
value: str,
checksum: str
):
self.alg_id = alg_id
class AlgId(Enum):
AESCTR = "AESCTR"
AESCBC = "AESCBC"
COCKTAIL = "COCKTAIL"
UNKNOWN = "UNKNOWN"
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
def __init__(self, value: UUID, alg_id, checksum: Optional[bytes]):
self.value = value
self.alg_id = alg_id
self.checksum = checksum
@classmethod
def load(cls, value: str, alg_id: str, checksum: Optional[str]):
return cls(
value=UUID(bytes_le=base64.b64decode(value)),
alg_id=cls.AlgId(alg_id),
checksum=base64.b64decode(checksum) if checksum else None
)
def __repr__(self):
return f'SignedKeyID(alg_id={self.alg_id}, value="{self.value}", checksum="{self.checksum}")'
return f'SignedKeyID(value="{self.value}", alg_id={self.alg_id}, checksum={self.checksum})'
def verify(self, content_key: bytes) -> bool:
if self.value is None:
raise InvalidChecksum("Key ID must not be empty")
if self.checksum is None:
raise InvalidChecksum("Checksum must not be empty")
if self.alg_id == self.AlgId.AESCTR:
cipher = AES.new(content_key, mode=AES.MODE_ECB)
encrypted = cipher.encrypt(self.value.bytes_le)
checksum = encrypted[:8]
elif self.alg_id == self.AlgId.COCKTAIL:
buffer = content_key.ljust(21, b"\x00")
for _ in range(5):
buffer = hashlib.sha1(buffer).digest()
checksum = buffer[:7]
else:
raise InvalidChecksum("Algorithm ID must be either \"AESCTR\" or \"COCKTAIL\"")
return checksum == self.checksum
class Version(Enum):
VERSION_4_0_0_0 = "4.0.0.0"
@ -33,13 +72,9 @@ class WRMHeader:
def _missing_(cls, value):
return cls.UNKNOWN
_RETURN_STRUCTURE = Optional[Tuple[List[SignedKeyID], Optional[str], Optional[str], Optional[str]]]
def __init__(self, data: Union[str, bytes]):
"""Load a WRM Header from either a string, base64 encoded data or bytes"""
if not data:
raise ValueError("Data must not be empty")
raise InvalidWrmHeader("Data must not be empty")
if isinstance(data, str):
try:
@ -47,101 +82,105 @@ class WRMHeader:
except Exception:
data = data.encode("utf-16-le")
self._raw_data: bytes = data
self._parsed = xmltodict.parse(self._raw_data)
self._raw_data = data
self._root = ET.fromstring(data)
Util.remove_namespaces(self._root)
self._header = self._parsed.get('WRMHEADER')
if not self._header:
raise ValueError("Data is not a valid WRMHEADER")
if self._root.tag != "WRMHEADER":
raise InvalidWrmHeader("Data is not a valid WRMHEADER")
self.version = self.Version(self._header.get('@version'))
self.version = self.Version(self._root.attrib.get("version"))
@staticmethod
def _ensure_list(element: Union[dict, list]) -> List:
if isinstance(element, dict):
return [element]
return element
@staticmethod
def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
return (
[WRMHeader.SignedKeyID(
alg_id=protect_info["ALGID"],
value=data["KID"],
checksum=data.get("CHECKSUM")
)],
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
@staticmethod
def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kid = protect_info["KID"]
if kid:
key_ids = [WRMHeader.SignedKeyID(
alg_id=kid["@ALGID"],
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
)]
return key_ids, data.get("LA_URL"), data.get("LUI_URL"), data.get("DS_ID")
@staticmethod
def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kids = protect_info["KIDS"]
if kids:
for kid in WRMHeader._ensure_list(kids["KID"]):
key_ids.append(WRMHeader.SignedKeyID(
alg_id=kid["@ALGID"],
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
))
return key_ids, data.get("LA_URL"), data.get("LUI_URL"), data.get("DS_ID")
@staticmethod
def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kids = protect_info["KIDS"]
for kid in WRMHeader._ensure_list(kids["KID"]):
key_ids.append(WRMHeader.SignedKeyID(
alg_id=kid.get("@ALGID"),
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
))
return key_ids, data.get("LA_URL"), data.get("LUI_URL"), data.get("DS_ID")
def read_attributes(self) -> _RETURN_STRUCTURE:
"""Read any non-custom XML attributes"""
data = self._header.get("DATA")
if not data:
raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
self.key_ids: List[WRMHeader.SignedKeyID] = []
self.la_url: Optional[str] = None
self.lui_url: Optional[str] = None
self.ds_id: Optional[str] = None
self.custom_attributes: Optional[ET.Element] = None
self.decryptor_setup: Optional[str] = None
if self.version == self.Version.VERSION_4_0_0_0:
return self._read_v4_0_0_0(data)
self._load_v4_0_data(self._root)
elif self.version == self.Version.VERSION_4_1_0_0:
return self._read_v4_1_0_0(data)
self._load_v4_1_data(self._root)
elif self.version == self.Version.VERSION_4_2_0_0:
return self._read_v4_2_0_0(data)
self._load_v4_2_data(self._root)
elif self.version == self.Version.VERSION_4_3_0_0:
return self._read_v4_3_0_0(data)
self._load_v4_3_data(self._root)
return None
def __repr__(self):
attrs = ", \n ".join(f"{k}={v!r}" for k, v in self.__dict__.items())
return f"{self.__class__.__name__}({attrs})"
@staticmethod
def _attr(element, name):
return element.attrib.get(name) if element is not None else None
def _load_v4_0_data(self, parent: ET.Element):
Data = parent.find("DATA")
Kid = Data.findtext("KID")
AlgId = Data.findtext("PROTECTINFO/ALGID")
Checksum = Data.findtext("CHECKSUM")
self.key_ids = [self.SignedKeyID.load(Kid, AlgId, Checksum)]
self.la_url = Data.findtext("LA_URL")
self.lui_url = Data.findtext("LUI_URL")
self.ds_id = Data.findtext("DS_ID")
self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
def _load_v4_1_data(self, parent: ET.Element):
Data = parent.find("DATA")
Kid = Data.find("PROTECTINFO/KID")
if Kid is not None:
Value = Kid.get("VALUE")
AlgId = Kid.get("ALGID")
Checksum = Kid.get("CHECKSUM")
self.key_ids.append(self.SignedKeyID.load(Value, AlgId, Checksum))
self.la_url = Data.findtext("LA_URL")
self.lui_url = Data.findtext("LUI_URL")
self.ds_id = Data.findtext("DS_ID")
self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
self.decryptor_setup = Data.findtext("DECRYPTORSETUP")
def _load_v4_2_data(self, parent: ET.Element):
Data = parent.find("DATA")
for kid in Data.findall("PROTECTINFO/KIDS/KID"):
Value = kid.get("VALUE")
AlgId = kid.get("ALGID")
Checksum = kid.get("CHECKSUM")
self.key_ids.append(self.SignedKeyID.load(Value, AlgId, Checksum))
self.la_url = Data.findtext("LA_URL")
self.lui_url = Data.findtext("LUI_URL")
self.ds_id = Data.findtext("DS_ID")
self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
self.decryptor_setup = Data.findtext("DECRYPTORSETUP")
def _load_v4_3_data(self, parent: ET.Element):
Data = parent.find("DATA")
for kid in Data.findall("PROTECTINFO/KIDS/KID"):
Value = kid.get("VALUE")
AlgId = kid.get("ALGID")
Checksum = kid.get("CHECKSUM")
self.key_ids.append(self.SignedKeyID.load(Value, AlgId, Checksum))
self.la_url = Data.findtext("LA_URL")
self.lui_url = Data.findtext("LUI_URL")
self.ds_id = Data.findtext("DS_ID")
self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
self.decryptor_setup = Data.findtext("DECRYPTORSETUP")
def dumps(self) -> str:
return self._raw_data.decode("utf-16-le")

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pyplayready"
version = "0.6.3"
version = "0.8.0"
description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "CC BY-NC-ND 4.0"
authors = ["DevLARLEY, Erevoc", "DevataDev"]
@ -36,11 +36,10 @@ pycryptodome = "^3.21.0"
construct = "2.8.8"
ECPy = "^1.2.5"
click = "^8.1.7"
xmltodict = "^0.14.2"
PyYAML = "^6.0.1"
aiohttp = {version = "^3.9.1", optional = true}
aiohttp = "^3.9.1"
cryptography = "^45.0.6"
lxml = "^6.0.0"
platformdirs = "^4.4.0"
[tool.poetry.scripts]
pyplayready = "pyplayready.main:main"

View File

@ -1,10 +1,9 @@
requests
pycryptodome
ecpy
construct==2.8.8
ecpy
click
PyYAML
aiohttp
xmltodict
cryptography
lxml
platformdirs