RemoteCdm: Remove all uses of Session()

This is now possible because everything relating to an underlying session is now finally fully remote thanks to the changes surrounding the new get_keys() method.

Any client code still getting keys by accessing `_sessions` manually should be updated to use the get_keys() method.
This commit is contained in:
rlaphoenix 2022-08-06 10:08:28 +01:00
parent 665b77bd24
commit 2179987986

View File

@ -10,13 +10,11 @@ from Crypto.PublicKey import RSA
from google.protobuf.message import DecodeError
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.exceptions import InvalidSession, InvalidInitData, InvalidLicenseType, TooManySessions, \
InvalidLicenseMessage, DeviceMismatch
from pywidevine.exceptions import InvalidInitData, InvalidLicenseType, InvalidLicenseMessage, DeviceMismatch
from pywidevine.key import Key
from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification
from pywidevine.pssh import PSSH
from pywidevine.session import Session
class RemoteCdm(Cdm):
@ -96,37 +94,25 @@ class RemoteCdm(Cdm):
raise NotImplementedError("You cannot load a RemoteCdm from a local Device file.")
def open(self) -> bytes:
if len(self._sessions) > self.MAX_NUM_OF_SESSIONS:
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
r = self.__session.get(f"{self.host}/{self.device_name}/open")
if r.status_code != 200:
raise ValueError(f"Cannot Open CDM Session, {r.text} [{r.status_code}]")
r = r.json()["data"]
session = Session()
session.id = bytes.fromhex(r["session_id"])
self._sessions[session.id] = session
if int(r["device"]["system_id"]) != self.system_id:
raise DeviceMismatch("The System ID specified does not match the one specified in the API response.")
if int(r["device"]["security_level"]) != self.security_level:
raise DeviceMismatch("The Security Level specified does not match the one specified in the API response.")
return session.id
return bytes.fromhex(r["session_id"])
def close(self, session_id: bytes) -> None:
r = self.__session.get(f"{self.host}/{self.device_name}/close/{session_id.hex()}")
if r.status_code != 200:
raise ValueError(f"Cannot Close CDM Session, {r.text} [{r.status_code}]")
del self._sessions[session_id]
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if certificate is None:
certificate_b64 = None
elif isinstance(certificate, str):
@ -156,10 +142,6 @@ class RemoteCdm(Cdm):
type_: Union[int, str] = LicenseType.STREAMING,
privacy_mode: bool = True
) -> bytes:
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if not pssh:
raise InvalidInitData("A pssh must be provided.")
if not isinstance(pssh, PSSH):
@ -197,10 +179,6 @@ class RemoteCdm(Cdm):
return license_message.SerializeToString()
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if not license_message:
raise InvalidLicenseMessage("Cannot parse an empty license_message")