diff --git a/routes/remote_device_pr.py b/routes/remote_device_pr.py index 1a6e4af..77c50e9 100644 --- a/routes/remote_device_pr.py +++ b/routes/remote_device_pr.py @@ -1,44 +1,75 @@ -import base64 +"""Module to handle the remote device PlayReady.""" -from flask import Blueprint, jsonify, request, current_app, Response +import base64 import os +from pathlib import Path +import re import yaml +from flask import Blueprint, jsonify, request, current_app, Response + from pyplayready.device import Device as PlayReadyDevice from pyplayready.cdm import Cdm as PlayReadyCDM from pyplayready import PSSH as PlayReadyPSSH from pyplayready.exceptions import ( InvalidSession, - TooManySessions, InvalidLicense, InvalidPssh, ) from custom_functions.database.user_db import fetch_username_by_api_key from custom_functions.decrypt.api_decrypt import is_base64 from custom_functions.user_checks.device_allowed import user_allowed_to_use_device -from pathlib import Path remotecdm_pr_bp = Blueprint("remotecdm_pr", __name__) -with open(f"{os.getcwd()}/configs/config.yaml", "r") as file: +with open( + os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8" +) as file: config = yaml.safe_load(file) +def make_response(status, message, data=None, http_status=200): + """Make a response.""" + resp = {"status": status, "message": message} + if data is not None: + resp["data"] = data + return jsonify(resp), http_status + + +def check_required_fields(body, required_fields): + """Return a response tuple if a required field is missing, else None.""" + for field in required_fields: + if not body.get(field): + return make_response( + "Error", + f'Missing required field "{field}" in JSON body', + http_status=400, + ) + return None + + @remotecdm_pr_bp.route("/remotecdm/playready", methods=["GET", "HEAD"]) def remote_cdm_playready(): + """Handle the remote device PlayReady.""" if request.method == "GET": - return jsonify({"message": "OK"}) + return make_response( + "Success", + "OK", + http_status=200, + ) if request.method == "HEAD": response = Response(status=200) response.headers["Server"] = "playready serve" return response + return make_response("Failed", "Method not allowed", http_status=405) @remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo", methods=["GET"]) def remote_cdm_playready_deviceinfo(): + """Handle the remote device PlayReady device info.""" base_name = config["default_pr_cdm"] - if not base_name.endswith(".prd"): - full_file_name = base_name + ".prd" - device = PlayReadyDevice.load(f"{os.getcwd()}/configs/CDMs/PR/{full_file_name}") + device = PlayReadyDevice.load( + os.path.join(os.getcwd(), "configs", "CDMs", "PR", base_name + ".prd") + ) cdm = PlayReadyCDM.from_device(device) return jsonify( { @@ -50,133 +81,141 @@ def remote_cdm_playready_deviceinfo(): ) +def sanitize_username(username): + """Sanitize the username.""" + return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower() + + @remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo/", methods=["GET"]) def remote_cdm_playready_deviceinfo_specific(device): - if request.method == "GET": - base_name = Path(device).with_suffix(".prd").name - api_key = request.headers["X-Secret-Key"] - username = fetch_username_by_api_key(api_key) - device = PlayReadyDevice.load( - f"{os.getcwd()}/configs/CDMs/{username}/PR/{base_name}" - ) - cdm = PlayReadyCDM.from_device(device) - return jsonify( - { - "security_level": cdm.security_level, - "host": f'{config["fqdn"]}/remotecdm/widevine', - "secret": f"{api_key}", - "device_name": Path(base_name).stem, - } + """Handle the remote device PlayReady device info specific.""" + base_name = Path(device).with_suffix(".prd").name + api_key = request.headers["X-Secret-Key"] + username = fetch_username_by_api_key(api_key) + if not username: + return jsonify({"message": "Invalid or missing API key."}), 403 + safe_username = sanitize_username(username) + device = PlayReadyDevice.load( + os.path.join( + os.getcwd(), + "configs", + "CDMs", + "users_uploaded", + safe_username, + "PR", + base_name, ) + ) + cdm = PlayReadyCDM.from_device(device) + return jsonify( + { + "security_level": cdm.security_level, + "host": f'{config["fqdn"]}/remotecdm/playready', + "secret": f"{api_key}", + "device_name": Path(base_name).stem, + } + ) @remotecdm_pr_bp.route("/remotecdm/playready//open", methods=["GET"]) def remote_cdm_playready_open(device): + """Handle the remote device PlayReady open.""" + unauthorized_msg = { + "message": f"Device '{device}' is not found or you are not authorized to use it." + } + + # Default device logic if str(device).lower() == config["default_pr_cdm"].lower(): pr_device = PlayReadyDevice.load( - f'{os.getcwd()}/configs/CDMs/PR/{config["default_pr_cdm"]}.prd' + os.path.join( + os.getcwd(), "configs", "CDMs", "PR", config["default_pr_cdm"] + ".prd" + ) ) cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device) session_id = cdm.open() - return jsonify( + return make_response( + "Success", + "Successfully opened the PlayReady CDM session", { - "message": "Success", - "data": { + "session_id": session_id.hex(), + "device": {"security_level": cdm.security_level}, + }, + http_status=200, + ) + + # User device logic + api_key = request.headers.get("X-Secret-Key") + if api_key and str(device).lower() != config["default_pr_cdm"].lower(): + user = fetch_username_by_api_key(api_key=api_key) + safe_username = sanitize_username(user) + if user and user_allowed_to_use_device(device=device, username=user): + pr_device = PlayReadyDevice.load( + os.path.join( + os.getcwd(), + "configs", + "CDMs", + "users_uploaded", + safe_username, + "PR", + device + ".prd", + ) + ) + cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device) + session_id = cdm.open() + return make_response( + "Success", + "Successfully opened the PlayReady CDM session", + { "session_id": session_id.hex(), "device": {"security_level": cdm.security_level}, }, - } - ) - if ( - request.headers["X-Secret-Key"] - and str(device).lower() != config["default_pr_cdm"].lower() - ): - api_key = request.headers["X-Secret-Key"] - user = fetch_username_by_api_key(api_key=api_key) - if user: - if user_allowed_to_use_device(device=device, username=user): - pr_device = PlayReadyDevice.load( - f"{os.getcwd()}/configs/CDMs/{user}/PR/{device}.prd" - ) - cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device) - session_id = cdm.open() - return jsonify( - { - "message": "Success", - "data": { - "session_id": session_id.hex(), - "device": {"security_level": cdm.security_level}, - }, - } - ) - else: - return ( - jsonify( - { - "message": f"Device '{device}' is not found or you are not authorized to use it.", - } - ), - 403, - ) - else: - return ( - jsonify( - { - "message": f"Device '{device}' is not found or you are not authorized to use it.", - } - ), - 403, + http_status=200, ) - else: - return ( - jsonify( - { - "message": f"Device '{device}' is not found or you are not authorized to use it.", - } - ), - 403, + return make_response("Failed", unauthorized_msg, http_status=403) + + return make_response("Failed", unauthorized_msg, http_status=403) + + +def get_cdm_or_error(device): + """Get the CDM or return an error response.""" + cdm = current_app.config.get("CDM") + if not cdm: + return make_response( + "Error", + f'No CDM session for "{device}" has been opened yet. No session to use', + http_status=400, ) + return cdm @remotecdm_pr_bp.route( "/remotecdm/playready//close/", methods=["GET"] ) def remote_cdm_playready_close(device, session_id): + """Handle the remote device PlayReady close.""" try: session_id = bytes.fromhex(session_id) - cdm = current_app.config["CDM"] - if not cdm: - return ( - jsonify( - { - "message": f'No CDM for "{device}" has been opened yet. No session to close' - } - ), - 400, - ) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm try: cdm.close(session_id) except InvalidSession: - return ( - jsonify( - { - "message": f'Invalid session ID "{session_id.hex()}", it may have expired' - } - ), - 400, + return make_response( + "Error", + f'Invalid session ID "{session_id.hex()}", it may have expired', + http_status=400, ) - return ( - jsonify( - { - "message": f'Successfully closed Session "{session_id.hex()}".', - } - ), - 200, + return make_response( + "Success", + f'Successfully closed Session "{session_id.hex()}".', + http_status=200, ) - except Exception as e: - return ( - jsonify({"message": f'Failed to close Session "{session_id.hex()}".'}), - 400, + except Exception as error: + return make_response( + "Error", + f'Failed to close Session "{session_id.hex()}", {error}.', + http_status=400, ) @@ -184,18 +223,14 @@ def remote_cdm_playready_close(device, session_id): "/remotecdm/playready//get_license_challenge", methods=["POST"] ) def remote_cdm_playready_get_license_challenge(device): + """Handle the remote device PlayReady get license challenge.""" body = request.get_json() - for required_field in ("session_id", "init_data"): - if not body.get(required_field): - return ( - jsonify( - { - "message": f'Missing required field "{required_field}" in JSON body' - } - ), - 400, - ) - cdm = current_app.config["CDM"] + missing_field = check_required_fields(body, ("session_id", "init_data")) + if missing_field: + return missing_field + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm session_id = bytes.fromhex(body["session_id"]) init_data = body["init_data"] if not init_data.startswith("/parse_license", methods=["POST"]) def remote_cdm_playready_parse_license(device): + """Handle the remote device PlayReady parse license.""" body = request.get_json() - for required_field in ("license_message", "session_id"): - if not body.get(required_field): - return jsonify( - {"message": f'Missing required field "{required_field}" in JSON body'} - ) - cdm = current_app.config["CDM"] - if not cdm: - return jsonify( - { - "message": f"No Cdm session for {device} has been opened yet. No session to use." - } - ) + missing_field = check_required_fields(body, ("license_message", "session_id")) + if missing_field: + return missing_field + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm session_id = bytes.fromhex(body["session_id"]) license_message = body["license_message"] if is_base64(license_message): @@ -242,44 +285,56 @@ def remote_cdm_playready_parse_license(device): try: cdm.parse_license(session_id, license_message) except InvalidSession: - return jsonify( - { - "message": f"Invalid Session ID '{session_id.hex()}', it may have expired." - } + return make_response( + "Error", + f"Invalid Session ID '{session_id.hex()}', it may have expired.", + http_status=400, ) except InvalidLicense as e: - return jsonify({"message": f"Invalid License, {e}"}) + return make_response( + "Error", + f"Invalid License, {e}", + http_status=400, + ) except Exception as e: - return jsonify({"message": f"Error, {e}"}) - return jsonify( - {"message": "Successfully parsed and loaded the Keys from the License message"} + return make_response( + "Error", + f"Error, {e}", + http_status=400, + ) + return make_response( + "Success", + "Successfully parsed and loaded the Keys from the License message", + http_status=200, ) @remotecdm_pr_bp.route("/remotecdm/playready//get_keys", methods=["POST"]) def remote_cdm_playready_get_keys(device): + """Handle the remote device PlayReady get keys.""" body = request.get_json() - for required_field in ("session_id",): - if not body.get(required_field): - return jsonify( - {"message": f'Missing required field "{required_field}" in JSON body'} - ) + missing_field = check_required_fields(body, ("session_id",)) + if missing_field: + return missing_field session_id = bytes.fromhex(body["session_id"]) - cdm = current_app.config["CDM"] - if not cdm: - return jsonify( - {"message": f"Missing required field '{required_field}' in JSON body."} - ) + key_type = body.get("key_type", None) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm try: - keys = cdm.get_keys(session_id) + keys = cdm.get_keys(session_id, key_type) except InvalidSession: - return jsonify( - { - "message": f"Invalid Session ID '{session_id.hex()}', it may have expired." - } + return make_response( + "Error", + f"Invalid Session ID '{session_id.hex()}', it may have expired.", + http_status=400, + ) + except ValueError as error: + return make_response( + "Error", + f"The Key Type value '{key_type}' is invalid, {error}", + http_status=400, ) - except Exception as e: - return jsonify({"message": f"Error, {e}"}) keys_json = [ { "key_id": key.key_id.hex, @@ -289,5 +344,11 @@ def remote_cdm_playready_get_keys(device): "key_length": key.key_length, } for key in keys + if not key_type or key.type == key_type ] - return jsonify({"message": "success", "data": {"keys": keys_json}}) + return make_response( + "Success", + "Successfully got the Keys", + {"keys": keys_json}, + http_status=200, + ) diff --git a/routes/remote_device_wv.py b/routes/remote_device_wv.py index 977e780..565f34f 100644 --- a/routes/remote_device_wv.py +++ b/routes/remote_device_wv.py @@ -1,7 +1,12 @@ +"""Module to handle the remote device Widevine.""" + import os -from flask import Blueprint, jsonify, request, current_app, Response import base64 -from typing import Any, Optional, Union +import re +from pathlib import Path +import yaml +from flask import Blueprint, jsonify, request, current_app, Response + from google.protobuf.message import DecodeError from pywidevine.pssh import PSSH as widevinePSSH from pywidevine import __version__ @@ -14,24 +19,47 @@ from pywidevine.exceptions import ( InvalidLicenseType, InvalidSession, SignatureMismatch, - TooManySessions, ) -import yaml -from custom_functions.database.user_db import fetch_api_key, fetch_username_by_api_key +from custom_functions.database.user_db import fetch_username_by_api_key +from custom_functions.database.unified_db_ops import cache_to_db from custom_functions.user_checks.device_allowed import user_allowed_to_use_device -from pathlib import Path + remotecdm_wv_bp = Blueprint("remotecdm_wv", __name__) -with open(f"{os.getcwd()}/configs/config.yaml", "r") as file: +with open( + os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8" +) as file: config = yaml.safe_load(file) +def make_response(status, message, data=None, http_status=200): + """Make a response.""" + resp = {"status": status, "message": message} + if data is not None: + resp["data"] = data + return jsonify(resp), http_status + + +def check_required_fields(body, required_fields): + """Return a response if a required field is missing, else None.""" + for field in required_fields: + if not body.get(field): + return make_response( + "Error", + f'Missing required field "{field}" in JSON body', + http_status=400, + ) + return None + @remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"]) def remote_cdm_widevine(): + """Handle the remote device Widevine.""" if request.method == "GET": - return jsonify( - {"status": 200, "message": f"{config['fqdn'].upper()} Remote Widevine CDM."} + return make_response( + "Success", + f"{config['fqdn'].upper()} Remote Widevine CDM.", + http_status=200, ) if request.method == "HEAD": response = Response(status=200) @@ -39,171 +67,168 @@ def remote_cdm_widevine(): f"https://github.com/devine-dl/pywidevine serve v{__version__}" ) return response + return make_response( + "Error", + "Invalid request method", + http_status=405, + ) @remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo", methods=["GET"]) def remote_cdm_widevine_deviceinfo(): - if request.method == "GET": - base_name = config["default_wv_cdm"] - if not base_name.endswith(".wvd"): - base_name = base_name + ".wvd" - device = widevineDevice.load(f"{os.getcwd()}/configs/CDMs/WV/{base_name}") - cdm = widevineCDM.from_device(device) - return jsonify( - { - "device_type": cdm.device_type.name, - "system_id": cdm.system_id, - "security_level": cdm.security_level, - "host": f'{config["fqdn"]}/remotecdm/widevine', - "secret": f'{config["remote_cdm_secret"]}', - "device_name": Path(base_name).stem, - } - ) + """Handle the remote device Widevine device info.""" + base_name = config["default_wv_cdm"] + if not base_name.endswith(".wvd"): + base_name = base_name + ".wvd" + device = widevineDevice.load( + os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name) + ) + cdm = widevineCDM.from_device(device) + return make_response( + "Success", + "Successfully got the Widevine CDM device info", + { + "device_type": cdm.device_type.name, + "system_id": cdm.system_id, + "security_level": cdm.security_level, + "host": f'{config["fqdn"]}/remotecdm/widevine', + "secret": f'{config["remote_cdm_secret"]}', + "device_name": Path(base_name).stem, + }, + http_status=200, + ) + + +def sanitize_username(username): + """Sanitize the username.""" + return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower() @remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo/", methods=["GET"]) def remote_cdm_widevine_deviceinfo_specific(device): - if request.method == "GET": - base_name = Path(device).with_suffix(".wvd").name - api_key = request.headers["X-Secret-Key"] - username = fetch_username_by_api_key(api_key) - device = widevineDevice.load( - f"{os.getcwd()}/configs/CDMs/{username}/WV/{base_name}" + """Handle the remote device Widevine device info specific.""" + base_name = Path(device).with_suffix(".wvd").name + api_key = request.headers["X-Secret-Key"] + username = fetch_username_by_api_key(api_key) + safe_username = sanitize_username(username) + device = widevineDevice.load( + os.path.join( + os.getcwd(), + "configs", + "CDMs", + "users_uploaded", + safe_username, + "WV", + base_name, ) - cdm = widevineCDM.from_device(device) - return jsonify( - { - "device_type": cdm.device_type.name, - "system_id": cdm.system_id, - "security_level": cdm.security_level, - "host": f'{config["fqdn"]}/remotecdm/widevine', - "secret": f"{api_key}", - "device_name": Path(base_name).stem, - } + ) + cdm = widevineCDM.from_device(device) + return make_response( + "Success", + "Successfully got the Widevine CDM device info (by user)", + { + "device_type": cdm.device_type.name, + "system_id": cdm.system_id, + "security_level": cdm.security_level, + "host": f'{config["fqdn"]}/remotecdm/widevine', + "secret": f"{api_key}", + "device_name": Path(base_name).stem, + }, + http_status=200, + ) + + +def load_widevine_device(device_name, api_key=None): + """Load a Widevine device, either default or user-uploaded.""" + try: + if device_name.lower() == config["default_wv_cdm"].lower(): + path = os.path.join( + os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd" + ) + else: + if not api_key: + return None + username = fetch_username_by_api_key(api_key) + if not username or not user_allowed_to_use_device( + device=device_name, username=username + ): + return None + safe_username = sanitize_username(username) + path = os.path.join( + os.getcwd(), + "configs", + "CDMs", + "users_uploaded", + safe_username, + "WV", + device_name + ".wvd", + ) + return widevineDevice.load(path) + except (FileNotFoundError, ValueError): + return None + + +def get_cdm_or_error(device): + """Get the CDM or return an error response.""" + cdm = current_app.config.get("CDM") + if not cdm: + return make_response( + "Error", + f'No CDM session for "{device}" has been opened yet. No session to use', + http_status=400, ) + return cdm @remotecdm_wv_bp.route("/remotecdm/widevine//open", methods=["GET"]) def remote_cdm_widevine_open(device): - if str(device).lower() == config["default_wv_cdm"].lower(): - wv_device = widevineDevice.load( - f'{os.getcwd()}/configs/CDMs/WV/{config["default_wv_cdm"]}.wvd' - ) - cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) - session_id = cdm.open() - return ( - jsonify( - { - "status": 200, - "message": "Success", - "data": { - "session_id": session_id.hex(), - "device": { - "system_id": cdm.system_id, - "security_level": cdm.security_level, - }, - }, - } - ), - 200, - ) - if ( - request.headers["X-Secret-Key"] - and str(device).lower() != config["default_wv_cdm"].lower() - ): - api_key = request.headers["X-Secret-Key"] - user = fetch_username_by_api_key(api_key=api_key) - if user: - if user_allowed_to_use_device(device=device, username=user): - wv_device = widevineDevice.load( - f"{os.getcwd()}/configs/CDMs/{user}/WV/{device}.wvd" - ) - cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) - session_id = cdm.open() - return ( - jsonify( - { - "status": 200, - "message": "Success", - "data": { - "session_id": session_id.hex(), - "device": { - "system_id": cdm.system_id, - "security_level": cdm.security_level, - }, - }, - } - ), - 200, - ) - else: - return ( - jsonify( - { - "message": f"Device '{device}' is not found or you are not authorized to use it.", - "status": 403, - } - ), - 403, - ) - else: - return ( - jsonify( - { - "message": f"Device '{device}' is not found or you are not authorized to use it.", - "status": 403, - } - ), - 403, - ) - else: - return ( - jsonify( - { - "message": f"Device '{device}' is not found or you are not authorized to use it.", - "status": 403, - } - ), - 403, + """Handle the remote device Widevine open.""" + api_key = request.headers.get("X-Secret-Key") + wv_device = load_widevine_device(device, api_key) + if not wv_device: + return make_response( + "Error", + f"Device '{device}' is not found or you are not authorized to use it.", + http_status=403, ) + cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) + session_id = cdm.open() + return make_response( + "Success", + "Successfully opened the Widevine CDM session", + { + "session_id": session_id.hex(), + "device": { + "system_id": cdm.system_id, + "security_level": cdm.security_level, + }, + }, + http_status=200, + ) @remotecdm_wv_bp.route( "/remotecdm/widevine//close/", methods=["GET"] ) def remote_cdm_widevine_close(device, session_id): + """Handle the remote device Widevine close.""" session_id = bytes.fromhex(session_id) - cdm = current_app.config["CDM"] - if not cdm: - return ( - jsonify( - { - "status": 400, - "message": f'No CDM for "{device}" has been opened yet. No session to close', - } - ), - 400, - ) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm try: cdm.close(session_id) except InvalidSession: - return ( - jsonify( - { - "status": 400, - "message": f'Invalid session ID "{session_id.hex()}", it may have expired', - } - ), - 400, + return make_response( + "Error", + f'Invalid session ID "{session_id.hex()}", it may have expired', + http_status=400, ) - return ( - jsonify( - { - "status": 200, - "message": f'Successfully closed Session "{session_id.hex()}".', - } - ), - 200, + + return make_response( + "Success", + f'Successfully closed Session "{session_id.hex()}".', + http_status=200, ) @@ -211,80 +236,44 @@ def remote_cdm_widevine_close(device, session_id): "/remotecdm/widevine//set_service_certificate", methods=["POST"] ) def remote_cdm_widevine_set_service_certificate(device): + """Handle the remote device Widevine set service certificate.""" body = request.get_json() - for required_field in ("session_id", "certificate"): - if required_field == "certificate": - has_field = ( - required_field in body - ) # it needs the key, but can be empty/null - else: - has_field = body.get(required_field) - if not has_field: - return ( - jsonify( - { - "status": 400, - "message": f'Missing required field "{required_field}" in JSON body', - } - ), - 400, - ) + missing_field = check_required_fields(body, ("session_id", "certificate")) + if missing_field: + return missing_field session_id = bytes.fromhex(body["session_id"]) - cdm = current_app.config["CDM"] - if not cdm: - return ( - jsonify( - { - "status": 400, - "message": f'No CDM session for "{device}" has been opened yet. No session to use', - } - ), - 400, - ) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm certificate = body["certificate"] try: provider_id = cdm.set_service_certificate(session_id, certificate) except InvalidSession: - return ( - jsonify( - { - "status": 400, - "message": f'Invalid session id: "{session_id.hex()}", it may have expired', - } - ), - 400, + return make_response( + "Error", + f'Invalid session id: "{session_id.hex()}", it may have expired', + http_status=400, ) except DecodeError as error: - return ( - jsonify( - {"status": 400, "message": f"Invalid Service Certificate, {error}"} - ), - 400, + return make_response( + "Error", + f"Invalid Service Certificate, {error}", + http_status=400, ) except SignatureMismatch: - return ( - jsonify( - { - "status": 400, - "message": "Signature Validation failed on the Service Certificate, rejecting", - } - ), - 400, + return make_response( + "Error", + "Signature Validation failed on the Service Certificate, rejecting", + http_status=400, ) - return ( - jsonify( - { - "status": 200, - "message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.", - "data": { - "provider_id": provider_id, - }, - } - ), - 200, + return make_response( + "Success", + f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.", + {"provider_id": provider_id}, + http_status=200, ) @@ -292,45 +281,25 @@ def remote_cdm_widevine_set_service_certificate(device): "/remotecdm/widevine//get_service_certificate", methods=["POST"] ) def remote_cdm_widevine_get_service_certificate(device): + """Handle the remote device Widevine get service certificate.""" body = request.get_json() - for required_field in ("session_id",): - if not body.get(required_field): - return ( - jsonify( - { - "status": 400, - "message": f'Missing required field "{required_field}" in JSON body', - } - ), - 400, - ) + missing_field = check_required_fields(body, ("session_id",)) + if missing_field: + return missing_field session_id = bytes.fromhex(body["session_id"]) - cdm = current_app.config["CDM"] - - if not cdm: - return ( - jsonify( - { - "status": 400, - "message": f'No CDM session for "{device}" has been opened yet. No session to use', - } - ), - 400, - ) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm try: service_certificate = cdm.get_service_certificate(session_id) except InvalidSession: - return ( - jsonify( - { - "status": 400, - "message": f'Invalid Session ID "{session_id.hex()}", it may have expired', - } - ), - 400, + return make_response( + "Error", + f'Invalid Session ID "{session_id.hex()}", it may have expired', + http_status=400, ) if service_certificate: service_certificate_b64 = base64.b64encode( @@ -338,17 +307,11 @@ def remote_cdm_widevine_get_service_certificate(device): ).decode() else: service_certificate_b64 = None - return ( - jsonify( - { - "status": 200, - "message": "Successfully got the Service Certificate", - "data": { - "service_certificate": service_certificate_b64, - }, - } - ), - 200, + return make_response( + "Success", + "Successfully got the Service Certificate", + {"service_certificate": service_certificate_b64}, + http_status=200, ) @@ -357,42 +320,23 @@ def remote_cdm_widevine_get_service_certificate(device): methods=["POST"], ) def remote_cdm_widevine_get_license_challenge(device, license_type): + """Handle the remote device Widevine get license challenge.""" body = request.get_json() - for required_field in ("session_id", "init_data"): - if not body.get(required_field): - return ( - jsonify( - { - "status": 400, - "message": f'Missing required field "{required_field}" in JSON body', - } - ), - 400, - ) + missing_field = check_required_fields(body, ("session_id", "init_data")) + if missing_field: + return missing_field session_id = bytes.fromhex(body["session_id"]) privacy_mode = body.get("privacy_mode", True) - cdm = current_app.config["CDM"] - if not cdm: - return ( - jsonify( - { - "status": 400, - "message": f'No CDM session for "{device}" has been opened yet. No session to use', - } - ), - 400, - ) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm if current_app.config.get("force_privacy_mode"): privacy_mode = True if not cdm.get_service_certificate(session_id): - return ( - jsonify( - { - "status": 403, - "message": "No Service Certificate set but Privacy Mode is Enforced.", - } - ), - 403, + return make_response( + "Error", + "No Service Certificate set but Privacy Mode is Enforced.", + http_status=403, ) current_app.config["pssh"] = body["init_data"] @@ -406,97 +350,72 @@ def remote_cdm_widevine_get_license_challenge(device, license_type): privacy_mode=privacy_mode, ) except InvalidSession: - return ( - jsonify( - { - "status": 400, - "message": f'Invalid Session ID "{session_id.hex()}", it may have expired', - } - ), - 400, + return make_response( + "Error", + f'Invalid Session ID "{session_id.hex()}", it may have expired', + http_status=400, ) except InvalidInitData as error: - return jsonify({"status": 400, "message": f"Invalid Init Data, {error}"}), 400 - except InvalidLicenseType: - return ( - jsonify({"status": 400, "message": f"Invalid License Type {license_type}"}), - 400, + return make_response( + "Error", + f"Invalid Init Data, {error}", + http_status=400, ) - return ( - jsonify( - { - "status": 200, - "message": "Success", - "data": {"challenge_b64": base64.b64encode(license_request).decode()}, - } - ), - 200, + except InvalidLicenseType: + return make_response( + "Error", + f"Invalid License Type {license_type}", + http_status=400, + ) + return make_response( + "Success", + "Successfully got the License Challenge", + {"challenge_b64": base64.b64encode(license_request).decode()}, + http_status=200, ) @remotecdm_wv_bp.route("/remotecdm/widevine//parse_license", methods=["POST"]) def remote_cdm_widevine_parse_license(device): + """Handle the remote device Widevine parse license.""" body = request.get_json() - for required_field in ("session_id", "license_message"): - if not body.get(required_field): - return ( - jsonify( - { - "status": 400, - "message": f'Missing required field "{required_field}" in JSON body', - } - ), - 400, - ) + missing_field = check_required_fields(body, ("session_id", "license_message")) + if missing_field: + return missing_field session_id = bytes.fromhex(body["session_id"]) - cdm = current_app.config["CDM"] - if not cdm: - return ( - jsonify( - { - "status": 400, - "message": f'No CDM session for "{device}" has been opened yet. No session to use', - } - ), - 400, - ) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm try: cdm.parse_license(session_id, body["license_message"]) except InvalidLicenseMessage as error: - return ( - jsonify({"status": 400, "message": f"Invalid License Message, {error}"}), - 400, + return make_response( + "Error", + f"Invalid License Message, {error}", + http_status=400, ) except InvalidContext as error: - return jsonify({"status": 400, "message": f"Invalid Context, {error}"}), 400 + return make_response( + "Error", + f"Invalid Context, {error}", + http_status=400, + ) except InvalidSession: - return ( - jsonify( - { - "status": 400, - "message": f'Invalid Session ID "{session_id.hex()}", it may have expired', - } - ), - 400, + return make_response( + "Error", + f'Invalid Session ID "{session_id.hex()}", it may have expired', + http_status=400, ) except SignatureMismatch: - return ( - jsonify( - { - "status": 400, - "message": f"Signature Validation failed on the License Message, rejecting.", - } - ), - 400, + return make_response( + "Error", + "Signature Validation failed on the License Message, rejecting.", + http_status=400, ) - return ( - jsonify( - { - "status": 200, - "message": "Successfully parsed and loaded the Keys from the License message.", - } - ), - 200, + return make_response( + "Success", + "Successfully parsed and loaded the Keys from the License message.", + http_status=200, ) @@ -504,54 +423,30 @@ def remote_cdm_widevine_parse_license(device): "/remotecdm/widevine//get_keys/", methods=["POST"] ) def remote_cdm_widevine_get_keys(device, key_type): + """Handle the remote device Widevine get keys.""" body = request.get_json() - for required_field in ("session_id",): - if not body.get(required_field): - return ( - jsonify( - { - "status": 400, - "message": f'Missing required field "{required_field}" in JSON body', - } - ), - 400, - ) + missing_field = check_required_fields(body, ("session_id",)) + if missing_field: + return missing_field session_id = bytes.fromhex(body["session_id"]) - key_type: Optional[str] = key_type if key_type == "ALL": key_type = None - cdm = current_app.config["CDM"] - if not cdm: - return ( - jsonify( - { - "status": 400, - "message": f'No CDM session for "{device}" has been opened yet. No session to use', - } - ), - 400, - ) + cdm = get_cdm_or_error(device) + if isinstance(cdm, tuple): # error response + return cdm try: keys = cdm.get_keys(session_id, key_type) except InvalidSession: - return ( - jsonify( - { - "status": 400, - "message": f'Invalid Session ID "{session_id.hex()}", it may have expired', - } - ), - 400, + return make_response( + "Error", + f'Invalid Session ID "{session_id.hex()}", it may have expired', + http_status=400, ) except ValueError as error: - return ( - jsonify( - { - "status": 400, - "message": f'The Key Type value "{key_type}" is invalid, {error}', - } - ), - 400, + return make_response( + "Error", + f'The Key Type value "{key_type}" is invalid, {error}', + http_status=400, ) keys_json = [ { @@ -564,10 +459,6 @@ def remote_cdm_widevine_get_keys(device, key_type): if not key_type or key.type == key_type ] for entry in keys_json: - if config["database_type"].lower() != "mariadb": - from custom_functions.database.cache_to_db_sqlite import cache_to_db - elif config["database_type"].lower() == "mariadb": - from custom_functions.database.cache_to_db_mariadb import cache_to_db if entry["type"] != "SIGNING": cache_to_db( pssh=str(current_app.config["pssh"]), @@ -575,7 +466,9 @@ def remote_cdm_widevine_get_keys(device, key_type): key=entry["key"], ) - return ( - jsonify({"status": 200, "message": "Success", "data": {"keys": keys_json}}), - 200, + return make_response( + "Success", + "Successfully got the Keys", + {"keys": keys_json}, + http_status=200, )