From 17c69027e03928260a575404ac00987676c2194f Mon Sep 17 00:00:00 2001
From: larley <121249322+DevLARLEY@users.noreply.github.com>
Date: Sun, 5 Oct 2025 13:25:22 +0200
Subject: [PATCH] + Added support for RevLists + Updated RemoteCdm/Serve to
support RevLists + Added Storage class for RevList persistence + Added
support for ExtData objects in BCerts + signature verification + Added an Xml
Builder class (instead of xmltodict) + Added a SOAP Builder/Parser class
(instead of xmltodict) + Refactored WRMHeader class to use ET (instead of
xmltodict) + Upgraded ServerException detection + Removed dependencies:
xmltodict, lxml + Added Util class + Minor Crypto/ElGamal class changes
---
README.md | 3 +-
pyplayready/__init__.py | 7 +-
pyplayready/cdm.py | 290 +++-------------
pyplayready/crypto/__init__.py | 36 +-
pyplayready/crypto/ecc_key.py | 13 +-
pyplayready/crypto/elgamal.py | 16 +-
pyplayready/license/license.py | 126 +++++++
pyplayready/license/xml_key.py | 4 +-
pyplayready/license/xmrlicense.py | 73 +++-
pyplayready/main.py | 45 ++-
pyplayready/misc/exceptions.py | 26 +-
pyplayready/misc/revocation_list.py | 499 ++++++++++++++++++++++++++++
pyplayready/misc/soap_message.py | 92 +++++
pyplayready/misc/storage.py | 35 ++
pyplayready/remote/remotecdm.py | 20 +-
pyplayready/remote/serve.py | 7 +-
pyplayready/system/bcert.py | 88 ++++-
pyplayready/system/builder.py | 238 +++++++++++++
pyplayready/system/session.py | 3 -
pyplayready/system/util.py | 20 ++
pyplayready/system/wrmheader.py | 241 ++++++++------
pyproject.toml | 7 +-
requirements.txt | 5 +-
23 files changed, 1435 insertions(+), 459 deletions(-)
create mode 100644 pyplayready/license/license.py
create mode 100644 pyplayready/misc/revocation_list.py
create mode 100644 pyplayready/misc/soap_message.py
create mode 100644 pyplayready/misc/storage.py
create mode 100644 pyplayready/system/builder.py
create mode 100644 pyplayready/system/util.py
diff --git a/README.md b/README.md
index b39f685..dacab12 100644
--- a/README.md
+++ b/README.md
@@ -31,6 +31,7 @@ An example code snippet:
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.system.pssh import PSSH
+from pyplayready.misc.revocation_list import RevocationList
import requests
@@ -52,7 +53,7 @@ pssh = PSSH(
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
)
-request = cdm.get_license_challenge(session_id, pssh.wrm_headers[0])
+request = cdm.get_license_challenge(session_id, pssh.wrm_headers[0], rev_lists=RevocationList.SupportedListIds)
response = requests.post(
url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)",
diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py
index 49c7884..546c7b6 100644
--- a/pyplayready/__init__.py
+++ b/pyplayready/__init__.py
@@ -1,6 +1,7 @@
-from pyplayready.cdm import *
from pyplayready.crypto.ecc_key import *
from pyplayready.crypto.elgamal import *
+from pyplayready.crypto import *
+from pyplayready.cdm import *
from pyplayready.device import *
from pyplayready.license.key import *
from pyplayready.license.xml_key import *
@@ -11,6 +12,8 @@ from pyplayready.system.pssh import *
from pyplayready.system.session import *
from pyplayready.misc.drmresults import *
from pyplayready.misc.exceptions import *
+from pyplayready.misc.revocation_list import *
+from pyplayready.misc.storage import *
-__version__ = "0.6.3"
+__version__ = "0.8.0"
diff --git a/pyplayready/cdm.py b/pyplayready/cdm.py
index bfe59b1..fafe5cf 100644
--- a/pyplayready/cdm.py
+++ b/pyplayready/cdm.py
@@ -1,27 +1,23 @@
from __future__ import annotations
-import base64
-import time
+import xml.etree.ElementTree as ET
from typing import List, Union, Optional
from uuid import UUID
-from lxml import etree as ET
-import xmltodict
from Crypto.Cipher import AES
-from Crypto.Hash import SHA256
-from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad
-from Crypto.Util.strxor import strxor
-
from ecpy.curves import Point, Curve
from pyplayready.crypto import Crypto
-from pyplayready.misc.drmresults import DrmResult
-from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.license.key import Key
-from pyplayready.license.xmrlicense import XMRLicense, XMRObjectTypes
-from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, ServerException)
+from pyplayready.license.license import License
+from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidXmrLicense)
+from pyplayready.misc.revocation_list import RevocationList
+from pyplayready.misc.soap_message import SoapMessage
+from pyplayready.misc.storage import Storage
+from pyplayready.system.bcert import CertificateChain
+from pyplayready.system.builder import XmlBuilder
from pyplayready.system.session import Session
from pyplayready.system.wrmheader import WRMHeader
@@ -29,11 +25,6 @@ from pyplayready.system.wrmheader import WRMHeader
class Cdm:
MAX_NUM_OF_SESSIONS = 16
- MagicConstantZero = bytes([
- 0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f,
- 0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb
- ])
-
def __init__(
self,
security_level: int,
@@ -48,7 +39,6 @@ class Cdm:
self.signing_key = signing_key
self.client_version = client_version
- self.__crypto = Crypto()
self._wmrm_key = Point(
x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
@@ -84,30 +74,8 @@ class Cdm:
raise InvalidSession(f"Session identifier {session_id.hex()} is invalid.")
del self.__sessions[session_id]
- def _get_key_data(self, session: Session) -> bytes:
- return self.__crypto.ecc256_encrypt(
- public_key=self._wmrm_key,
- plaintext=session.xml_key.get_point()
- )
-
def _get_cipher_data(self, session: Session) -> bytes:
- b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
- body = xmltodict.unparse({
- 'Data': {
- 'CertificateChains': {
- 'CertificateChain': b64_chain
- },
- 'Features': {
- 'Feature': {
- '@Name': 'AESCBC',
- '#text': '""'
- },
- 'REE': {
- 'AESCBCS': None
- }
- }
- }
- }, full_document=False)
+ body = XmlBuilder.ClientData([self.certificate_chain], ["AESCBCS"])
cipher = AES.new(
key=session.xml_key.aes_key,
@@ -122,111 +90,12 @@ class Cdm:
return session.xml_key.aes_iv + ciphertext
- @staticmethod
- def _build_main_body(la_content: dict, signed_info: dict, signature: str, public_signing_key: str) -> dict:
- return {
- 'soap:Envelope': {
- '@xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance',
- '@xmlns:xsd': 'http://www.w3.org/2001/XMLSchema',
- '@xmlns:soap': 'http://schemas.xmlsoap.org/soap/envelope/',
- 'soap:Body': {
- 'AcquireLicense': {
- '@xmlns': 'http://schemas.microsoft.com/DRM/2007/03/protocols',
- 'challenge': {
- 'Challenge': {
- '@xmlns': 'http://schemas.microsoft.com/DRM/2007/03/protocols/messages',
- 'LA': la_content["LA"],
- 'Signature': {
- 'SignedInfo': signed_info["SignedInfo"],
- '@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
- 'SignatureValue': signature,
- 'KeyInfo': {
- '@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
- 'KeyValue': {
- 'ECCKeyValue': {
- 'PublicKey': public_signing_key
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
-
- def _build_digest_content(
+ def get_license_challenge(
self,
- wrm_header: str,
- nonce: str,
- wmrm_cipher: str,
- cert_cipher: str,
- protocol_version: int
- ) -> dict:
- return {
- 'LA': {
- '@xmlns': 'http://schemas.microsoft.com/DRM/2007/03/protocols',
- '@Id': 'SignedData',
- '@xml:space': 'preserve',
- 'Version': protocol_version,
- 'ContentHeader': xmltodict.parse(wrm_header),
- 'CLIENTINFO': {
- 'CLIENTVERSION': self.client_version
- },
- 'LicenseNonce': nonce,
- 'ClientTime': int(time.time()),
- 'EncryptedData': {
- '@xmlns': 'http://www.w3.org/2001/04/xmlenc#',
- '@Type': 'http://www.w3.org/2001/04/xmlenc#Element',
- 'EncryptionMethod': {
- '@Algorithm': 'http://www.w3.org/2001/04/xmlenc#aes128-cbc'
- },
- 'KeyInfo': {
- '@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
- 'EncryptedKey': {
- '@xmlns': 'http://www.w3.org/2001/04/xmlenc#',
- 'EncryptionMethod': {
- '@Algorithm': 'http://schemas.microsoft.com/DRM/2007/03/protocols#ecc256'
- },
- 'KeyInfo': {
- '@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
- 'KeyName': 'WMRMServer'
- },
- 'CipherData': {
- 'CipherValue': wmrm_cipher
- }
- }
- },
- 'CipherData': {
- 'CipherValue': cert_cipher
- }
- }
- }
- }
-
- @staticmethod
- def _build_signed_info(digest_value: str) -> dict:
- return {
- 'SignedInfo': {
- '@xmlns': 'http://www.w3.org/2000/09/xmldsig#',
- 'CanonicalizationMethod': {
- '@Algorithm': 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315'
- },
- 'SignatureMethod': {
- '@Algorithm': 'http://schemas.microsoft.com/DRM/2007/03/protocols#ecdsa-sha256'
- },
- 'Reference': {
- '@URI': '#SignedData',
- 'DigestMethod': {
- '@Algorithm': 'http://schemas.microsoft.com/DRM/2007/03/protocols#sha256'
- },
- 'DigestValue': digest_value
- }
- }
- }
-
- def get_license_challenge(self, session_id: bytes, wrm_header: Union[WRMHeader, str]) -> str:
+ session_id: bytes,
+ wrm_header: Union[WRMHeader, str],
+ rev_lists: Optional[List[UUID]]=None # default: RevocationList.SupportedListIds
+ ) -> str:
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id.hex()} is invalid.")
@@ -234,7 +103,10 @@ class Cdm:
if isinstance(wrm_header, str):
wrm_header = WRMHeader(wrm_header)
if not isinstance(wrm_header, WRMHeader):
- raise ValueError(f"Expected WRMHeader to be a {str} or {WRMHeader} not {wrm_header!r}")
+ raise ValueError(f"Expected wrm_header to be a {str} or {WRMHeader} not {wrm_header!r}")
+
+ if rev_lists and not isinstance(rev_lists, list):
+ raise ValueError(f"Expected rev_lists to be a {list} not {rev_lists!r}")
match wrm_header.version:
case WRMHeader.Version.VERSION_4_3_0_0:
@@ -247,117 +119,51 @@ class Cdm:
session.signing_key = self.signing_key
session.encryption_key = self.encryption_key
- la_content = self._build_digest_content(
- wrm_header=wrm_header.dumps(),
- nonce=base64.b64encode(get_random_bytes(16)).decode(),
- wmrm_cipher=base64.b64encode(self._get_key_data(session)).decode(),
- cert_cipher=base64.b64encode(self._get_cipher_data(session)).decode(),
- protocol_version=protocol_version
+ acquire_license_message = XmlBuilder.AcquireLicenseMessage(
+ wrmheader=wrm_header.dumps(),
+ protocol_version=protocol_version,
+ wrmserver_data=Crypto.ecc256_encrypt(self._wmrm_key, session.xml_key.get_point()),
+ client_data=self._get_cipher_data(session),
+ signing_key=self.signing_key,
+ client_info=self.client_version,
+ revocation_lists=rev_lists
)
- la_content_xml = xmltodict.unparse(la_content, full_document=False)
+ soap_message = SoapMessage.create(acquire_license_message)
- la_hash_obj = SHA256.new()
- la_hash_obj.update(la_content_xml.encode())
- la_hash = la_hash_obj.digest()
+ return soap_message.dumps()
- signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
- signed_info_xml = xmltodict.unparse(signed_info, full_document=False)
-
- signature = self.__crypto.ecc256_sign(session.signing_key, signed_info_xml.encode())
- b64_signature = base64.b64encode(signature).decode()
-
- b64_public_singing_key = base64.b64encode(session.signing_key.public_bytes()).decode()
-
- return xmltodict.unparse(self._build_main_body(la_content, signed_info, b64_signature, b64_public_singing_key)).replace('\n', '')
-
- @staticmethod
- def _verify_encryption_key(session: Session, licence: XMRLicense) -> bool:
- ecc_keys = list(licence.get_object(XMRObjectTypes.ECC_DEVICE_KEY_OBJECT))
- if not ecc_keys:
- raise InvalidLicense("No ECC public key in license")
-
- return ecc_keys[0].key == session.encryption_key.public_bytes()
-
- def parse_license(self, session_id: bytes, licence: str) -> None:
+ def parse_license(self, session_id: bytes, soap_message: str) -> None:
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id.hex()} is invalid.")
- if not licence:
- raise InvalidLicense("Cannot parse an empty licence message")
- if not isinstance(licence, str):
- raise InvalidLicense(f"Expected licence message to be a {str}, not {licence!r}")
+ if not soap_message:
+ raise InvalidXmrLicense("Cannot parse an empty licence message")
+ if not isinstance(soap_message, str):
+ raise InvalidXmrLicense(f"Expected licence message to be a {str}, not {soap_message!r}")
if not session.encryption_key or not session.signing_key:
raise InvalidSession("Cannot parse a license message without first making a license request")
- try:
- parser = ET.XMLParser(remove_blank_text=True)
- root = ET.XML(licence.encode(), parser)
- faults = root.findall(".//{http://schemas.xmlsoap.org/soap/envelope/}Fault")
+ soap_message = SoapMessage.loads(soap_message)
+ soap_message.raise_faults()
- for fault in faults:
- status_codes = fault.findall(".//StatusCode")
- for status_code in status_codes:
- code = DrmResult.from_code(status_code.text)
- raise ServerException(f"[{status_code.text}] ({code.name}) {code.message}")
+ licence = License(soap_message.get_message())
+ if licence.is_verifiable():
+ licence.verify()
- license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
+ if licence.rev_info is not None:
+ current_rev_info_file = Storage.read_file(RevocationList.CurrentRevListStorageName)
- for license_element in license_elements:
- parsed_licence = XMRLicense.loads(license_element.text)
+ if current_rev_info_file:
+ new_rev_info = RevocationList.merge(ET.fromstring(current_rev_info_file), licence.rev_info)
+ else:
+ new_rev_info = licence.rev_info
- if not self._verify_encryption_key(session, parsed_licence):
- raise InvalidLicense("Public encryption key does not match")
+ Storage.write_file(RevocationList.CurrentRevListStorageName, new_rev_info)
+ Storage.write_file(RevocationList.loads(new_rev_info).get_storage_file_name(), new_rev_info)
- is_scalable = bool(next(parsed_licence.get_object(XMRObjectTypes.AUX_KEY_OBJECT), None))
-
- for content_key in parsed_licence.get_content_keys():
- cipher_type = Key.CipherType(content_key.cipher_type)
-
- if cipher_type not in (Key.CipherType.ECC_256, Key.CipherType.ECC_256_WITH_KZ, Key.CipherType.ECC_256_VIA_SYMMETRIC):
- raise InvalidLicense(f"Invalid cipher type {cipher_type}")
-
- via_symmetric = Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256_VIA_SYMMETRIC
-
- decrypted = self.__crypto.ecc256_decrypt(
- private_key=session.encryption_key,
- ciphertext=content_key.encrypted_key
- )
- ci, ck = decrypted[:16], decrypted[16:]
-
- if is_scalable:
- ci, ck = decrypted[::2][:16], decrypted[1::2][:16]
-
- if via_symmetric:
- embedded_root_license = content_key.encrypted_key[:144]
- embedded_leaf_license = content_key.encrypted_key[144:]
-
- rgb_key = strxor(ck, self.MagicConstantZero)
- content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key)
-
- aux_key = next(parsed_licence.get_object(XMRObjectTypes.AUX_KEY_OBJECT))["auxiliary_keys"][0]["key"]
-
- uplink_x_key = AES.new(content_key_prime, AES.MODE_ECB).encrypt(aux_key)
- secondary_key = AES.new(ck, AES.MODE_ECB).encrypt(embedded_root_license[128:])
-
- embedded_leaf_license = AES.new(uplink_x_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
- embedded_leaf_license = AES.new(secondary_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
-
- ci, ck = embedded_leaf_license[:16], embedded_leaf_license[16:]
-
- if not parsed_licence.check_signature(ci):
- raise InvalidLicense("License integrity signature does not match")
-
- session.keys.append(Key(
- key_id=UUID(bytes_le=content_key.key_id),
- key_type=content_key.key_type,
- cipher_type=content_key.cipher_type,
- key_length=content_key.key_length,
- key=ck
- ))
- except Exception as e:
- raise
- raise InvalidLicense(f"Unable to parse license: {e}")
+ for xmr_license in licence.licenses:
+ session.keys.append(xmr_license.get_content_key(session.encryption_key))
def get_keys(self, session_id: bytes) -> List[Key]:
session = self.__sessions.get(session_id)
diff --git a/pyplayready/crypto/__init__.py b/pyplayready/crypto/__init__.py
index 4e339eb..08e83b9 100644
--- a/pyplayready/crypto/__init__.py
+++ b/pyplayready/crypto/__init__.py
@@ -8,16 +8,16 @@ from ecpy.curves import Point, Curve
from pyplayready.crypto.elgamal import ElGamal
from pyplayready.crypto.ecc_key import ECCKey
+from pyplayready.system.util import Util
class Crypto:
- def __init__(self, curve: str = "secp256r1"):
- self.curve = Curve.get_curve(curve)
- self.elgamal = ElGamal(self.curve)
+ curve = Curve.get_curve("secp256r1")
- def ecc256_encrypt(self, public_key: Union[ECCKey, Point], plaintext: Union[Point, bytes]) -> bytes:
+ @staticmethod
+ def ecc256_encrypt(public_key: Union[ECCKey, Point], plaintext: Union[Point, bytes]) -> bytes:
if isinstance(public_key, ECCKey):
- public_key = public_key.get_point(self.curve)
+ public_key = public_key.get_point(Crypto.curve)
if not isinstance(public_key, Point):
raise ValueError(f"Expecting ECCKey or Point input, got {public_key!r}")
@@ -25,41 +25,39 @@ class Crypto:
plaintext = Point(
x=int.from_bytes(plaintext[:32], 'big'),
y=int.from_bytes(plaintext[32:64], 'big'),
- curve=self.curve
+ curve=Crypto.curve
)
if not isinstance(plaintext, Point):
raise ValueError(f"Expecting Point or Bytes input, got {plaintext!r}")
- point1, point2 = self.elgamal.encrypt(
- message_point=plaintext,
- public_key=public_key
- )
+ point1, point2 = ElGamal.encrypt(plaintext, public_key)
return b''.join([
- self.elgamal.to_bytes(point1.x),
- self.elgamal.to_bytes(point1.y),
- self.elgamal.to_bytes(point2.x),
- self.elgamal.to_bytes(point2.y)
+ Util.to_bytes(point1.x),
+ Util.to_bytes(point1.y),
+ Util.to_bytes(point2.x),
+ Util.to_bytes(point2.y)
])
- def ecc256_decrypt(self, private_key: ECCKey, ciphertext: Union[Tuple[Point, Point], bytes]) -> bytes:
+ @staticmethod
+ def ecc256_decrypt(private_key: ECCKey, ciphertext: Union[Tuple[Point, Point], bytes]) -> bytes:
if isinstance(ciphertext, bytes):
ciphertext = (
Point(
x=int.from_bytes(ciphertext[:32], 'big'),
y=int.from_bytes(ciphertext[32:64], 'big'),
- curve=self.curve
+ curve=Crypto.curve
),
Point(
x=int.from_bytes(ciphertext[64:96], 'big'),
y=int.from_bytes(ciphertext[96:128], 'big'),
- curve=self.curve
+ curve=Crypto.curve
)
)
if not isinstance(ciphertext, Tuple):
raise ValueError(f"Expecting Tuple[Point, Point] or Bytes input, got {ciphertext!r}")
- decrypted = self.elgamal.decrypt(ciphertext, int(private_key.key.d))
- return self.elgamal.to_bytes(decrypted.x)
+ decrypted = ElGamal.decrypt(ciphertext, int(private_key.key.d))
+ return Util.to_bytes(decrypted.x)
@staticmethod
def ecc256_sign(private_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes]) -> bytes:
diff --git a/pyplayready/crypto/ecc_key.py b/pyplayready/crypto/ecc_key.py
index 55996b1..ff8b68a 100644
--- a/pyplayready/crypto/ecc_key.py
+++ b/pyplayready/crypto/ecc_key.py
@@ -9,6 +9,8 @@ from Crypto.PublicKey import ECC
from Crypto.PublicKey.ECC import EccKey
from ecpy.curves import Curve, Point
+from pyplayready.system.util import Util
+
class ECCKey:
"""Represents a PlayReady ECC key pair"""
@@ -68,18 +70,11 @@ class ECCKey:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps(private_only))
- @staticmethod
- def _to_bytes(n: int) -> bytes:
- byte_len = (n.bit_length() + 7) // 8
- if byte_len % 2 != 0:
- byte_len += 1
- return n.to_bytes(byte_len, 'big')
-
def get_point(self, curve: Curve) -> Point:
return Point(self.key.pointQ.x, self.key.pointQ.y, curve)
def private_bytes(self) -> bytes:
- return self._to_bytes(int(self.key.d))
+ return Util.to_bytes(int(self.key.d))
def private_sha256_digest(self) -> bytes:
hash_object = SHA256.new()
@@ -87,7 +82,7 @@ class ECCKey:
return hash_object.digest()
def public_bytes(self) -> bytes:
- return self._to_bytes(int(self.key.pointQ.x)) + self._to_bytes(int(self.key.pointQ.y))
+ return Util.to_bytes(int(self.key.pointQ.x)) + Util.to_bytes(int(self.key.pointQ.y))
def public_sha256_digest(self) -> bytes:
hash_object = SHA256.new()
diff --git a/pyplayready/crypto/elgamal.py b/pyplayready/crypto/elgamal.py
index ed6c3db..2a9adf5 100644
--- a/pyplayready/crypto/elgamal.py
+++ b/pyplayready/crypto/elgamal.py
@@ -7,25 +7,17 @@ import secrets
class ElGamal:
"""ElGamal ECC utility using ecpy"""
- def __init__(self, curve: Curve):
- """Initialize the utility with a given curve type ('secp256r1' for PlayReady)"""
- self.curve = curve
+ curve = Curve.get_curve("secp256r1")
@staticmethod
- def to_bytes(n: int) -> bytes:
- byte_len = (n.bit_length() + 7) // 8
- if byte_len % 2 != 0:
- byte_len += 1
- return n.to_bytes(byte_len, 'big')
-
- def encrypt(self, message_point: Point, public_key: Point) -> Tuple[Point, Point]:
+ def encrypt(message_point: Point, public_key: Point) -> Tuple[Point, Point]:
"""
Encrypt a single point with a given public key
Returns an encrypted point pair
"""
- ephemeral_key = secrets.randbelow(self.curve.order)
- point1 = ephemeral_key * self.curve.generator
+ ephemeral_key = secrets.randbelow(ElGamal.curve.order)
+ point1 = ephemeral_key * ElGamal.curve.generator
point2 = message_point + (ephemeral_key * public_key)
return point1, point2
diff --git a/pyplayready/license/license.py b/pyplayready/license/license.py
new file mode 100644
index 0000000..aaee087
--- /dev/null
+++ b/pyplayready/license/license.py
@@ -0,0 +1,126 @@
+from __future__ import annotations
+
+import base64
+import copy
+import hashlib
+import xml.etree.ElementTree as ET
+from typing import Union, Iterator
+
+from Crypto.PublicKey import ECC
+
+from pyplayready import Crypto
+from pyplayready.license.xmrlicense import XMRLicense
+from pyplayready.misc.exceptions import InvalidLicense
+from pyplayready.system.bcert import CertificateChain, BCertKeyUsage
+from pyplayready.system.util import Util
+
+
+class License:
+ def __init__(self, data: Union[str, bytes, ET.Element]):
+ if not data:
+ raise InvalidLicense("Data must not be empty")
+
+ if isinstance(data, str):
+ data = data.encode()
+
+ if isinstance(data, bytes):
+ self._root = ET.fromstring(data)
+ elif isinstance(data, ET.Element):
+ self._root = data
+ else:
+ raise InvalidLicense("Invalid data type")
+
+ self._original_root = copy.deepcopy(self._root)
+ Util.remove_namespaces(self._root)
+
+ if self._root.tag != "AcquireLicenseResponse":
+ raise InvalidLicense("License root must be AcquireLicenseResponse")
+
+ self._Response = self._root.find("AcquireLicenseResult/Response")
+
+ if self._Response is None:
+ raise InvalidLicense("Response not found in license")
+
+ self.rmsdk_version = self._Response.get("rmsdkVersion")
+
+ self._LicenseResponse = self._Response.find("LicenseResponse")
+ if self._Response is None:
+ raise InvalidLicense("LicenseResponse not found in license")
+
+ self.version = self._LicenseResponse.findtext("Version")
+
+ self.licenses = list(self._load_licenses())
+ self.rev_info = self._LicenseResponse.find("RevInfo")
+
+ self.transaction_id = self._LicenseResponse.find("Acknowledgement/TransactionID")
+
+ self.license_nonce = self._LicenseResponse.findtext("LicenseNonce")
+ self.response_id = self._LicenseResponse.findtext("ResponseID")
+
+ cert_chain_str = self._LicenseResponse.findtext("SigningCertificateChain")
+ self.signing_certificate_chain = CertificateChain.loads(cert_chain_str) if cert_chain_str else None
+
+ def _find_element_raw(self, name: str) -> ET.Element:
+ return self._original_root.find(f".//{name}", {
+ "": "http://www.w3.org/2000/09/xmldsig#",
+ "soap": "http://schemas.xmlsoap.org/soap/envelope/",
+ "proto": "http://schemas.microsoft.com/DRM/2007/03/protocols",
+ "msg": "http://schemas.microsoft.com/DRM/2007/03/protocols/messages"
+ })
+
+ def _load_licenses(self) -> Iterator[XMRLicense]:
+ Licenses = self._LicenseResponse.findall("Licenses/License")
+ if Licenses is None:
+ return iter([])
+
+ for license_ in Licenses:
+ yield XMRLicense.loads(license_.text)
+
+ def is_verifiable(self):
+ if self.signing_certificate_chain is None:
+ return False
+
+ signature = self._Response.find("Signature")
+
+ if signature is None:
+ return False
+ if signature.findtext("SignedInfo/Reference/DigestValue") is None:
+ return False
+ if signature.findtext("SignatureValue") is None:
+ return False
+
+ return True
+
+ def verify(self):
+ if not self.is_verifiable():
+ raise RuntimeError("Missing required information for license signature verification")
+
+ ET.register_namespace("", "http://schemas.microsoft.com/DRM/2007/03/protocols")
+
+ license_response_xml = ET.tostring(self._find_element_raw("proto:LicenseResponse"), short_empty_elements=False)
+ response_hash = hashlib.sha256(license_response_xml).digest()
+
+ Signature = self._Response.find("Signature")
+ digest_value = base64.b64decode(Signature.findtext("SignedInfo/Reference/DigestValue"))
+
+ if digest_value != response_hash:
+ raise InvalidLicense("Digest mismatch in license")
+
+ signing_leaf_cert = self.signing_certificate_chain.get(0)
+ signing_key_bytes = signing_leaf_cert.get_key_by_usage(BCertKeyUsage.SIGN_RESPONSE)
+
+ signing_key = ECC.construct(
+ point_x=int.from_bytes(signing_key_bytes[:32], "big"),
+ point_y=int.from_bytes(signing_key_bytes[32:], "big"),
+ curve="P-256"
+ )
+
+ ET.register_namespace("", "http://www.w3.org/2000/09/xmldsig#")
+ signed_info_xml = ET.tostring(self._find_element_raw("SignedInfo"), short_empty_elements=False)
+
+ signature_value = base64.b64decode(Signature.findtext("SignatureValue"))
+
+ if not Crypto.ecc256_verify(signing_key, signed_info_xml, signature_value):
+ raise InvalidLicense("Signature mismatch in license")
+
+ return True
diff --git a/pyplayready/license/xml_key.py b/pyplayready/license/xml_key.py
index cc501fd..eb42a52 100644
--- a/pyplayready/license/xml_key.py
+++ b/pyplayready/license/xml_key.py
@@ -1,7 +1,7 @@
from ecpy.curves import Point, Curve
from pyplayready.crypto.ecc_key import ECCKey
-from pyplayready.crypto.elgamal import ElGamal
+from pyplayready.system.util import Util
class XmlKey:
@@ -14,7 +14,7 @@ class XmlKey:
self.shared_key_x = self._shared_point.key.pointQ.x
self.shared_key_y = self._shared_point.key.pointQ.y
- self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x))
+ self._shared_key_x_bytes = Util.to_bytes(int(self.shared_key_x))
self.aes_iv = self._shared_key_x_bytes[:16]
self.aes_key = self._shared_key_x_bytes[16:]
diff --git a/pyplayready/license/xmrlicense.py b/pyplayready/license/xmrlicense.py
index 3ce6492..e9a7992 100644
--- a/pyplayready/license/xmrlicense.py
+++ b/pyplayready/license/xmrlicense.py
@@ -2,12 +2,18 @@ from __future__ import annotations
import base64
from enum import IntEnum
-from typing import Union
+from typing import Union, Tuple
+from uuid import UUID
from Crypto.Cipher import AES
from Crypto.Hash import CMAC
+from Crypto.Util.strxor import strxor
from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
+from pyplayready import ECCKey, Crypto
+from pyplayready.license.key import Key
+from pyplayready.misc.exceptions import InvalidXmrLicense
+
class XMRObjectTypes(IntEnum):
INVALID = 0x0000
@@ -300,6 +306,11 @@ class _XMRLicenseStructs:
class XMRLicense(_XMRLicenseStructs):
"""Represents an XMRLicense"""
+ MagicConstantZero = bytes([
+ 0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f,
+ 0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb
+ ])
+
def __init__(
self,
parsed_license: Container,
@@ -336,8 +347,64 @@ class XMRLicense(_XMRLicenseStructs):
if container.type == type_:
yield container.data
- def get_content_keys(self):
- yield from self.get_object(XMRObjectTypes.CONTENT_KEY_OBJECT)
+ def get_device_key_obj(self) -> Container:
+ return next(self.get_object(XMRObjectTypes.ECC_DEVICE_KEY_OBJECT), None)
+
+ def get_content_key_obj(self) -> Container:
+ return next(self.get_object(XMRObjectTypes.CONTENT_KEY_OBJECT), None)
+
+ def is_scalable(self) -> bool:
+ return bool(next(self.get_object(XMRObjectTypes.AUX_KEY_OBJECT), None))
+
+ def get_content_key(self, encryption_key: ECCKey) -> Key:
+ ecc_key = self.get_device_key_obj()
+ if ecc_key is None:
+ raise InvalidXmrLicense("No ECC public key in license")
+
+ if ecc_key.key != encryption_key.public_bytes():
+ raise InvalidXmrLicense("Public encryption key does not match")
+
+ content_key = self.get_content_key_obj()
+ cipher_type = Key.CipherType(content_key.cipher_type)
+
+ if cipher_type not in (Key.CipherType.ECC_256, Key.CipherType.ECC_256_WITH_KZ, Key.CipherType.ECC_256_VIA_SYMMETRIC):
+ raise InvalidXmrLicense(f"Invalid cipher type {cipher_type}")
+
+ via_symmetric = Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256_VIA_SYMMETRIC
+
+ decrypted = Crypto.ecc256_decrypt(encryption_key, content_key.encrypted_key)
+ ci, ck = decrypted[:16], decrypted[16:]
+
+ if self.is_scalable():
+ ci, ck = decrypted[::2][:16], decrypted[1::2][:16]
+
+ if via_symmetric:
+ embedded_root_license = content_key.encrypted_key[:144]
+ embedded_leaf_license = content_key.encrypted_key[144:]
+
+ rgb_key = strxor(ck, self.MagicConstantZero)
+ content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key)
+
+ aux_key = next(self.get_object(XMRObjectTypes.AUX_KEY_OBJECT))["auxiliary_keys"][0]["key"]
+
+ uplink_x_key = AES.new(content_key_prime, AES.MODE_ECB).encrypt(aux_key)
+ secondary_key = AES.new(ck, AES.MODE_ECB).encrypt(embedded_root_license[128:])
+
+ embedded_leaf_license = AES.new(uplink_x_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
+ embedded_leaf_license = AES.new(secondary_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
+
+ ci, ck = embedded_leaf_license[:16], embedded_leaf_license[16:]
+
+ if not self.check_signature(ci):
+ raise InvalidXmrLicense("License integrity signature does not match")
+
+ return Key(
+ key_id=UUID(bytes_le=content_key.key_id),
+ key_type=content_key.key_type,
+ cipher_type=content_key.cipher_type,
+ key_length=content_key.key_length,
+ key=ck
+ )
def check_signature(self, integrity_key: bytes) -> bool:
cmac = CMAC.new(integrity_key, ciphermod=AES)
diff --git a/pyplayready/main.py b/pyplayready/main.py
index 8898aa6..936077f 100644
--- a/pyplayready/main.py
+++ b/pyplayready/main.py
@@ -7,12 +7,13 @@ import click
import requests
from Crypto.Random import get_random_bytes
-from pyplayready import __version__, InvalidCertificateChain, InvalidLicense
+from pyplayready import __version__, InvalidCertificateChain, InvalidXmrLicense
from pyplayready.cdm import Cdm
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.crypto.key_wrap import unwrap_wrapped_key
from pyplayready.device import Device
from pyplayready.misc.exceptions import OutdatedDevice
+from pyplayready.misc.revocation_list import RevocationList
from pyplayready.system.bcert import CertificateChain, Certificate, BCertCertType, BCertObjType, BCertFeatures, \
BCertKeyType, BCertKeyUsage
from pyplayready.system.pssh import PSSH
@@ -58,7 +59,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
session_id = cdm.open()
log.info("Opened Session")
- challenge = cdm.get_license_challenge(session_id, pssh.wrm_headers[0])
+ challenge = cdm.get_license_challenge(session_id, pssh.wrm_headers[0], rev_lists=RevocationList.SupportedListIds)
log.info("Created License Request (Challenge)")
log.debug(challenge)
@@ -79,7 +80,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
try:
cdm.parse_license(session_id, licence)
- except InvalidLicense as e:
+ except InvalidXmrLicense as e:
log.error(e)
return
@@ -100,7 +101,8 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
def test(ctx: click.Context, device: Path, ckt: str, security_level: str) -> None:
"""
Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
- https://testweb.playready.microsoft.com/Content/Content2X
+ https://learn.microsoft.com/en-us/playready/advanced/testcontent/playready-2x-test-content#tears-of-steel---4k-content
+
+ DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd
+ MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest
@@ -175,7 +177,7 @@ def create_device(
raise InvalidCertificateChain("Device has already been provisioned")
if certificate_chain.get(0).get_type() != BCertCertType.ISSUER:
- raise InvalidCertificateChain("Leaf-most certificate must be of type ISSUER to issue certificate if type DEVICE")
+ raise InvalidCertificateChain("Leaf-most certificate must be of type ISSUER to issue certificate of type DEVICE")
if not certificate_chain.get(0).contains_public_key(group_key):
raise InvalidCertificateChain("Group key does not match this certificate")
@@ -411,23 +413,18 @@ def inspect(ctx: click.Context, device: Optional[Path], chain: Optional[Path]) -
for i in range(chai.count()):
cert = chai.get(i)
- log.info(f" Certificate {i}:")
+ log.info(f" + Certificate {i}:")
basic_info = cert.get_attribute(BCertObjType.BASIC)
if basic_info:
- log.info(f" + Cert Type: {BCertCertType(basic_info.attribute.cert_type).name}")
- log.info(f" + Security Level: SL{basic_info.attribute.security_level}")
- log.info(f" + Expiration Date: {datetime.fromtimestamp(basic_info.attribute.expiration_date)}")
- log.info(f" + Client ID: {basic_info.attribute.client_id.hex()}")
+ log.info(f" + Cert Type: {BCertCertType(basic_info.attribute.cert_type).name}")
+ log.info(f" + Security Level: SL{basic_info.attribute.security_level}")
+ log.info(f" + Expiration Date: {datetime.fromtimestamp(basic_info.attribute.expiration_date)}")
+ log.info(f" + Client ID: {basic_info.attribute.client_id.hex()}")
- manufacturer_info = cert.get_attribute(BCertObjType.MANUFACTURER)
- if manufacturer_info:
- manu_attr = manufacturer_info.attribute
-
- def un_pad(name: bytes):
- return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
-
- log.info( f" + Name: {un_pad(manu_attr.manufacturer_name)} {un_pad(manu_attr.model_name)} {un_pad(manu_attr.model_number)}")
+ model_name = cert.get_name()
+ if model_name:
+ log.info( f" + Name: {model_name}")
feature_info = cert.get_attribute(BCertObjType.FEATURE)
if feature_info and feature_info.attribute.feature_count > 0:
@@ -435,22 +432,22 @@ def inspect(ctx: click.Context, device: Optional[Path], chain: Optional[Path]) -
lambda x: BCertFeatures(x).name,
feature_info.attribute.features
))
- log.info(f" + Features: {', '.join(features)}")
+ log.info(f" + Features: {', '.join(features)}")
key_info = cert.get_attribute(BCertObjType.KEY)
if key_info and key_info.attribute.key_count > 0:
key_attr = key_info.attribute
- log.info(f" + Cert Keys:")
+ log.info(f" + Cert Keys:")
for idx, key in enumerate(key_attr.cert_keys):
- log.info(f" + Key {idx}:")
- log.info(f" + Type: {BCertKeyType(key.type).name}")
- log.info(f" + Key Length: {key.length} bits")
+ log.info(f" + Key {idx}:")
+ log.info(f" + Type: {BCertKeyType(key.type).name}")
+ log.info(f" + Key Length: {key.length} bits")
usages = list(map(
lambda x: BCertKeyUsage(x).name,
key.usages
))
if len(usages) > 0:
- log.info(f" + Usages: {', '.join(usages)}")
+ log.info(f" + Usages: {', '.join(usages)}")
return None
diff --git a/pyplayready/misc/exceptions.py b/pyplayready/misc/exceptions.py
index dd63c01..0de26e3 100644
--- a/pyplayready/misc/exceptions.py
+++ b/pyplayready/misc/exceptions.py
@@ -1,5 +1,3 @@
-from pyplayready.misc.drmresults import DrmResult
-
class PyPlayreadyException(Exception):
"""Exceptions used by pyplayready."""
@@ -12,10 +10,22 @@ class InvalidSession(PyPlayreadyException):
"""No Session is open with the specified identifier."""
+class InvalidSoapMessage(PyPlayreadyException):
+ """The Soap Message is invalid or empty."""
+
+
class InvalidPssh(PyPlayreadyException):
"""The Playready PSSH is invalid or empty."""
+class InvalidWrmHeader(PyPlayreadyException):
+ """The Playready WRMHEADER is invalid or empty."""
+
+
+class InvalidChecksum(PyPlayreadyException):
+ """The Playready WRMHEADER key ID checksum is invalid or empty."""
+
+
class InvalidInitData(PyPlayreadyException):
"""The Playready Cenc Header Data is invalid or empty."""
@@ -24,10 +34,14 @@ class DeviceMismatch(PyPlayreadyException):
"""The Remote CDMs Device information and the APIs Device information did not match."""
-class InvalidLicense(PyPlayreadyException):
+class InvalidXmrLicense(PyPlayreadyException):
"""Unable to parse XMR License."""
+class InvalidLicense(PyPlayreadyException):
+ """Unable to parse License XML."""
+
+
class InvalidCertificate(PyPlayreadyException):
"""The BCert is not correctly formatted."""
@@ -41,4 +55,8 @@ class OutdatedDevice(PyPlayreadyException):
class ServerException(PyPlayreadyException):
- """Recasted on the client if found in license response."""
+ """Re-casted on the client if found in license response."""
+
+
+class InvalidRevocationList(PyPlayreadyException):
+ """The RevocationList is not correctly formatted."""
\ No newline at end of file
diff --git a/pyplayready/misc/revocation_list.py b/pyplayready/misc/revocation_list.py
new file mode 100644
index 0000000..dee2bea
--- /dev/null
+++ b/pyplayready/misc/revocation_list.py
@@ -0,0 +1,499 @@
+from __future__ import annotations
+
+import base64
+import hashlib
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Union, Optional, Iterator
+from uuid import UUID
+
+from Crypto.PublicKey import ECC
+from construct import Struct, Bytes, Switch, Int64ul, Int64ub, Int32ub, \
+ Int16ub, Int8ub, Array, this, Adapter, OneOf, If, Container, Select, GreedyBytes, String
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import rsa, padding
+from ecpy import curve_defs
+from ecpy.curve_defs import WEIERSTRASS
+from ecpy.curves import Curve, Point
+from ecpy.ecdsa import ECDSA
+from ecpy.keys import ECPublicKey
+import xml.etree.ElementTree as ET
+
+from pyplayready import Crypto
+from pyplayready.misc.exceptions import InvalidRevocationList
+from pyplayready.system.bcert import CertificateChain, BCertCertType, BCertKeyUsage
+from pyplayready.system.util import Util
+
+
+class FileTime(Adapter):
+ EPOCH_AS_FILETIME = 116444736000000000
+ HUNDREDS_OF_NANOSECONDS = 10_000_000
+
+ def _decode(self, obj, context):
+ timestamp = (obj - self.EPOCH_AS_FILETIME) / self.HUNDREDS_OF_NANOSECONDS
+ return datetime.fromtimestamp(timestamp, timezone.utc)
+
+ def _encode(self, obj, context):
+ return self.EPOCH_AS_FILETIME + int(obj.timestamp() * self.HUNDREDS_OF_NANOSECONDS)
+
+
+class UUIDLe(Adapter):
+ def _decode(self, obj, context):
+ return UUID(bytes_le=obj)
+
+ def _encode(self, obj, context):
+ return obj.bytes_le
+
+
+class _RevocationStructs:
+ BRevInfoData = Struct(
+ "magic" / OneOf(Int32ub, [0x524C5649, 0x524C5632]), # RLVI / RLV2
+ "length" / Int32ub,
+ "format_version" / Int8ub,
+ "reserved" / Bytes(3),
+ "sequence_number" / Int32ub, # what's this?
+ "issued_time" / Switch(lambda ctx: ctx.magic, {
+ 0x524C5649: FileTime(Int64ul),
+ 0x524C5632: FileTime(Int64ub),
+ }),
+ "record_count" / Int32ub,
+ "records" / Array(this.record_count, Struct(
+ "list_id" / UUIDLe(Bytes(16)),
+ "version" / Int64ub
+ ))
+ )
+
+ BRevInfoSigned = Struct(
+ "data" / BRevInfoData,
+ "signature_type" / Int8ub,
+ "signature_size" / Switch(lambda ctx: ctx.signature_type, {
+ 1: 128,
+ 2: Int16ub
+ }),
+ "signature" / Bytes(this.signature_size),
+ "certificate_chain_length" / If(this.signature_type == 1, Int32ub),
+ "certificate_chain" / Select(CertificateChain.BCertChain, GreedyBytes)
+ )
+
+ BPrRLData = Struct(
+ "id" / Bytes(16),
+ "version" / Int32ub,
+ "entry_count" / Int32ub,
+ "revocation_entries" / Array(this.entry_count, Bytes(32)),
+ )
+
+ BPrRLSigned = Struct(
+ "data" / BPrRLData,
+ "signature_type" / Int8ub,
+ "signature_length" / Int16ub,
+ "signature" / Bytes(this.signature_length),
+ "certificate_chain" / Select(CertificateChain.BCertChain, GreedyBytes)
+ )
+
+ WMDRMNETData = Struct(
+ "version" / Int32ub,
+ "entry_count" / Int32ub,
+ "revocation_entries" / Array(this.entry_count, Bytes(20)),
+ "certificate_chain_length" / Int32ub,
+ "certificate_chain" / String(this.certificate_chain_length),
+ )
+
+ WMDRMNETSigned = Struct(
+ "data" / WMDRMNETData,
+ "signature_type" / Int8ub,
+ "signature_length" / Int16ub,
+ "signature" / Bytes(this.signature_length)
+ )
+
+
+class RevocationList(_RevocationStructs):
+
+ class ListID:
+ # Rev Info
+ REV_INFO = UUID("CCDE5A55-A688-4405-A88B-D13F90D5BA3E") # VVrezIimBUSoi9E/kNW6Pg==
+ REV_INFO_V2 = UUID("52D1FF11-D388-4EDD-82B7-68EA4C20A16C") # Ef/RUojT3U6Ct2jqTCChbA==
+
+ # PlayReady Revocation List
+ PLAYREADY_RUNTIME = UUID("4E9D8C8A-B652-45A7-9791-6925A6B4791F") # ioydTlK2p0WXkWklprR5Hw==
+ PLAYREADY_APPLICATION = UUID("28082E80-C7A3-40B1-8256-19E5B6D89B27") # gC4IKKPHsUCCVhnlttibJw==
+
+ # WMDRMNET Revocation List (deprecated: "LegacyXMLCert")
+ WMDRMNET = UUID("CD75E604-543D-4A9C-9F09-FE6D24E8BF90") # BOZ1zT1UnEqfCf5tJOi/kA==
+
+ # WMDRM Device Revocation List
+ DEVICE_REVOCATION = UUID("3129E375-CEB0-47D5-9CCA-9DB74CFD4332") # deMpMbDO1Uecyp23TP1DMg==
+
+ # App Revocation List
+ APP_REVOCATION = UUID("90A37313-0ECF-4CAA-A906-B188F6129300") # E3OjkM8OqkypBrGI9hKTAA==
+
+ SupportedListIds = [ListID.PLAYREADY_RUNTIME, ListID.PLAYREADY_APPLICATION, ListID.REV_INFO_V2, ListID.WMDRMNET]
+
+ RevocationDataPubKeyAllowList = [
+ bytes([
+ 0x3F, 0x3C, 0x09, 0x41, 0xB3, 0xE2, 0x45, 0xC4, 0xF0, 0x55, 0x32, 0xF1, 0x00, 0x40, 0xAA, 0x48,
+ 0xFD, 0x2A, 0xC8, 0x44, 0x23, 0x68, 0x2D, 0xBF, 0x45, 0xFE, 0x2A, 0x65, 0xFF, 0x4E, 0xFF, 0x3A,
+ 0x60, 0xC4, 0x2A, 0x71, 0x38, 0x61, 0xA3, 0xA7, 0xBC, 0x89, 0xB3, 0xE7, 0xB9, 0xA4, 0xF4, 0xAA,
+ 0xA2, 0x8B, 0xA8, 0xCE, 0xE6, 0x89, 0xBA, 0x8D, 0xF7, 0xB0, 0x1B, 0x6A, 0x79, 0xC7, 0xDC, 0x93,
+ ])
+ ]
+
+ pubkeyWMDRMNDRevocation = bytes([
+ 0x17, 0xab, 0x8d, 0x43, 0xe6, 0x47, 0xef, 0xba, 0xbd, 0x23,
+ 0x44, 0x66, 0x9f, 0x64, 0x04, 0x84, 0xf8, 0xe7, 0x71, 0x39,
+ 0xc7, 0x07, 0x36, 0x25, 0x5d, 0xa6, 0x5f, 0xba, 0xb9, 0x00,
+ 0xef, 0x9c, 0x89, 0x6b, 0xf2, 0xc4, 0x81, 0x1d, 0xa2, 0x12
+ ])
+
+ CurrentRevListStorageName = "RevInfo_Current.xml"
+
+ def __init__(self, parsed): # List[Tuple[UUID, Container]]
+ self.parsed = parsed
+
+ @staticmethod
+ def _verify_crl_signatures(crl: Container, data_struct) -> None:
+ if isinstance(crl.certificate_chain, bytes) and len(crl.certificate_chain) == 64:
+ # TODO: untested, since RLVI is deprecated
+ if crl.certificate_chain not in RevocationList.RevocationDataPubKeyAllowList:
+ raise InvalidRevocationList("Unallowed revocation list signing public key")
+
+ signing_pub_key = crl.certificate_chain
+ else:
+ signing_cert = CertificateChain(crl.certificate_chain)
+ signing_cert.verify_chain(
+ check_expiry=True,
+ cert_type=BCertCertType.CRL_SIGNER
+ )
+
+ leaf_signing_cert = signing_cert.get(0)
+ signing_pub_key = leaf_signing_cert.get_key_by_usage(BCertKeyUsage.SIGN_CRL)
+
+ signing_key = ECC.construct(
+ curve='P-256',
+ point_x=int.from_bytes(signing_pub_key[:32]),
+ point_y=int.from_bytes(signing_pub_key[32:])
+ )
+
+ sign_payload = data_struct.build(crl.data)
+
+ if not Crypto.ecc256_verify(
+ public_key=signing_key,
+ data=sign_payload,
+ signature=crl.signature
+ ):
+ raise InvalidRevocationList("Revocation List signature is not authentic")
+
+ @staticmethod
+ def _verify_wmdrmnet_wrap_signature(xml: str) -> bool:
+ self = RevocationList
+
+ # Microsoft's ECC1 curve
+ msdrm_ecc1_params = {
+ 'name': "msdrm-ecc1",
+ 'type': WEIERSTRASS,
+ 'size': 160,
+ 'field': 0x89abcdef012345672718281831415926141424f7, # q
+ 'generator': (0x8723947fd6a3a1e53510c07dba38daf0109fa120, # gen x
+ 0x445744911075522d8c3c5856d4ed7acda379936f), # gen y
+ 'order': 0x89abcdef012345672716b26eec14904428c2a675, # n
+ 'cofactor': 0x1,
+ 'a': 0x37a5abccd277bce87632ff3d4780c009ebe41497, # a
+ 'b': 0x0dd8dabf725e2f3228e85f1ad78fdedf9328239e, # b
+ }
+
+ curve_defs.curves.append(msdrm_ecc1_params)
+ msdrm_ecc1 = Curve.get_curve("msdrm-ecc1")
+
+ public_point = Point(
+ x=int.from_bytes(self.pubkeyWMDRMNDRevocation[:20], "little"),
+ y=int.from_bytes(self.pubkeyWMDRMNDRevocation[20:], "little"),
+ curve=msdrm_ecc1,
+ check=True
+ )
+
+ public_key = ECPublicKey(public_point)
+
+ root = ET.fromstring(f"{xml}")
+ signature_value_element = root.find("SIGNATURE/VALUE")
+
+ if signature_value_element is None:
+ raise InvalidRevocationList("No SIGNATURE VALUE found in WMDRMNET revocation wrap")
+
+ signature_value = base64.b64decode(signature_value_element.text)
+ if len(signature_value) != 40:
+ raise InvalidRevocationList("Invalid WMDRMNET revocation wrap SIGNATURE length")
+
+ r = int.from_bytes(signature_value[:20], "little")
+ s = int.from_bytes(signature_value[20:], "little")
+
+ if not r < msdrm_ecc1_params["order"] or not s < msdrm_ecc1_params["order"]:
+ raise InvalidRevocationList("Invalid WMDRMNET revocation wrap SIGNATURE")
+
+ data_element = root.find("DATA")
+ if data_element is None:
+ raise InvalidRevocationList("No DATA element found in WMDRMNET revocation wrap")
+
+ # contents = ["" | element contents | ""]
+ data_bytes = ET.tostring(data_element, encoding="utf-8")
+ data_digest = hashlib.sha1(data_bytes).digest()
+
+ signer = ECDSA("ITUPLE")
+ authentic = signer.verify(
+ data_digest,
+ (r, s),
+ public_key
+ )
+
+ # Windows Media DRM (WMDRM) Signature verification:
+ # Signature values (r, s) and the public key are both little-endian and loaded directly as
+ # integers. (We're also not using Montgomery but that shouldn't change anything)
+ # Despite loading them correctly and all checks on the pubkey and (r, s) being valid,
+ # signature verification still fails and (TODO) I DON'T KNOW WHY
+ # Useful sources:
+ # http://bearcave.com/misl/misl_tech/msdrm/technical.html
+ # https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-drmcd/36aabf50-a6be-4eb2-8f36-e1879eb54585
+ # https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-drm/76e5839c-5395-447a-a48c-3ab724c972a3
+
+ return True
+
+ @staticmethod
+ def _unwrap_wmdrmnet_list(xml: str) -> bytes:
+ root = ET.fromstring(f"{xml}")
+ data_template = root.findtext("DATA/TEMPLATE")
+
+ if not data_template:
+ raise InvalidRevocationList("No DATA/TEMPLATE found in WMDRMNET revocation wrap")
+
+ return base64.b64decode(data_template)
+
+ @staticmethod
+ def _verify_prnd_certificate(data: str) -> None:
+ root = ET.fromstring(data)
+
+ ET.register_namespace("c", "http://schemas.microsoft.com/DRM/2004/02/cert")
+ ET.register_namespace("", "http://www.w3.org/2000/09/xmldsig#")
+
+ _ns = {"c": "http://schemas.microsoft.com/DRM/2004/02/cert"}
+
+ for cert in root.findall("c:Certificate", _ns):
+ data_elem = cert.find("c:Data", _ns)
+ if data_elem is None:
+ raise InvalidRevocationList("Missing Data")
+
+ data_xml = ET.tostring(data_elem)
+
+ Util.remove_namespaces(cert)
+
+ digest_val = cert.findtext("Signature/SignedInfo/Reference/DigestValue")
+ if not digest_val:
+ raise InvalidRevocationList("Missing DigestValue")
+
+ digest_calc = base64.b64encode(hashlib.sha1(data_xml).digest()).decode()
+ if digest_val != digest_calc:
+ raise InvalidRevocationList("Digest mismatch")
+
+ rsa_key_value = cert.find("Signature/KeyInfo/KeyValue/RSAKeyValue")
+ mod_b64 = rsa_key_value.findtext("Modulus")
+ exp_b64 = rsa_key_value.findtext("Exponent")
+ if not mod_b64 or not exp_b64:
+ raise InvalidRevocationList("Missing Modulus/Exponent")
+
+ modulus_int = int.from_bytes(base64.b64decode(mod_b64), "big")
+
+ exp_raw = bytearray(base64.b64decode(exp_b64))
+ exp_bytes = (b"\x00" + exp_raw[::-1])
+ exponent_int = int.from_bytes(exp_bytes, "big")
+
+ pub_key = rsa.RSAPublicNumbers(exponent_int, modulus_int).public_key(default_backend())
+
+ sig_b64 = cert.findtext("Signature/SignatureValue")
+ if not sig_b64:
+ raise InvalidRevocationList("Missing SignatureValue")
+
+ sig_bytes = base64.b64decode(sig_b64)
+
+ pub_key.verify(
+ signature=sig_bytes,
+ data=data_xml,
+ padding=padding.PSS(padding.MGF1(hashes.SHA1()), padding.PSS.MAX_LENGTH),
+ algorithm=hashes.SHA1()
+ )
+
+ @staticmethod
+ def _get_wmdrmnet_crl_keys(data: str) -> Iterator[rsa.RSAPublicKey]:
+ root = ET.fromstring(data.encode())
+ Util.remove_namespaces(root)
+
+ for cert in root.findall('Certificate'):
+ data_elem = cert.find("Data")
+ if data_elem is None:
+ continue
+
+ key_usage_elem = data_elem.findtext('KeyUsage/SignCRL')
+ if key_usage_elem != "1":
+ continue
+
+ rsa_elem = data_elem.find('PublicKey/KeyValue/RSAKeyValue')
+ if rsa_elem is None:
+ continue
+
+ modulus_b64 = rsa_elem.findtext('Modulus')
+ exponent_b64 = rsa_elem.findtext('Exponent')
+ if not modulus_b64 or not exponent_b64:
+ continue
+
+ modulus = int.from_bytes(base64.b64decode(modulus_b64), 'big')
+ exponent = int.from_bytes(base64.b64decode(exponent_b64), 'big')
+
+ yield rsa.RSAPublicNumbers(exponent, modulus).public_key()
+
+ return None
+
+ @staticmethod
+ def _parse_list(list_id: UUID, data: bytes):
+ self = RevocationList
+
+ if list_id in (self.ListID.REV_INFO, self.ListID.REV_INFO_V2):
+ rev_info = self.BRevInfoSigned.parse(data)
+ self._verify_crl_signatures(rev_info, self.BRevInfoData)
+
+ return list_id, rev_info
+ elif list_id in (self.ListID.PLAYREADY_RUNTIME, self.ListID.PLAYREADY_APPLICATION):
+ pr_rl = self.BPrRLSigned.parse(data)
+ self._verify_crl_signatures(pr_rl, self.BPrRLData)
+
+ return list_id, pr_rl
+ elif list_id == self.ListID.WMDRMNET:
+ try:
+ xml = data.decode("utf-16-le")
+ if "" in xml:
+ if not self._verify_wmdrmnet_wrap_signature(xml):
+ raise InvalidRevocationList("WMDRMNET wrap signature is not authentic")
+
+ wmdrmnet_data = self._unwrap_wmdrmnet_list(xml)
+ else:
+ raise InvalidRevocationList("WMDRMNET revocation list cannot be valid UTF-16-LE and not be wrapped")
+ except UnicodeDecodeError:
+ wmdrmnet_data = base64.b64decode(data)
+
+ wmdrmnet_parsed = self.WMDRMNETSigned.parse(wmdrmnet_data)
+ certificate_chain = wmdrmnet_parsed.data.certificate_chain.decode()
+
+ self._verify_prnd_certificate(certificate_chain)
+ crl_pub_key = next(self._get_wmdrmnet_crl_keys(certificate_chain), None)
+
+ crl_pub_key.verify(
+ signature=wmdrmnet_parsed.signature,
+ data=self.WMDRMNETData.build(wmdrmnet_parsed.data),
+ padding=padding.PSS(padding.MGF1(hashes.SHA1()), padding.PSS.MAX_LENGTH),
+ algorithm=hashes.SHA1()
+ )
+
+ return list_id, wmdrmnet_parsed
+
+ # TODO: DEVICE_REVOCATION, APP_REVOCATION
+
+ return None
+
+ @staticmethod
+ def _remove_utf8_bom(data: bytes) -> bytes:
+ # https://en.wikipedia.org/wiki/Byte_order_mark#UTF-8
+
+ if data[:3] == b"\xEF\xBB\xBF":
+ return data[3:]
+ return data
+
+ @staticmethod
+ def _verify_and_parse(revocation):
+ list_id = revocation.find("ListID")
+
+ if list_id is None or not list_id.text:
+ raise InvalidRevocationList(f" is either missing or empty")
+
+ list_id_uuid = UUID(bytes_le=base64.b64decode(list_id.text))
+
+ list_data = revocation.find("ListData")
+ if list_data is None or not list_data.text:
+ raise InvalidRevocationList(f" is either missing or empty")
+
+ return RevocationList._parse_list(list_id_uuid, base64.b64decode(list_data.text))
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes, ET.Element]) -> RevocationList:
+ if isinstance(data, str):
+ data = data.encode()
+ if isinstance(data, bytes):
+ root = ET.fromstring(cls._remove_utf8_bom(data))
+ else:
+ root = data
+
+ if root.tag != "RevInfo":
+ raise InvalidRevocationList("Root element is not ")
+
+ revocations = root.findall("Revocation")
+
+ return cls(list(map(
+ cls._verify_and_parse,
+ revocations
+ )))
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> RevocationList:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ @staticmethod
+ def merge(root: ET.Element, root2: ET.Element) -> ET.Element:
+ if root.tag != "RevInfo" or root2.tag != "RevInfo":
+ raise InvalidRevocationList("Root element is not ")
+
+ revocation = root.findall("Revocation")
+
+ def _get_version(parsed):
+ if parsed[0] in (RevocationList.ListID.REV_INFO, RevocationList.ListID.REV_INFO_V2):
+ return parsed[1].data.sequence_number
+ return parsed[1].data.version
+
+ def find_in_revs(list_id: UUID):
+ for rev in revocation:
+ parsed_rev = RevocationList._verify_and_parse(rev)
+ if parsed_rev[0] == list_id:
+ return rev, _get_version(parsed_rev)
+ return None, None
+
+ for revocation2 in root2.findall("Revocation"):
+ parsed_rev2 = RevocationList._verify_and_parse(revocation2)
+
+ rev_find, version = find_in_revs(parsed_rev2[0])
+ if rev_find is None:
+ root.append(revocation2)
+ else:
+ if _get_version(parsed_rev2) > version:
+ rev_find.find("ListData").text = revocation2.find("ListData").text
+
+ return root
+
+ def get_by_id(self, uuid: UUID) -> Optional[Container]:
+ for rev_list in self.parsed:
+ if rev_list[0] == uuid:
+ return rev_list[1]
+
+ return None
+
+ def get_storage_file_name(self):
+ rev_list = self.get_by_id(self.ListID.REV_INFO_V2)
+ list_name = "RevInfo2"
+
+ if rev_list is None:
+ rev_list = self.get_by_id(self.ListID.REV_INFO)
+ list_name = "RevInfo"
+
+ if rev_list is None:
+ raise InvalidRevocationList("No RevInfo available")
+
+ list_version = rev_list.data.sequence_number
+ list_date = rev_list.data.issued_time.strftime("%Y%m%d")
+
+ return f"{list_name}v{list_version}_{list_date}.xml"
diff --git a/pyplayready/misc/soap_message.py b/pyplayready/misc/soap_message.py
new file mode 100644
index 0000000..bf0a445
--- /dev/null
+++ b/pyplayready/misc/soap_message.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+import html
+import xml.etree.ElementTree as ET
+from typing import Union, Optional
+
+from pyplayready.misc.drmresults import DrmResult
+from pyplayready.misc.exceptions import InvalidSoapMessage, ServerException
+
+
+class SoapMessage:
+ XML_DECLARATION = ''
+
+ _NS = {
+ "soap": "http://schemas.xmlsoap.org/soap/envelope/",
+ "envelope": "http://www.w3.org/2003/05/soap-envelope"
+ }
+
+ def __init__(self, root: ET.Element):
+ self.root = root
+
+ @classmethod
+ def create(cls, message: ET.Element) -> SoapMessage:
+ Envelope = ET.Element("soap:Envelope", {
+ "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
+ "xmlns:xsd": "http://www.w3.org/2001/XMLSchema",
+ "xmlns:soap": "http://schemas.xmlsoap.org/soap/envelope/"
+ })
+
+ Body = ET.SubElement(Envelope, "soap:Body")
+ Body.append(message)
+
+ return cls(Envelope)
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> SoapMessage:
+ if not data:
+ raise InvalidSoapMessage("Data must not be empty")
+
+ if isinstance(data, str):
+ data = data.encode()
+
+ parser = ET.XMLParser(encoding="utf-8")
+ root = ET.fromstring(data, parser=parser)
+
+ if not root.tag.endswith("Envelope"):
+ raise InvalidSoapMessage("Soap Message root must be Envelope")
+
+ return cls(root)
+
+ def get_message(self) -> Optional[ET.Element]:
+ Body = self.root.find("soap:Body", self._NS) or self.root.find("envelope:Body", self._NS)
+ if Body is None:
+ return None
+
+ if len(list(Body)) == 0:
+ return None
+
+ return Body[0]
+
+ @staticmethod
+ def read_namespace(element) -> Optional[str]:
+ if element.tag.startswith("{"):
+ return element.tag.split("}")[0][1:]
+ return None
+
+ def raise_faults(self):
+ fault = self.get_message()
+
+ if not fault.tag.endswith("Fault"):
+ return
+
+ nsmap = {"soap": self.read_namespace(fault)}
+
+ status_code = fault.findtext("detail/Exception/StatusCode")
+ drm_result = DrmResult.from_code(status_code) if status_code is not None else None
+ fault_text = fault.findtext("faultstring") or fault.findtext("soap:Reason/soap:Text", namespaces=nsmap)
+
+ error_message = fault_text or getattr(drm_result, "message", "(No message)")
+ exception_message = (f"[{drm_result.name}] " if drm_result else "") + error_message
+
+ raise ServerException(exception_message)
+
+ def dumps(self) -> str:
+ xml_data = ET.tostring(
+ self.root,
+ short_empty_elements=False,
+ encoding="utf-8"
+ )
+
+ # this shouldn't exist
+ return self.XML_DECLARATION + html.unescape(xml_data.decode())
diff --git a/pyplayready/misc/storage.py b/pyplayready/misc/storage.py
new file mode 100644
index 0000000..3455dac
--- /dev/null
+++ b/pyplayready/misc/storage.py
@@ -0,0 +1,35 @@
+import os
+from pathlib import Path
+from typing import Optional
+
+from platformdirs import user_data_dir
+
+
+class Storage:
+
+ @staticmethod
+ def _get_initialized_path() -> Path:
+ storage_path = Path(user_data_dir("pyplayready", "DevLARLEY"))
+ storage_path.mkdir(parents=True, exist_ok=True)
+ return storage_path
+
+ @staticmethod
+ def write_file(file_name: str, data: bytes) -> bool:
+ storage_path = Storage._get_initialized_path()
+ storage_file = storage_path / file_name
+
+ new_file = not storage_file.exists()
+
+ storage_file.write_bytes(data)
+
+ return new_file
+
+ @staticmethod
+ def read_file(file_name: str) -> Optional[bytes]:
+ storage_path = Storage._get_initialized_path()
+ storage_file = storage_path / file_name
+
+ if not storage_file.exists():
+ return None
+
+ return storage_file.read_bytes()
diff --git a/pyplayready/remote/remotecdm.py b/pyplayready/remote/remotecdm.py
index bb76e34..f54a448 100644
--- a/pyplayready/remote/remotecdm.py
+++ b/pyplayready/remote/remotecdm.py
@@ -1,15 +1,15 @@
from __future__ import annotations
import logging
-from typing import Union
+from typing import Union, Optional, List
+from uuid import UUID
import requests
-from pyplayready import InvalidLicense
+from pyplayready import InvalidXmrLicense
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.license.key import Key
-
from pyplayready.misc.exceptions import (DeviceMismatch, InvalidInitData)
from pyplayready.system.wrmheader import WRMHeader
@@ -95,19 +95,22 @@ class RemoteCdm(Cdm):
if response.status_code != 200:
raise ValueError(f"Cannot Close CDM Session, {response_json['message']} [{response.status_code}]")
- def get_license_challenge(self, session_id: bytes, wrm_header: Union[WRMHeader, str]) -> str:
+ def get_license_challenge(self, session_id: bytes, wrm_header: Union[WRMHeader, str], rev_lists: Optional[List[UUID]]=None) -> str:
if not wrm_header:
raise InvalidInitData("A wrm_header must be provided.")
if isinstance(wrm_header, WRMHeader):
wrm_header = wrm_header.dumps()
if not isinstance(wrm_header, str):
raise ValueError(f"Expected WRMHeader to be a {str} or {WRMHeader} not {wrm_header!r}")
+ if rev_lists and not isinstance(rev_lists, list):
+ raise ValueError(f"Expected rev_lists to be a {list} not {rev_lists!r}")
response = self.__session.post(
url=f"{self.host}/{self.device_name}/get_license_challenge",
json={
"session_id": session_id.hex(),
- "init_data": wrm_header
+ "init_data": wrm_header,
+ **({"rev_lists": list(map(str, rev_lists))} if rev_lists else {})
}
)
response_json = response.json()
@@ -119,10 +122,10 @@ class RemoteCdm(Cdm):
def parse_license(self, session_id: bytes, license_message: str) -> None:
if not license_message:
- raise InvalidLicense("Cannot parse an empty license_message")
+ raise InvalidXmrLicense("Cannot parse an empty license_message")
if not isinstance(license_message, str):
- raise InvalidLicense(f"Expected license_message to be a {str}, not {license_message!r}")
+ raise InvalidXmrLicense(f"Expected license_message to be a {str}, not {license_message!r}")
response = self.__session.post(
url=f"{self.host}/{self.device_name}/parse_license",
@@ -158,6 +161,3 @@ class RemoteCdm(Cdm):
)
for key in response_json["data"]["keys"]
]
-
-
-__all__ = ("RemoteCdm",)
diff --git a/pyplayready/remote/serve.py b/pyplayready/remote/serve.py
index 1480e43..08fc941 100644
--- a/pyplayready/remote/serve.py
+++ b/pyplayready/remote/serve.py
@@ -1,5 +1,6 @@
from pathlib import Path
from typing import Any, Optional, Union
+from uuid import UUID
from aiohttp.typedefs import Handler
from aiohttp import web
@@ -8,7 +9,7 @@ from pyplayready import __version__, PSSH
from pyplayready.cdm import Cdm
from pyplayready.device import Device
-from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidLicense, InvalidPssh)
+from pyplayready.misc.exceptions import (InvalidSession, TooManySessions, InvalidXmrLicense, InvalidPssh)
routes = web.RouteTableDef()
@@ -103,6 +104,7 @@ async def get_license_challenge(request: web.Request) -> web.Response:
session_id = bytes.fromhex(body["session_id"])
init_data = body["init_data"]
+ rev_lists = body.get("rev_lists")
if not init_data.startswith(" web.Response:
license_request = cdm.get_license_challenge(
session_id=session_id,
wrm_header=init_data,
+ rev_lists=list(map(UUID, rev_lists)) if rev_lists else None
)
except InvalidSession:
return web.json_response({"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."}, status=400)
@@ -151,7 +154,7 @@ async def parse_license(request: web.Request) -> web.Response:
cdm.parse_license(session_id, license_message)
except InvalidSession:
return web.json_response({"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."}, status=400)
- except InvalidLicense as e:
+ except InvalidXmrLicense as e:
return web.json_response({"message": f"Invalid License, {e}"}, status=400)
except Exception as e:
return web.json_response({"message": f"Error, {e}"}, status=500)
diff --git a/pyplayready/system/bcert.py b/pyplayready/system/bcert.py
index e2496c6..4369410 100644
--- a/pyplayready/system/bcert.py
+++ b/pyplayready/system/bcert.py
@@ -14,10 +14,11 @@ from enum import IntEnum
from Crypto.PublicKey import ECC
-from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
+from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer, Embedded
from construct import Int16ub, Array
from construct import Struct, this
+from pyplayready.system.util import Util
from pyplayready.crypto import Crypto
from pyplayready.misc.exceptions import InvalidCertificateChain, InvalidCertificate
from pyplayready.crypto.ecc_key import ECCKey
@@ -127,6 +128,12 @@ class BCertFeatures(IntEnum):
class _BCertStructs:
+ Header = Struct(
+ "flags" / Int16ub,
+ "tag" / Int16ub,
+ "length" / Int32ub,
+ )
+
BasicInfo = Struct(
"cert_id" / Bytes(16),
"security_level" / Int32ub,
@@ -224,10 +231,22 @@ class _BCertStructs:
"signature" / Bytes(this.signature_size)
)
+ ExtDataHwid = Struct(
+ "record_length" / Int32ub,
+ "record_data" / Bytes(this.record_length),
+ "padding" / Bytes((4 - (this.record_length % 4)) % 4)
+ )
+
+ # defined manually, since refactoring everything is not worth it
ExtDataContainer = Struct(
- "record_count" / Int32ub, # always 1
- "records" / Array(this.record_count, DataRecord),
- "signature" / ExtDataSignature
+ "record" / Struct(
+ Embedded(Header),
+ Embedded(ExtDataHwid)
+ ),
+ "signature" / Struct(
+ Embedded(Header),
+ Embedded(ExtDataSignature)
+ )
)
# TODO: untested
@@ -242,9 +261,7 @@ class _BCertStructs:
)
Attribute = Struct(
- "flags" / Int16ub,
- "tag" / Int16ub,
- "length" / Int32ub,
+ Embedded(Header),
"attribute" / Switch(
lambda this_: this_.tag,
{
@@ -260,8 +277,8 @@ class _BCertStructs:
BCertObjType.METERING: MeteringInfo,
BCertObjType.EXTDATASIGNKEY: ExtDataSignKeyInfo,
BCertObjType.EXTDATACONTAINER: ExtDataContainer,
- BCertObjType.EXTDATASIGNATURE: ExtDataSignature,
- BCertObjType.EXTDATA_HWID: Bytes(this.length - 8),
+ # BCertObjType.EXTDATASIGNATURE: ExtDataSignature,
+ # BCertObjType.EXTDATA_HWID: ExtDataHwid,
BCertObjType.SERVER: ServerInfo,
BCertObjType.SECURITY_VERSION: SecurityVersion,
BCertObjType.SECURITY_VERSION_2: SecurityVersion
@@ -462,15 +479,16 @@ class Certificate(_BCertStructs):
return None
def get_name(self) -> Optional[str]:
- manufacturer_info = self.get_attribute(BCertObjType.MANUFACTURER)
+ manufacturer_info_attr = self.get_attribute(BCertObjType.MANUFACTURER)
- if manufacturer_info:
- manufacturer_info_attr = manufacturer_info.attribute
+ if manufacturer_info_attr:
+ manufacturer_info = manufacturer_info_attr.attribute
- def un_pad(name: bytes):
- return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
+ manufacturer = Util.un_pad(manufacturer_info.manufacturer_name)
+ model_name = Util.un_pad(manufacturer_info.model_name)
+ model_number = Util.un_pad(manufacturer_info.model_number)
- return f"{un_pad(manufacturer_info_attr.manufacturer_name)} {un_pad(manufacturer_info_attr.model_name)} {un_pad(manufacturer_info_attr.model_number)}"
+ return f"{manufacturer} {model_name} {model_number}"
return None
@@ -524,13 +542,40 @@ class Certificate(_BCertStructs):
def dumps(self) -> bytes:
return self._BCERT.build(self.parsed)
+ def _verify_extdata_signature(self) -> None:
+ sign_key = self.get_attribute(BCertObjType.EXTDATASIGNKEY)
+ if not sign_key:
+ raise InvalidCertificate("No extdata sign key object found in certificate")
+
+ sign_key_bytes = sign_key.attribute.key
+
+ signing_key = ECC.construct(
+ point_x=int.from_bytes(sign_key_bytes[:32], "big"),
+ point_y=int.from_bytes(sign_key_bytes[32:], "big"),
+ curve="P-256"
+ )
+
+ extdata = self.get_attribute(BCertObjType.EXTDATACONTAINER)
+ if not extdata:
+ raise InvalidCertificate("No extdata container found in certificate")
+
+ signature = extdata.attribute.signature.signature
+
+ sign_data = _BCertStructs.ExtDataContainer.subcons[0].build(extdata.attribute.record)
+
+ if not Crypto.ecc256_verify(
+ public_key=signing_key,
+ data=sign_data,
+ signature=signature
+ ):
+ raise InvalidCertificate("Signature of certificate extdata is not authentic")
+
def verify_signature(self) -> None:
signature_object = self.get_attribute(BCertObjType.SIGNATURE)
if not signature_object:
- raise InvalidCertificate(f"No signature object found in certificate")
+ raise InvalidCertificate("No signature object found in certificate")
signature_attribute = signature_object.attribute
-
raw_signature_key = signature_attribute.signature_key
signature_key = ECC.construct(
@@ -546,7 +591,14 @@ class Certificate(_BCertStructs):
data=sign_payload,
signature=signature_attribute.signature
):
- raise InvalidCertificate(f"Signature of certificate is not authentic")
+ raise InvalidCertificate("Signature of certificate is not authentic")
+
+ basic_info_attribute = self.get_attribute(BCertObjType.BASIC)
+ if not basic_info_attribute:
+ raise InvalidCertificate("No basic info object found in certificate")
+
+ if basic_info_attribute.attribute.flags & BCertFlag.EXTDATA_PRESENT == BCertFlag.EXTDATA_PRESENT:
+ self._verify_extdata_signature()
class CertificateChain(_BCertStructs):
diff --git a/pyplayready/system/builder.py b/pyplayready/system/builder.py
new file mode 100644
index 0000000..47802a5
--- /dev/null
+++ b/pyplayready/system/builder.py
@@ -0,0 +1,238 @@
+import base64
+import hashlib
+import html
+import time
+import xml.etree.ElementTree as ET
+from typing import Optional, List
+from uuid import UUID
+
+from Crypto.Random import get_random_bytes
+
+from pyplayready import ECCKey, Crypto
+from pyplayready.misc.revocation_list import RevocationList
+from pyplayready.misc.storage import Storage
+from pyplayready.system.bcert import CertificateChain
+
+
+class XmlBuilder:
+
+ @staticmethod
+ def _ClientInfo(parent: ET.Element, client_version: str) -> ET.Element:
+ ClientInfo = ET.SubElement(parent, "CLIENTINFO")
+
+ ClientVersion = ET.SubElement(ClientInfo, "CLIENTVERSION")
+ ClientVersion.text = client_version
+
+ return ClientVersion
+
+ @staticmethod
+ def _RevListInfo(parent: ET.Element, list_id: UUID, version: int) -> ET.Element:
+ RevListInfo = ET.SubElement(parent, "RevListInfo")
+
+ ListID = ET.SubElement(RevListInfo, "ListID")
+ ListID.text = base64.b64encode(list_id.bytes_le).decode()
+
+ Version = ET.SubElement(RevListInfo, "Version")
+ Version.text = str(version)
+
+ return RevListInfo
+
+ @staticmethod
+ def _RevocationLists(parent: ET.Element, rev_lists: List[UUID]) -> ET.Element:
+ RevocationLists = ET.SubElement(parent, "RevocationLists")
+
+ load_result = Storage.read_file(RevocationList.CurrentRevListStorageName)
+ if load_result is None:
+ for rev_list in rev_lists:
+ XmlBuilder._RevListInfo(RevocationLists, rev_list, 0)
+
+ return RevocationLists
+
+ loaded_list = RevocationList.loads(load_result)
+
+ for list_id, list_data in loaded_list.parsed:
+ if list_id not in rev_lists:
+ continue
+
+ if list_id == RevocationList.ListID.REV_INFO_V2:
+ version = list_data.data.sequence_number
+ else:
+ version = list_data.data.version
+
+ XmlBuilder._RevListInfo(RevocationLists, list_id, version)
+
+ return RevocationLists
+
+ @staticmethod
+ def _LicenseAcquisition(
+ parent: ET.Element,
+ wrmheader: str,
+ protocol_version: int,
+ wrmserver_data: bytes,
+ client_data: bytes,
+ client_info: Optional[str] = None,
+ revocation_lists: Optional[List[UUID]] = None
+ ) -> ET.Element:
+ LA = ET.SubElement(parent, "LA", {
+ "xmlns": "http://schemas.microsoft.com/DRM/2007/03/protocols",
+ "Id": "SignedData",
+ "xml:space": "preserve"
+ })
+
+ Version = ET.SubElement(LA, "Version")
+ Version.text = str(protocol_version)
+
+ ContentHeader = ET.SubElement(LA, "ContentHeader")
+ ContentHeader.text = wrmheader
+
+ if client_info is not None:
+ XmlBuilder._ClientInfo(LA, client_info)
+
+ if revocation_lists is not None:
+ XmlBuilder._RevocationLists(LA, revocation_lists)
+
+ LicenseNonce = ET.SubElement(LA, "LicenseNonce")
+ LicenseNonce.text = base64.b64encode(get_random_bytes(16)).decode()
+
+ ClientTime = ET.SubElement(LA, "ClientTime")
+ ClientTime.text = str(int(time.time()))
+
+ EncryptedData = ET.SubElement(LA, "EncryptedData", {
+ "xmlns": "http://www.w3.org/2001/04/xmlenc#",
+ "Type": "http://www.w3.org/2001/04/xmlenc#Element"
+ })
+ ET.SubElement(EncryptedData, "EncryptionMethod", {
+ "Algorithm": "http://www.w3.org/2001/04/xmlenc#aes128-cbc"
+ })
+
+ KeyInfo = ET.SubElement(EncryptedData, "KeyInfo", {
+ "xmlns": "http://www.w3.org/2000/09/xmldsig#"
+ })
+
+ EncryptedKey = ET.SubElement(KeyInfo, "EncryptedKey", {
+ "xmlns": "http://www.w3.org/2001/04/xmlenc#"
+ })
+ ET.SubElement(EncryptedKey, "EncryptionMethod", {
+ "Algorithm": "http://schemas.microsoft.com/DRM/2007/03/protocols#ecc256"
+ })
+
+ KeyInfoInner = ET.SubElement(EncryptedKey, "KeyInfo", {
+ "xmlns": "http://www.w3.org/2000/09/xmldsig#"
+ })
+ KeyName = ET.SubElement(KeyInfoInner, "KeyName")
+ KeyName.text = "WMRMServer"
+
+ WRMServerData = ET.SubElement(ET.SubElement(EncryptedKey, "CipherData"), "CipherValue")
+ WRMServerData.text = base64.b64encode(wrmserver_data).decode()
+
+ ClientData = ET.SubElement(ET.SubElement(EncryptedData, "CipherData"), "CipherValue")
+ ClientData.text = base64.b64encode(client_data).decode()
+
+ return LA
+
+ @staticmethod
+ def _SignedInfo(parent: ET.Element, digest_value: bytes) -> ET.Element:
+ SignedInfo = ET.SubElement(parent, "SignedInfo", {
+ "xmlns": "http://www.w3.org/2000/09/xmldsig#"
+ })
+ ET.SubElement(SignedInfo, "CanonicalizationMethod", {
+ "Algorithm": "http://www.w3.org/TR/2001/REC-xml-c14n-20010315"
+ })
+ ET.SubElement(SignedInfo, "SignatureMethod", {
+ "Algorithm": "http://schemas.microsoft.com/DRM/2007/03/protocols#ecdsa-sha256"
+ })
+
+ Reference = ET.SubElement(SignedInfo, "Reference", {
+ "URI": "#SignedData"
+ })
+ ET.SubElement(Reference, "DigestMethod", {
+ "Algorithm": "http://schemas.microsoft.com/DRM/2007/03/protocols#sha256"
+ })
+ DigestValue = ET.SubElement(Reference, "DigestValue")
+ DigestValue.text = base64.b64encode(digest_value).decode()
+
+ return SignedInfo
+
+ @staticmethod
+ def AcquireLicenseMessage(
+ wrmheader: str,
+ protocol_version: int,
+ wrmserver_data: bytes,
+ client_data: bytes,
+ signing_key: ECCKey,
+ client_info: Optional[str] = None,
+ revocation_lists: Optional[List[UUID]] = None
+ ) -> ET.Element:
+ AcquireLicense = ET.Element("AcquireLicense", {
+ "xmlns": "http://schemas.microsoft.com/DRM/2007/03/protocols"
+ })
+
+ Challenge = ET.SubElement(ET.SubElement(AcquireLicense, "challenge"), "Challenge", {
+ "xmlns": "http://schemas.microsoft.com/DRM/2007/03/protocols/messages"
+ })
+
+ LA = XmlBuilder._LicenseAcquisition(Challenge, wrmheader, protocol_version, wrmserver_data, client_data, client_info, revocation_lists)
+
+ Signature = ET.SubElement(Challenge, "Signature", {
+ "xmlns": "http://www.w3.org/2000/09/xmldsig#"
+ })
+
+ la_xml = ET.tostring(
+ LA,
+ encoding="utf-8",
+ short_empty_elements=False
+ )
+ # I don't like this but re-serializing the WRMHEADER XML could change it
+ unescaped_la_xml = html.unescape(la_xml.decode())
+ la_digest = hashlib.sha256(unescaped_la_xml.encode()).digest()
+
+ SignedInfo = XmlBuilder._SignedInfo(Signature, la_digest)
+
+ signed_info_xml = ET.tostring(
+ SignedInfo,
+ encoding="utf-8",
+ short_empty_elements=False
+ )
+
+ SignatureValue = ET.SubElement(Signature, "SignatureValue")
+ SignatureValue.text = base64.b64encode(
+ Crypto.ecc256_sign(signing_key, signed_info_xml)
+ ).decode()
+
+ ECCKeyValue = ET.SubElement(
+ ET.SubElement(
+ ET.SubElement(
+ Signature, "KeyInfo", {
+ "xmlns": "http://www.w3.org/2000/09/xmldsig#"
+ }
+ ),
+ "KeyValue"
+ ), "ECCKeyValue"
+ )
+
+ PublicKey = ET.SubElement(ECCKeyValue, "PublicKey")
+ PublicKey.text = base64.b64encode(signing_key.public_bytes()).decode()
+
+ return AcquireLicense
+
+ @staticmethod
+ def ClientData(cert_chains: List[CertificateChain], ree_features: List[str]) -> str:
+ Data = ET.Element("Data")
+ CertificateChains = ET.SubElement(Data, "CertificateChains")
+
+ for cert_chain in cert_chains:
+ CertificateChainElement = ET.SubElement(CertificateChains, "CertificateChain")
+ CertificateChainElement.text = f" {base64.b64encode(cert_chain.dumps()).decode()} "
+
+ Features = ET.SubElement(Data, "Features")
+ ET.SubElement(Features, "Feature", {"Name": "AESCBC"})
+
+ REE = ET.SubElement(Features, "REE")
+ for ree_feature in ree_features:
+ ET.SubElement(REE, ree_feature)
+
+ return ET.tostring(
+ Data,
+ encoding="utf-8",
+ short_empty_elements=False
+ ).decode()
\ No newline at end of file
diff --git a/pyplayready/system/session.py b/pyplayready/system/session.py
index 59f0570..516860d 100644
--- a/pyplayready/system/session.py
+++ b/pyplayready/system/session.py
@@ -15,6 +15,3 @@ class Session:
self.signing_key: Optional[ECCKey] = None
self.encryption_key: Optional[ECCKey] = None
self.keys: list[Key] = []
-
-
-__all__ = ("Session",)
diff --git a/pyplayready/system/util.py b/pyplayready/system/util.py
new file mode 100644
index 0000000..a2f7879
--- /dev/null
+++ b/pyplayready/system/util.py
@@ -0,0 +1,20 @@
+import xml.etree.ElementTree as ET
+
+
+class Util:
+
+ @staticmethod
+ def remove_namespaces(element: ET.Element) -> None:
+ for elem in element.iter():
+ elem.tag = elem.tag.split('}')[-1]
+
+ @staticmethod
+ def un_pad(name: bytes) -> str:
+ return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
+
+ @staticmethod
+ def to_bytes(n: int) -> bytes:
+ byte_len = (n.bit_length() + 7) // 8
+ if byte_len % 2 != 0:
+ byte_len += 1
+ return n.to_bytes(byte_len, 'big')
diff --git a/pyplayready/system/wrmheader.py b/pyplayready/system/wrmheader.py
index 1b636d2..9323a80 100644
--- a/pyplayready/system/wrmheader.py
+++ b/pyplayready/system/wrmheader.py
@@ -1,26 +1,65 @@
import base64
+import hashlib
from enum import Enum
-from typing import Optional, List, Union, Tuple
+from typing import List, Optional, Union
+from uuid import UUID
+import xml.etree.ElementTree as ET
-import xmltodict
+from Crypto.Cipher import AES
+
+from pyplayready.misc.exceptions import InvalidWrmHeader, InvalidChecksum
+from pyplayready.system.util import Util
class WRMHeader:
"""Represents a PlayReady WRM Header"""
class SignedKeyID:
- def __init__(
- self,
- alg_id: str,
- value: str,
- checksum: str
- ):
- self.alg_id = alg_id
+ class AlgId(Enum):
+ AESCTR = "AESCTR"
+ AESCBC = "AESCBC"
+ COCKTAIL = "COCKTAIL"
+ UNKNOWN = "UNKNOWN"
+
+ @classmethod
+ def _missing_(cls, value):
+ return cls.UNKNOWN
+
+ def __init__(self, value: UUID, alg_id, checksum: Optional[bytes]):
self.value = value
+ self.alg_id = alg_id
self.checksum = checksum
+ @classmethod
+ def load(cls, value: str, alg_id: str, checksum: Optional[str]):
+ return cls(
+ value=UUID(bytes_le=base64.b64decode(value)),
+ alg_id=cls.AlgId(alg_id),
+ checksum=base64.b64decode(checksum) if checksum else None
+ )
+
def __repr__(self):
- return f'SignedKeyID(alg_id={self.alg_id}, value="{self.value}", checksum="{self.checksum}")'
+ return f'SignedKeyID(value="{self.value}", alg_id={self.alg_id}, checksum={self.checksum})'
+
+ def verify(self, content_key: bytes) -> bool:
+ if self.value is None:
+ raise InvalidChecksum("Key ID must not be empty")
+ if self.checksum is None:
+ raise InvalidChecksum("Checksum must not be empty")
+
+ if self.alg_id == self.AlgId.AESCTR:
+ cipher = AES.new(content_key, mode=AES.MODE_ECB)
+ encrypted = cipher.encrypt(self.value.bytes_le)
+ checksum = encrypted[:8]
+ elif self.alg_id == self.AlgId.COCKTAIL:
+ buffer = content_key.ljust(21, b"\x00")
+ for _ in range(5):
+ buffer = hashlib.sha1(buffer).digest()
+ checksum = buffer[:7]
+ else:
+ raise InvalidChecksum("Algorithm ID must be either \"AESCTR\" or \"COCKTAIL\"")
+
+ return checksum == self.checksum
class Version(Enum):
VERSION_4_0_0_0 = "4.0.0.0"
@@ -33,13 +72,9 @@ class WRMHeader:
def _missing_(cls, value):
return cls.UNKNOWN
- _RETURN_STRUCTURE = Optional[Tuple[List[SignedKeyID], Optional[str], Optional[str], Optional[str]]]
-
def __init__(self, data: Union[str, bytes]):
- """Load a WRM Header from either a string, base64 encoded data or bytes"""
-
if not data:
- raise ValueError("Data must not be empty")
+ raise InvalidWrmHeader("Data must not be empty")
if isinstance(data, str):
try:
@@ -47,101 +82,105 @@ class WRMHeader:
except Exception:
data = data.encode("utf-16-le")
- self._raw_data: bytes = data
- self._parsed = xmltodict.parse(self._raw_data)
+ self._raw_data = data
+ self._root = ET.fromstring(data)
+ Util.remove_namespaces(self._root)
- self._header = self._parsed.get('WRMHEADER')
- if not self._header:
- raise ValueError("Data is not a valid WRMHEADER")
+ if self._root.tag != "WRMHEADER":
+ raise InvalidWrmHeader("Data is not a valid WRMHEADER")
- self.version = self.Version(self._header.get('@version'))
+ self.version = self.Version(self._root.attrib.get("version"))
- @staticmethod
- def _ensure_list(element: Union[dict, list]) -> List:
- if isinstance(element, dict):
- return [element]
- return element
-
- @staticmethod
- def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
- protect_info = data.get("PROTECTINFO")
-
- return (
- [WRMHeader.SignedKeyID(
- alg_id=protect_info["ALGID"],
- value=data["KID"],
- checksum=data.get("CHECKSUM")
- )],
- data.get("LA_URL"),
- data.get("LUI_URL"),
- data.get("DS_ID")
- )
-
- @staticmethod
- def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE:
- protect_info = data.get("PROTECTINFO")
-
- key_ids = []
- if protect_info:
- kid = protect_info["KID"]
- if kid:
- key_ids = [WRMHeader.SignedKeyID(
- alg_id=kid["@ALGID"],
- value=kid["@VALUE"],
- checksum=kid.get("@CHECKSUM")
- )]
-
- return key_ids, data.get("LA_URL"), data.get("LUI_URL"), data.get("DS_ID")
-
- @staticmethod
- def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE:
- protect_info = data.get("PROTECTINFO")
-
- key_ids = []
- if protect_info:
- kids = protect_info["KIDS"]
- if kids:
- for kid in WRMHeader._ensure_list(kids["KID"]):
- key_ids.append(WRMHeader.SignedKeyID(
- alg_id=kid["@ALGID"],
- value=kid["@VALUE"],
- checksum=kid.get("@CHECKSUM")
- ))
-
- return key_ids, data.get("LA_URL"), data.get("LUI_URL"), data.get("DS_ID")
-
- @staticmethod
- def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE:
- protect_info = data.get("PROTECTINFO")
-
- key_ids = []
- if protect_info:
- kids = protect_info["KIDS"]
- for kid in WRMHeader._ensure_list(kids["KID"]):
- key_ids.append(WRMHeader.SignedKeyID(
- alg_id=kid.get("@ALGID"),
- value=kid["@VALUE"],
- checksum=kid.get("@CHECKSUM")
- ))
-
- return key_ids, data.get("LA_URL"), data.get("LUI_URL"), data.get("DS_ID")
-
- def read_attributes(self) -> _RETURN_STRUCTURE:
- """Read any non-custom XML attributes"""
- data = self._header.get("DATA")
- if not data:
- raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
+ self.key_ids: List[WRMHeader.SignedKeyID] = []
+ self.la_url: Optional[str] = None
+ self.lui_url: Optional[str] = None
+ self.ds_id: Optional[str] = None
+ self.custom_attributes: Optional[ET.Element] = None
+ self.decryptor_setup: Optional[str] = None
if self.version == self.Version.VERSION_4_0_0_0:
- return self._read_v4_0_0_0(data)
+ self._load_v4_0_data(self._root)
elif self.version == self.Version.VERSION_4_1_0_0:
- return self._read_v4_1_0_0(data)
+ self._load_v4_1_data(self._root)
elif self.version == self.Version.VERSION_4_2_0_0:
- return self._read_v4_2_0_0(data)
+ self._load_v4_2_data(self._root)
elif self.version == self.Version.VERSION_4_3_0_0:
- return self._read_v4_3_0_0(data)
+ self._load_v4_3_data(self._root)
- return None
+ def __repr__(self):
+ attrs = ", \n ".join(f"{k}={v!r}" for k, v in self.__dict__.items())
+ return f"{self.__class__.__name__}({attrs})"
+
+ @staticmethod
+ def _attr(element, name):
+ return element.attrib.get(name) if element is not None else None
+
+ def _load_v4_0_data(self, parent: ET.Element):
+ Data = parent.find("DATA")
+
+ Kid = Data.findtext("KID")
+ AlgId = Data.findtext("PROTECTINFO/ALGID")
+ Checksum = Data.findtext("CHECKSUM")
+
+ self.key_ids = [self.SignedKeyID.load(Kid, AlgId, Checksum)]
+
+ self.la_url = Data.findtext("LA_URL")
+ self.lui_url = Data.findtext("LUI_URL")
+ self.ds_id = Data.findtext("DS_ID")
+
+ self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
+
+ def _load_v4_1_data(self, parent: ET.Element):
+ Data = parent.find("DATA")
+
+ Kid = Data.find("PROTECTINFO/KID")
+ if Kid is not None:
+ Value = Kid.get("VALUE")
+ AlgId = Kid.get("ALGID")
+ Checksum = Kid.get("CHECKSUM")
+
+ self.key_ids.append(self.SignedKeyID.load(Value, AlgId, Checksum))
+
+ self.la_url = Data.findtext("LA_URL")
+ self.lui_url = Data.findtext("LUI_URL")
+ self.ds_id = Data.findtext("DS_ID")
+
+ self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
+ self.decryptor_setup = Data.findtext("DECRYPTORSETUP")
+
+ def _load_v4_2_data(self, parent: ET.Element):
+ Data = parent.find("DATA")
+
+ for kid in Data.findall("PROTECTINFO/KIDS/KID"):
+ Value = kid.get("VALUE")
+ AlgId = kid.get("ALGID")
+ Checksum = kid.get("CHECKSUM")
+
+ self.key_ids.append(self.SignedKeyID.load(Value, AlgId, Checksum))
+
+ self.la_url = Data.findtext("LA_URL")
+ self.lui_url = Data.findtext("LUI_URL")
+ self.ds_id = Data.findtext("DS_ID")
+
+ self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
+ self.decryptor_setup = Data.findtext("DECRYPTORSETUP")
+
+ def _load_v4_3_data(self, parent: ET.Element):
+ Data = parent.find("DATA")
+
+ for kid in Data.findall("PROTECTINFO/KIDS/KID"):
+ Value = kid.get("VALUE")
+ AlgId = kid.get("ALGID")
+ Checksum = kid.get("CHECKSUM")
+
+ self.key_ids.append(self.SignedKeyID.load(Value, AlgId, Checksum))
+
+ self.la_url = Data.findtext("LA_URL")
+ self.lui_url = Data.findtext("LUI_URL")
+ self.ds_id = Data.findtext("DS_ID")
+
+ self.custom_attributes = Data.find("CUSTOMATTRIBUTES")
+ self.decryptor_setup = Data.findtext("DECRYPTORSETUP")
def dumps(self) -> str:
return self._raw_data.decode("utf-16-le")
diff --git a/pyproject.toml b/pyproject.toml
index 61f76d1..64f12b3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pyplayready"
-version = "0.6.3"
+version = "0.8.0"
description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "CC BY-NC-ND 4.0"
authors = ["DevLARLEY, Erevoc", "DevataDev"]
@@ -36,11 +36,10 @@ pycryptodome = "^3.21.0"
construct = "2.8.8"
ECPy = "^1.2.5"
click = "^8.1.7"
-xmltodict = "^0.14.2"
PyYAML = "^6.0.1"
-aiohttp = {version = "^3.9.1", optional = true}
+aiohttp = "^3.9.1"
cryptography = "^45.0.6"
-lxml = "^6.0.0"
+platformdirs = "^4.4.0"
[tool.poetry.scripts]
pyplayready = "pyplayready.main:main"
diff --git a/requirements.txt b/requirements.txt
index 4027d4c..5655a6f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,9 @@
requests
pycryptodome
-ecpy
construct==2.8.8
+ecpy
click
PyYAML
aiohttp
-xmltodict
cryptography
-lxml
\ No newline at end of file
+platformdirs
\ No newline at end of file