"""Module to handle the remote device PlayReady."""

import base64
import os
from pathlib import Path
import re

import yaml
from flask import Blueprint, jsonify, request, current_app, Response
from pyplayready.device import Device as PlayReadyDevice
from pyplayready.cdm import Cdm as PlayReadyCDM
from pyplayready import PSSH as PlayReadyPSSH
from pyplayready.exceptions import (
    InvalidSession,
    InvalidLicense,
    InvalidPssh,
)

from custom_functions.database.user_db import fetch_username_by_api_key
from custom_functions.decrypt.api_decrypt import is_base64
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device

remotecdm_pr_bp = Blueprint("remotecdm_pr", __name__)

# The configuration is read once, at import time, relative to the current
# working directory of the process.
with open(
    os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
) as file:
    config = yaml.safe_load(file)


def make_response(status, message, data=None, http_status=200):
    """Build the ``(json, http_status)`` tuple returned by the endpoints.

    The JSON body always carries ``status`` and ``message``; ``data`` is only
    included when it is not ``None``.
    """
    resp = {"status": status, "message": message}
    if data is not None:
        resp["data"] = data
    return jsonify(resp), http_status


def check_required_fields(body, required_fields):
    """Return a response tuple if a required field is missing, else None.

    A body that is not a JSON object (for example ``null`` or a list) is
    rejected with a 400 instead of failing on ``body.get``.
    """
    if not isinstance(body, dict):
        return make_response(
            "Error",
            "Request body must be a JSON object",
            http_status=400,
        )
    for field in required_fields:
        # Falsy values (empty string, 0, None) count as missing.
        if not body.get(field):
            return make_response(
                "Error",
                f'Missing required field "{field}" in JSON body',
                http_status=400,
            )
    return None


def _parse_session_id(raw_session_id):
    """Return the session ID as bytes, or an error response tuple.

    Callers tell the two outcomes apart with ``isinstance(result, tuple)``,
    the same convention used for ``get_cdm_or_error``.
    """
    try:
        return bytes.fromhex(raw_session_id)
    except (TypeError, ValueError):
        return make_response(
            "Error",
            f'Invalid session ID "{raw_session_id}", '
            "it must be a hexadecimal string",
            http_status=400,
        )


def _open_session_response(pr_device):
    """Create a CDM from ``pr_device``, open a session and describe it.

    The CDM is stored in ``current_app.config["CDM"]``, replacing whatever
    CDM was stored there before.
    """
    cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
    session_id = cdm.open()
    return make_response(
        "Success",
        "Successfully opened the PlayReady CDM session",
        {
            "session_id": session_id.hex(),
            "device": {"security_level": cdm.security_level},
        },
        http_status=200,
    )


def _key_matches_type(key, key_type):
    """Return True when no filter is requested or the key has that type."""
    if not key_type:
        return True
    kind = key.key_type
    # NOTE(review): the request may carry the enum value or its name; both
    # are accepted here - confirm which form clients actually send.
    return key_type in (kind, kind.value, getattr(kind, "name", None))


@remotecdm_pr_bp.route("/remotecdm/playready", methods=["GET", "HEAD"])
def remote_cdm_playready():
    """Handle the remote device PlayReady.

    GET returns a plain success body; HEAD returns an empty 200 response
    with the ``Server`` header set to ``playready serve``.
    """
    # The route only admits GET and HEAD, so anything that is not HEAD is GET.
    if request.method == "HEAD":
        response = Response(status=200)
        response.headers["Server"] = "playready serve"
        return response
    return make_response(
        "Success",
        "OK",
        http_status=200,
    )


@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo", methods=["GET"])
def remote_cdm_playready_deviceinfo():
    """Handle the remote device PlayReady device info.

    Describes the default device named by ``config["default_pr_cdm"]``.
    """
    base_name = config["default_pr_cdm"]
    device = PlayReadyDevice.load(
        os.path.join(os.getcwd(), "configs", "CDMs", "PR", base_name + ".prd")
    )
    cdm = PlayReadyCDM.from_device(device)
    return jsonify(
        {
            "security_level": cdm.security_level,
            "host": f'{config["fqdn"]}/remotecdm/playready',
            "secret": f'{config["remote_cdm_secret"]}',
            "device_name": Path(base_name).stem,
        }
    )


def sanitize_username(username):
    """Sanitize the username.

    Every character outside ``[a-zA-Z0-9_-]`` becomes ``_`` and the result
    is lower-cased, so it can be used as a directory name.
    """
    return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()


@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo/<device>", methods=["GET"])
def remote_cdm_playready_deviceinfo_specific(device):
    """Handle the remote device PlayReady device info specific.

    Describes a device uploaded by the user identified through the
    ``X-Secret-Key`` header. Responds 403 when the key is missing or unknown.
    """
    # Only the final path component is kept, with a ".prd" suffix.
    base_name = Path(device).with_suffix(".prd").name
    # .get() so that a missing header reaches the 403 branch below.
    api_key = request.headers.get("X-Secret-Key")
    username = fetch_username_by_api_key(api_key) if api_key else None
    if not username:
        return jsonify({"message": "Invalid or missing API key."}), 403
    safe_username = sanitize_username(username)
    device = PlayReadyDevice.load(
        os.path.join(
            os.getcwd(),
            "configs",
            "CDMs",
            "users_uploaded",
            safe_username,
            "PR",
            base_name,
        )
    )
    cdm = PlayReadyCDM.from_device(device)
    return jsonify(
        {
            "security_level": cdm.security_level,
            "host": f'{config["fqdn"]}/remotecdm/playready',
            "secret": f"{api_key}",
            "device_name": Path(base_name).stem,
        }
    )


@remotecdm_pr_bp.route("/remotecdm/playready/<device>/open", methods=["GET"])
def remote_cdm_playready_open(device):
    """Handle the remote device PlayReady open.

    The default device can be opened without credentials. Any other device
    requires an ``X-Secret-Key`` header that resolves to a user who is
    allowed to use it; otherwise the response is 403.
    """
    # NOTE(review): this dict is passed as the ``message`` argument, so the
    # response nests it under "message". Kept as-is to preserve the wire format.
    unauthorized_msg = {
        "message": f"Device '{device}' is not found or you are not authorized to use it."
    }
    default_name = config["default_pr_cdm"]

    # Default device logic
    if str(device).lower() == default_name.lower():
        pr_device = PlayReadyDevice.load(
            os.path.join(os.getcwd(), "configs", "CDMs", "PR", default_name + ".prd")
        )
        return _open_session_response(pr_device)

    # User device logic
    api_key = request.headers.get("X-Secret-Key")
    if not api_key:
        return make_response("Failed", unauthorized_msg, http_status=403)

    user = fetch_username_by_api_key(api_key=api_key)
    # Check the lookup result before sanitising: an unknown key gives a falsy
    # user, which must produce a 403 and not reach re.sub.
    if not user or not user_allowed_to_use_device(device=device, username=user):
        return make_response("Failed", unauthorized_msg, http_status=403)

    pr_device = PlayReadyDevice.load(
        os.path.join(
            os.getcwd(),
            "configs",
            "CDMs",
            "users_uploaded",
            sanitize_username(user),
            "PR",
            device + ".prd",
        )
    )
    return _open_session_response(pr_device)


def get_cdm_or_error(device):
    """Get the CDM or return an error response.

    Returns the object stored in ``current_app.config["CDM"]``, or a 400
    response tuple when nothing is stored. ``device`` is only used in the
    error message.
    """
    cdm = current_app.config.get("CDM")
    if not cdm:
        return make_response(
            "Error",
            f'No CDM session for "{device}" has been opened yet. No session to use',
            http_status=400,
        )
    return cdm


@remotecdm_pr_bp.route(
    "/remotecdm/playready/<device>/close/<session_id>", methods=["GET"]
)
def remote_cdm_playready_close(device, session_id):
    """Handle the remote device PlayReady close.

    Responds 400 when the session ID is not valid hex, when no CDM has been
    opened, when the session is unknown, or when closing fails.
    """
    # Parse before entering the try block so the error handlers below can
    # rely on session_id being bytes.
    session_id = _parse_session_id(session_id)
    if isinstance(session_id, tuple):  # error response
        return session_id
    cdm = get_cdm_or_error(device)
    if isinstance(cdm, tuple):  # error response
        return cdm
    try:
        cdm.close(session_id)
    except InvalidSession:
        return make_response(
            "Error",
            f'Invalid session ID "{session_id.hex()}", it may have expired',
            http_status=400,
        )
    except Exception as error:
        return make_response(
            "Error",
            f'Failed to close Session "{session_id.hex()}", {error}.',
            http_status=400,
        )
    return make_response(
        "Success",
        f'Successfully closed Session "{session_id.hex()}".',
        http_status=200,
    )


@remotecdm_pr_bp.route(
    "/remotecdm/playready/<device>/get_license_challenge", methods=["POST"]
)
def remote_cdm_playready_get_license_challenge(device):
    """Handle the remote device PlayReady get license challenge.

    Expects a JSON body with ``session_id`` (hex) and ``init_data``.
    """
    body = request.get_json()
    missing_field = check_required_fields(body, ("session_id", "init_data"))
    if missing_field:
        return missing_field
    cdm = get_cdm_or_error(device)
    if isinstance(cdm, tuple):  # error response
        return cdm
    session_id = _parse_session_id(body["session_id"])
    if isinstance(session_id, tuple):  # error response
        return session_id
    init_data = body["init_data"]
    # NOTE(review): everything from this condition to the end of the function
    # was missing from the reviewed copy of this file and has been
    # reconstructed. The "<WRMHEADER" prefix, the PSSH attribute used, the
    # status codes and the "challenge" response key are assumptions - confirm
    # against the original implementation and its clients.
    if not init_data.startswith("<WRMHEADER"):
        try:
            pssh = PlayReadyPSSH(init_data)
            if pssh.wrm_headers:
                init_data = pssh.wrm_headers[0]
        except InvalidPssh as e:
            return make_response(
                "Error",
                f"Unable to parse base64 PSSH, {e}",
                http_status=400,
            )
    try:
        license_request = cdm.get_license_challenge(
            session_id=session_id, wrm_header=init_data
        )
    except InvalidSession:
        return make_response(
            "Error",
            f"Invalid Session ID '{session_id.hex()}', it may have expired.",
            http_status=400,
        )
    except Exception as e:
        return make_response(
            "Error",
            f"Error, {e}",
            http_status=400,
        )
    return make_response(
        "Success",
        "Successfully got the License Challenge",
        {"challenge": license_request},
        http_status=200,
    )


@remotecdm_pr_bp.route("/remotecdm/playready/<device>/parse_license", methods=["POST"])
def remote_cdm_playready_parse_license(device):
    """Handle the remote device PlayReady parse license.

    Expects a JSON body with ``license_message`` and ``session_id`` (hex).
    A base64 license message is decoded to text before it is parsed.
    """
    body = request.get_json()
    missing_field = check_required_fields(body, ("license_message", "session_id"))
    if missing_field:
        return missing_field
    cdm = get_cdm_or_error(device)
    if isinstance(cdm, tuple):  # error response
        return cdm
    session_id = _parse_session_id(body["session_id"])
    if isinstance(session_id, tuple):  # error response
        return session_id
    license_message = body["license_message"]
    if is_base64(license_message):
        license_message = base64.b64decode(license_message).decode("utf-8")
    try:
        cdm.parse_license(session_id, license_message)
    except InvalidSession:
        return make_response(
            "Error",
            f"Invalid Session ID '{session_id.hex()}', it may have expired.",
            http_status=400,
        )
    except InvalidLicense as e:
        return make_response(
            "Error",
            f"Invalid License, {e}",
            http_status=400,
        )
    except Exception as e:
        return make_response(
            "Error",
            f"Error, {e}",
            http_status=400,
        )
    return make_response(
        "Success",
        "Successfully parsed and loaded the Keys from the License message",
        http_status=200,
    )


@remotecdm_pr_bp.route("/remotecdm/playready/<device>/get_keys", methods=["POST"])
def remote_cdm_playready_get_keys(device):
    """Handle the remote device PlayReady get keys.

    Expects a JSON body with ``session_id`` (hex) and an optional
    ``key_type`` used to filter the returned keys.
    """
    body = request.get_json()
    missing_field = check_required_fields(body, ("session_id",))
    if missing_field:
        return missing_field
    session_id = _parse_session_id(body["session_id"])
    if isinstance(session_id, tuple):  # error response
        return session_id
    key_type = body.get("key_type", None)
    cdm = get_cdm_or_error(device)
    if isinstance(cdm, tuple):  # error response
        return cdm
    try:
        # NOTE(review): key_type is no longer forwarded to the CDM; the
        # filtering happens below. This assumes Cdm.get_keys takes only the
        # session ID - confirm against the installed pyplayready version.
        keys = cdm.get_keys(session_id)
    except InvalidSession:
        return make_response(
            "Error",
            f"Invalid Session ID '{session_id.hex()}', it may have expired.",
            http_status=400,
        )
    except ValueError as error:
        return make_response(
            "Error",
            f"The Key Type value '{key_type}' is invalid, {error}",
            http_status=400,
        )
    keys_json = [
        {
            "key_id": key.key_id.hex,
            "key": key.key.hex(),
            "type": key.key_type.value,
            "cipher_type": key.cipher_type.value,
            "key_length": key.key_length,
        }
        for key in keys
        # Filter on key_type, the same attribute that is serialised above.
        if _key_matches_type(key, key_type)
    ]
    return make_response(
        "Success",
        "Successfully got the Keys",
        {"keys": keys_json},
        http_status=200,
    )