159 lines
5.6 KiB
Python
159 lines
5.6 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
import requests
|
|
|
|
from .cdm import Cdm
|
|
from .device import Device
|
|
from .key import Key
|
|
|
|
from .exceptions import (DeviceMismatch, InvalidInitData)
|
|
|
|
|
|
class RemoteCdm(Cdm):
|
|
"""Remote Accessible CDM using pyplayready's serve schema."""
|
|
|
|
def __init__(
|
|
self,
|
|
security_level: int,
|
|
host: str,
|
|
secret: str,
|
|
device_name: str
|
|
):
|
|
"""Initialize a Playready Content Decryption Module (CDM)."""
|
|
if not security_level:
|
|
raise ValueError("Security Level must be provided")
|
|
if not isinstance(security_level, int):
|
|
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
|
|
|
|
if not host:
|
|
raise ValueError("API Host must be provided")
|
|
if not isinstance(host, str):
|
|
raise TypeError(f"Expected host to be a {str} not {host!r}")
|
|
|
|
if not secret:
|
|
raise ValueError("API Secret must be provided")
|
|
if not isinstance(secret, str):
|
|
raise TypeError(f"Expected secret to be a {str} not {secret!r}")
|
|
|
|
if not device_name:
|
|
raise ValueError("API Device name must be provided")
|
|
if not isinstance(device_name, str):
|
|
raise TypeError(f"Expected device_name to be a {str} not {device_name!r}")
|
|
|
|
self.security_level = security_level
|
|
self.host = host
|
|
self.device_name = device_name
|
|
|
|
# spoof certificate_chain and ecc_key just so we can construct via super call
|
|
super().__init__(security_level, None, None, None)
|
|
|
|
self.__session = requests.Session()
|
|
self.__session.headers.update({
|
|
"X-Secret-Key": secret
|
|
})
|
|
|
|
r = requests.head(self.host)
|
|
if r.status_code != 200:
|
|
raise ValueError(f"Could not test Remote API version [{r.status_code}]")
|
|
server = r.headers.get("Server")
|
|
if not server or "pyplayready serve" not in server.lower():
|
|
raise ValueError(f"This Remote CDM API does not seem to be a pyplayready serve API ({server}).")
|
|
server_version_re = re.search(r"pyplayready serve v([\d.]+)", server, re.IGNORECASE)
|
|
if not server_version_re:
|
|
raise ValueError("The pyplayready server API is not stating the version correctly, cannot continue.")
|
|
server_version = server_version_re.group(1)
|
|
if server_version < "0.3.1":
|
|
raise ValueError(f"This pyplayready serve API version ({server_version}) is not supported.")
|
|
|
|
@classmethod
|
|
def from_device(cls, device: Device) -> RemoteCdm:
|
|
raise NotImplementedError("You cannot load a RemoteCdm from a local Device file.")
|
|
|
|
def open(self) -> bytes:
|
|
r = self.__session.get(
|
|
url=f"{self.host}/{self.device_name}/open"
|
|
).json()
|
|
|
|
if r['status'] != 200:
|
|
raise ValueError(f"Cannot Open CDM Session, {r['message']} [{r['status']}]")
|
|
r = r["data"]
|
|
|
|
if int(r["device"]["security_level"]) != self.security_level:
|
|
raise DeviceMismatch("The Security Level specified does not match the one specified in the API response.")
|
|
|
|
return bytes.fromhex(r["session_id"])
|
|
|
|
def close(self, session_id: bytes) -> None:
|
|
r = self.__session.get(
|
|
url=f"{self.host}/{self.device_name}/close/{session_id.hex()}"
|
|
).json()
|
|
if r["status"] != 200:
|
|
raise ValueError(f"Cannot Close CDM Session, {r['message']} [{r['status']}]")
|
|
|
|
def get_license_challenge(
|
|
self,
|
|
session_id: bytes,
|
|
wrm_header: str,
|
|
) -> str:
|
|
if not wrm_header:
|
|
raise InvalidInitData("A wrm_header must be provided.")
|
|
if not isinstance(wrm_header, str):
|
|
raise InvalidInitData(f"Expected wrm_header to be a {str}, not {wrm_header!r}")
|
|
|
|
r = self.__session.post(
|
|
url=f"{self.host}/{self.device_name}/get_license_challenge",
|
|
json={
|
|
"session_id": session_id.hex(),
|
|
"init_data": wrm_header,
|
|
}
|
|
).json()
|
|
if r["status"] != 200:
|
|
raise ValueError(f"Cannot get Challenge, {r['message']} [{r['status']}]")
|
|
r = r["data"]
|
|
|
|
return r["challenge"]
|
|
|
|
def parse_license(self, session_id: bytes, license_message: str) -> None:
|
|
if not license_message:
|
|
raise Exception("Cannot parse an empty license_message")
|
|
|
|
if not isinstance(license_message, str):
|
|
raise Exception(f"Expected license_message to be a {str}, not {license_message!r}")
|
|
|
|
r = self.__session.post(
|
|
url=f"{self.host}/{self.device_name}/parse_license",
|
|
json={
|
|
"session_id": session_id.hex(),
|
|
"license_message": license_message
|
|
}
|
|
).json()
|
|
if r["status"] != 200:
|
|
raise ValueError(f"Cannot parse License, {r['message']} [{r['status']}]")
|
|
|
|
def get_keys(self, session_id: bytes) -> list[Key]:
|
|
r = self.__session.post(
|
|
url=f"{self.host}/{self.device_name}/get_keys",
|
|
json={
|
|
"session_id": session_id.hex()
|
|
}
|
|
).json()
|
|
if r["status"] != 200:
|
|
raise ValueError(f"Could not get Keys, {r['message']} [{r['status']}]")
|
|
r = r["data"]
|
|
|
|
return [
|
|
Key(
|
|
key_type=key["type"],
|
|
key_id=Key.kid_to_uuid(bytes.fromhex(key["key_id"])),
|
|
key=bytes.fromhex(key["key"]),
|
|
cipher_type=key["cipher_type"],
|
|
key_length=key["key_length"]
|
|
)
|
|
for key in r["keys"]
|
|
]
|
|
|
|
|
|
__all__ = ("RemoteCdm",)
|