Merge pull request 'feat: add automatic session expiry to prevent stale session accumulation' (#5) from Sp4rky/pyplayready:feat/session-expiry into main

Reviewed-on: https://git.gay/ready-dl/pyplayready/pulls/5
This commit is contained in:
DevLARLEY 2026-02-26 22:00:52 +01:00 committed by git.gay
commit 9710f364a7
No known key found for this signature in database
GPG Key ID: BDA2A7586B5E1432
2 changed files with 12 additions and 0 deletions

View File

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import time
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from typing import List, Union, Optional from typing import List, Union, Optional
from uuid import UUID from uuid import UUID
@ -24,6 +25,7 @@ from pyplayready.system.wrmheader import WRMHeader
class Cdm: class Cdm:
MAX_NUM_OF_SESSIONS = 16 MAX_NUM_OF_SESSIONS = 16
SESSION_TIMEOUT = 30
def __init__( def __init__(
self, self,
@ -59,6 +61,14 @@ class Cdm:
def open(self) -> bytes: def open(self) -> bytes:
"""Open a Playready Content Decryption Module (CDM) session""" """Open a Playready Content Decryption Module (CDM) session"""
now = time.time()
expired = [
session_id for session_id, session in self.__sessions.items()
if (now - session.opened_at) > self.SESSION_TIMEOUT
]
for session_id in expired:
del self.__sessions[session_id]
if len(self.__sessions) > self.MAX_NUM_OF_SESSIONS: if len(self.__sessions) > self.MAX_NUM_OF_SESSIONS:
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).") raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")

View File

@ -1,3 +1,4 @@
import time
from typing import Optional from typing import Optional
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
@ -15,3 +16,4 @@ class Session:
self.signing_key: Optional[ECCKey] = None self.signing_key: Optional[ECCKey] = None
self.encryption_key: Optional[ECCKey] = None self.encryption_key: Optional[ECCKey] = None
self.keys: list[Key] = [] self.keys: list[Key] = []
self.opened_at: float = time.time()