Refactor remote device handling in PlayReady and Widevine modules to improve path management, enhance error handling, and implement consistent response formatting. Add module docstrings for better documentation and clarity.

This commit is contained in:
voldemort 2025-07-23 18:00:01 +07:00
parent 7f84542cfb
commit 7f9f04d829
2 changed files with 515 additions and 561 deletions

View File

@ -1,44 +1,75 @@
import base64
"""Module to handle the remote device PlayReady."""
from flask import Blueprint, jsonify, request, current_app, Response
import base64
import os
from pathlib import Path
import re
import yaml
from flask import Blueprint, jsonify, request, current_app, Response
from pyplayready.device import Device as PlayReadyDevice
from pyplayready.cdm import Cdm as PlayReadyCDM
from pyplayready import PSSH as PlayReadyPSSH
from pyplayready.exceptions import (
InvalidSession,
TooManySessions,
InvalidLicense,
InvalidPssh,
)
from custom_functions.database.user_db import fetch_username_by_api_key
from custom_functions.decrypt.api_decrypt import is_base64
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device
from pathlib import Path
remotecdm_pr_bp = Blueprint("remotecdm_pr", __name__)
with open(f"{os.getcwd()}/configs/config.yaml", "r") as file:
with open(
os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
) as file:
config = yaml.safe_load(file)
def make_response(status, message, data=None, http_status=200):
"""Make a response."""
resp = {"status": status, "message": message}
if data is not None:
resp["data"] = data
return jsonify(resp), http_status
def check_required_fields(body, required_fields):
"""Return a response tuple if a required field is missing, else None."""
for field in required_fields:
if not body.get(field):
return make_response(
"Error",
f'Missing required field "{field}" in JSON body',
http_status=400,
)
return None
@remotecdm_pr_bp.route("/remotecdm/playready", methods=["GET", "HEAD"])
def remote_cdm_playready():
"""Handle the remote device PlayReady."""
if request.method == "GET":
return jsonify({"message": "OK"})
return make_response(
"Success",
"OK",
http_status=200,
)
if request.method == "HEAD":
response = Response(status=200)
response.headers["Server"] = "playready serve"
return response
return make_response("Failed", "Method not allowed", http_status=405)
@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo", methods=["GET"])
def remote_cdm_playready_deviceinfo():
"""Handle the remote device PlayReady device info."""
base_name = config["default_pr_cdm"]
if not base_name.endswith(".prd"):
full_file_name = base_name + ".prd"
device = PlayReadyDevice.load(f"{os.getcwd()}/configs/CDMs/PR/{full_file_name}")
device = PlayReadyDevice.load(
os.path.join(os.getcwd(), "configs", "CDMs", "PR", base_name + ".prd")
)
cdm = PlayReadyCDM.from_device(device)
return jsonify(
{
@ -50,133 +81,141 @@ def remote_cdm_playready_deviceinfo():
)
def sanitize_username(username):
"""Sanitize the username."""
return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo/<device>", methods=["GET"])
def remote_cdm_playready_deviceinfo_specific(device):
if request.method == "GET":
base_name = Path(device).with_suffix(".prd").name
api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key)
device = PlayReadyDevice.load(
f"{os.getcwd()}/configs/CDMs/{username}/PR/{base_name}"
)
cdm = PlayReadyCDM.from_device(device)
return jsonify(
{
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f"{api_key}",
"device_name": Path(base_name).stem,
}
"""Handle the remote device PlayReady device info specific."""
base_name = Path(device).with_suffix(".prd").name
api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key)
if not username:
return jsonify({"message": "Invalid or missing API key."}), 403
safe_username = sanitize_username(username)
device = PlayReadyDevice.load(
os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"PR",
base_name,
)
)
cdm = PlayReadyCDM.from_device(device)
return jsonify(
{
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/playready',
"secret": f"{api_key}",
"device_name": Path(base_name).stem,
}
)
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/open", methods=["GET"])
def remote_cdm_playready_open(device):
"""Handle the remote device PlayReady open."""
unauthorized_msg = {
"message": f"Device '{device}' is not found or you are not authorized to use it."
}
# Default device logic
if str(device).lower() == config["default_pr_cdm"].lower():
pr_device = PlayReadyDevice.load(
f'{os.getcwd()}/configs/CDMs/PR/{config["default_pr_cdm"]}.prd'
os.path.join(
os.getcwd(), "configs", "CDMs", "PR", config["default_pr_cdm"] + ".prd"
)
)
cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
session_id = cdm.open()
return jsonify(
return make_response(
"Success",
"Successfully opened the PlayReady CDM session",
{
"message": "Success",
"data": {
"session_id": session_id.hex(),
"device": {"security_level": cdm.security_level},
},
http_status=200,
)
# User device logic
api_key = request.headers.get("X-Secret-Key")
if api_key and str(device).lower() != config["default_pr_cdm"].lower():
user = fetch_username_by_api_key(api_key=api_key)
safe_username = sanitize_username(user)
if user and user_allowed_to_use_device(device=device, username=user):
pr_device = PlayReadyDevice.load(
os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"PR",
device + ".prd",
)
)
cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
session_id = cdm.open()
return make_response(
"Success",
"Successfully opened the PlayReady CDM session",
{
"session_id": session_id.hex(),
"device": {"security_level": cdm.security_level},
},
}
)
if (
request.headers["X-Secret-Key"]
and str(device).lower() != config["default_pr_cdm"].lower()
):
api_key = request.headers["X-Secret-Key"]
user = fetch_username_by_api_key(api_key=api_key)
if user:
if user_allowed_to_use_device(device=device, username=user):
pr_device = PlayReadyDevice.load(
f"{os.getcwd()}/configs/CDMs/{user}/PR/{device}.prd"
)
cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
session_id = cdm.open()
return jsonify(
{
"message": "Success",
"data": {
"session_id": session_id.hex(),
"device": {"security_level": cdm.security_level},
},
}
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
}
),
403,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
}
),
403,
http_status=200,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
}
),
403,
return make_response("Failed", unauthorized_msg, http_status=403)
return make_response("Failed", unauthorized_msg, http_status=403)
def get_cdm_or_error(device):
"""Get the CDM or return an error response."""
cdm = current_app.config.get("CDM")
if not cdm:
return make_response(
"Error",
f'No CDM session for "{device}" has been opened yet. No session to use',
http_status=400,
)
return cdm
@remotecdm_pr_bp.route(
"/remotecdm/playready/<device>/close/<session_id>", methods=["GET"]
)
def remote_cdm_playready_close(device, session_id):
"""Handle the remote device PlayReady close."""
try:
session_id = bytes.fromhex(session_id)
cdm = current_app.config["CDM"]
if not cdm:
return (
jsonify(
{
"message": f'No CDM for "{device}" has been opened yet. No session to close'
}
),
400,
)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
try:
cdm.close(session_id)
except InvalidSession:
return (
jsonify(
{
"message": f'Invalid session ID "{session_id.hex()}", it may have expired'
}
),
400,
return make_response(
"Error",
f'Invalid session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
return (
jsonify(
{
"message": f'Successfully closed Session "{session_id.hex()}".',
}
),
200,
return make_response(
"Success",
f'Successfully closed Session "{session_id.hex()}".',
http_status=200,
)
except Exception as e:
return (
jsonify({"message": f'Failed to close Session "{session_id.hex()}".'}),
400,
except Exception as error:
return make_response(
"Error",
f'Failed to close Session "{session_id.hex()}", {error}.',
http_status=400,
)
@ -184,18 +223,14 @@ def remote_cdm_playready_close(device, session_id):
"/remotecdm/playready/<device>/get_license_challenge", methods=["POST"]
)
def remote_cdm_playready_get_license_challenge(device):
"""Handle the remote device PlayReady get license challenge."""
body = request.get_json()
for required_field in ("session_id", "init_data"):
if not body.get(required_field):
return (
jsonify(
{
"message": f'Missing required field "{required_field}" in JSON body'
}
),
400,
)
cdm = current_app.config["CDM"]
missing_field = check_required_fields(body, ("session_id", "init_data"))
if missing_field:
return missing_field
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
session_id = bytes.fromhex(body["session_id"])
init_data = body["init_data"]
if not init_data.startswith("<WRMHEADER"):
@ -203,38 +238,46 @@ def remote_cdm_playready_get_license_challenge(device):
pssh = PlayReadyPSSH(init_data)
if pssh.wrm_headers:
init_data = pssh.wrm_headers[0]
except InvalidPssh as e:
return jsonify({"message": f"Unable to parse base64 PSSH, {e}"})
except InvalidPssh as error:
return make_response(
"Error",
f"Unable to parse base64 PSSH, {error}",
http_status=400,
)
try:
license_request = cdm.get_license_challenge(
session_id=session_id, wrm_header=init_data
)
except InvalidSession:
return jsonify(
{
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}
return make_response(
"Error",
f"Invalid Session ID '{session_id.hex()}', it may have expired.",
http_status=400,
)
except Exception as e:
return jsonify({"message": f"Error, {e}"})
return jsonify({"message": "success", "data": {"challenge": license_request}})
except ValueError as error:
return make_response(
"Error",
f"Invalid License, {error}",
http_status=400,
)
return make_response(
"Success",
"Successfully got the License Challenge",
{"challenge_b64": base64.b64encode(license_request).decode()},
http_status=200,
)
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/parse_license", methods=["POST"])
def remote_cdm_playready_parse_license(device):
"""Handle the remote device PlayReady parse license."""
body = request.get_json()
for required_field in ("license_message", "session_id"):
if not body.get(required_field):
return jsonify(
{"message": f'Missing required field "{required_field}" in JSON body'}
)
cdm = current_app.config["CDM"]
if not cdm:
return jsonify(
{
"message": f"No Cdm session for {device} has been opened yet. No session to use."
}
)
missing_field = check_required_fields(body, ("license_message", "session_id"))
if missing_field:
return missing_field
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
session_id = bytes.fromhex(body["session_id"])
license_message = body["license_message"]
if is_base64(license_message):
@ -242,44 +285,56 @@ def remote_cdm_playready_parse_license(device):
try:
cdm.parse_license(session_id, license_message)
except InvalidSession:
return jsonify(
{
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}
return make_response(
"Error",
f"Invalid Session ID '{session_id.hex()}', it may have expired.",
http_status=400,
)
except InvalidLicense as e:
return jsonify({"message": f"Invalid License, {e}"})
return make_response(
"Error",
f"Invalid License, {e}",
http_status=400,
)
except Exception as e:
return jsonify({"message": f"Error, {e}"})
return jsonify(
{"message": "Successfully parsed and loaded the Keys from the License message"}
return make_response(
"Error",
f"Error, {e}",
http_status=400,
)
return make_response(
"Success",
"Successfully parsed and loaded the Keys from the License message",
http_status=200,
)
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/get_keys", methods=["POST"])
def remote_cdm_playready_get_keys(device):
"""Handle the remote device PlayReady get keys."""
body = request.get_json()
for required_field in ("session_id",):
if not body.get(required_field):
return jsonify(
{"message": f'Missing required field "{required_field}" in JSON body'}
)
missing_field = check_required_fields(body, ("session_id",))
if missing_field:
return missing_field
session_id = bytes.fromhex(body["session_id"])
cdm = current_app.config["CDM"]
if not cdm:
return jsonify(
{"message": f"Missing required field '{required_field}' in JSON body."}
)
key_type = body.get("key_type", None)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
try:
keys = cdm.get_keys(session_id)
keys = cdm.get_keys(session_id, key_type)
except InvalidSession:
return jsonify(
{
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}
return make_response(
"Error",
f"Invalid Session ID '{session_id.hex()}', it may have expired.",
http_status=400,
)
except ValueError as error:
return make_response(
"Error",
f"The Key Type value '{key_type}' is invalid, {error}",
http_status=400,
)
except Exception as e:
return jsonify({"message": f"Error, {e}"})
keys_json = [
{
"key_id": key.key_id.hex,
@ -289,5 +344,11 @@ def remote_cdm_playready_get_keys(device):
"key_length": key.key_length,
}
for key in keys
if not key_type or key.type == key_type
]
return jsonify({"message": "success", "data": {"keys": keys_json}})
return make_response(
"Success",
"Successfully got the Keys",
{"keys": keys_json},
http_status=200,
)

View File

@ -1,7 +1,12 @@
"""Module to handle the remote device Widevine."""
import os
from flask import Blueprint, jsonify, request, current_app, Response
import base64
from typing import Any, Optional, Union
import re
from pathlib import Path
import yaml
from flask import Blueprint, jsonify, request, current_app, Response
from google.protobuf.message import DecodeError
from pywidevine.pssh import PSSH as widevinePSSH
from pywidevine import __version__
@ -14,24 +19,47 @@ from pywidevine.exceptions import (
InvalidLicenseType,
InvalidSession,
SignatureMismatch,
TooManySessions,
)
import yaml
from custom_functions.database.user_db import fetch_api_key, fetch_username_by_api_key
from custom_functions.database.user_db import fetch_username_by_api_key
from custom_functions.database.unified_db_ops import cache_to_db
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device
from pathlib import Path
remotecdm_wv_bp = Blueprint("remotecdm_wv", __name__)
with open(f"{os.getcwd()}/configs/config.yaml", "r") as file:
with open(
os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
) as file:
config = yaml.safe_load(file)
def make_response(status, message, data=None, http_status=200):
"""Make a response."""
resp = {"status": status, "message": message}
if data is not None:
resp["data"] = data
return jsonify(resp), http_status
def check_required_fields(body, required_fields):
"""Return a response if a required field is missing, else None."""
for field in required_fields:
if not body.get(field):
return make_response(
"Error",
f'Missing required field "{field}" in JSON body',
http_status=400,
)
return None
@remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"])
def remote_cdm_widevine():
"""Handle the remote device Widevine."""
if request.method == "GET":
return jsonify(
{"status": 200, "message": f"{config['fqdn'].upper()} Remote Widevine CDM."}
return make_response(
"Success",
f"{config['fqdn'].upper()} Remote Widevine CDM.",
http_status=200,
)
if request.method == "HEAD":
response = Response(status=200)
@ -39,171 +67,168 @@ def remote_cdm_widevine():
f"https://github.com/devine-dl/pywidevine serve v{__version__}"
)
return response
return make_response(
"Error",
"Invalid request method",
http_status=405,
)
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo", methods=["GET"])
def remote_cdm_widevine_deviceinfo():
if request.method == "GET":
base_name = config["default_wv_cdm"]
if not base_name.endswith(".wvd"):
base_name = base_name + ".wvd"
device = widevineDevice.load(f"{os.getcwd()}/configs/CDMs/WV/{base_name}")
cdm = widevineCDM.from_device(device)
return jsonify(
{
"device_type": cdm.device_type.name,
"system_id": cdm.system_id,
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f'{config["remote_cdm_secret"]}',
"device_name": Path(base_name).stem,
}
)
"""Handle the remote device Widevine device info."""
base_name = config["default_wv_cdm"]
if not base_name.endswith(".wvd"):
base_name = base_name + ".wvd"
device = widevineDevice.load(
os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name)
)
cdm = widevineCDM.from_device(device)
return make_response(
"Success",
"Successfully got the Widevine CDM device info",
{
"device_type": cdm.device_type.name,
"system_id": cdm.system_id,
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f'{config["remote_cdm_secret"]}',
"device_name": Path(base_name).stem,
},
http_status=200,
)
def sanitize_username(username):
"""Sanitize the username."""
return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo/<device>", methods=["GET"])
def remote_cdm_widevine_deviceinfo_specific(device):
if request.method == "GET":
base_name = Path(device).with_suffix(".wvd").name
api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key)
device = widevineDevice.load(
f"{os.getcwd()}/configs/CDMs/{username}/WV/{base_name}"
"""Handle the remote device Widevine device info specific."""
base_name = Path(device).with_suffix(".wvd").name
api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key)
safe_username = sanitize_username(username)
device = widevineDevice.load(
os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"WV",
base_name,
)
cdm = widevineCDM.from_device(device)
return jsonify(
{
"device_type": cdm.device_type.name,
"system_id": cdm.system_id,
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f"{api_key}",
"device_name": Path(base_name).stem,
}
)
cdm = widevineCDM.from_device(device)
return make_response(
"Success",
"Successfully got the Widevine CDM device info (by user)",
{
"device_type": cdm.device_type.name,
"system_id": cdm.system_id,
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f"{api_key}",
"device_name": Path(base_name).stem,
},
http_status=200,
)
def load_widevine_device(device_name, api_key=None):
"""Load a Widevine device, either default or user-uploaded."""
try:
if device_name.lower() == config["default_wv_cdm"].lower():
path = os.path.join(
os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd"
)
else:
if not api_key:
return None
username = fetch_username_by_api_key(api_key)
if not username or not user_allowed_to_use_device(
device=device_name, username=username
):
return None
safe_username = sanitize_username(username)
path = os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"WV",
device_name + ".wvd",
)
return widevineDevice.load(path)
except (FileNotFoundError, ValueError):
return None
def get_cdm_or_error(device):
"""Get the CDM or return an error response."""
cdm = current_app.config.get("CDM")
if not cdm:
return make_response(
"Error",
f'No CDM session for "{device}" has been opened yet. No session to use',
http_status=400,
)
return cdm
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/open", methods=["GET"])
def remote_cdm_widevine_open(device):
if str(device).lower() == config["default_wv_cdm"].lower():
wv_device = widevineDevice.load(
f'{os.getcwd()}/configs/CDMs/WV/{config["default_wv_cdm"]}.wvd'
)
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open()
return (
jsonify(
{
"status": 200,
"message": "Success",
"data": {
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level,
},
},
}
),
200,
)
if (
request.headers["X-Secret-Key"]
and str(device).lower() != config["default_wv_cdm"].lower()
):
api_key = request.headers["X-Secret-Key"]
user = fetch_username_by_api_key(api_key=api_key)
if user:
if user_allowed_to_use_device(device=device, username=user):
wv_device = widevineDevice.load(
f"{os.getcwd()}/configs/CDMs/{user}/WV/{device}.wvd"
)
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open()
return (
jsonify(
{
"status": 200,
"message": "Success",
"data": {
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level,
},
},
}
),
200,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
"status": 403,
}
),
403,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
"status": 403,
}
),
403,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
"status": 403,
}
),
403,
"""Handle the remote device Widevine open."""
api_key = request.headers.get("X-Secret-Key")
wv_device = load_widevine_device(device, api_key)
if not wv_device:
return make_response(
"Error",
f"Device '{device}' is not found or you are not authorized to use it.",
http_status=403,
)
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open()
return make_response(
"Success",
"Successfully opened the Widevine CDM session",
{
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level,
},
},
http_status=200,
)
@remotecdm_wv_bp.route(
"/remotecdm/widevine/<device>/close/<session_id>", methods=["GET"]
)
def remote_cdm_widevine_close(device, session_id):
"""Handle the remote device Widevine close."""
session_id = bytes.fromhex(session_id)
cdm = current_app.config["CDM"]
if not cdm:
return (
jsonify(
{
"status": 400,
"message": f'No CDM for "{device}" has been opened yet. No session to close',
}
),
400,
)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
try:
cdm.close(session_id)
except InvalidSession:
return (
jsonify(
{
"status": 400,
"message": f'Invalid session ID "{session_id.hex()}", it may have expired',
}
),
400,
return make_response(
"Error",
f'Invalid session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
return (
jsonify(
{
"status": 200,
"message": f'Successfully closed Session "{session_id.hex()}".',
}
),
200,
return make_response(
"Success",
f'Successfully closed Session "{session_id.hex()}".',
http_status=200,
)
@ -211,80 +236,44 @@ def remote_cdm_widevine_close(device, session_id):
"/remotecdm/widevine/<device>/set_service_certificate", methods=["POST"]
)
def remote_cdm_widevine_set_service_certificate(device):
"""Handle the remote device Widevine set service certificate."""
body = request.get_json()
for required_field in ("session_id", "certificate"):
if required_field == "certificate":
has_field = (
required_field in body
) # it needs the key, but can be empty/null
else:
has_field = body.get(required_field)
if not has_field:
return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
missing_field = check_required_fields(body, ("session_id", "certificate"))
if missing_field:
return missing_field
session_id = bytes.fromhex(body["session_id"])
cdm = current_app.config["CDM"]
if not cdm:
return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
certificate = body["certificate"]
try:
provider_id = cdm.set_service_certificate(session_id, certificate)
except InvalidSession:
return (
jsonify(
{
"status": 400,
"message": f'Invalid session id: "{session_id.hex()}", it may have expired',
}
),
400,
return make_response(
"Error",
f'Invalid session id: "{session_id.hex()}", it may have expired',
http_status=400,
)
except DecodeError as error:
return (
jsonify(
{"status": 400, "message": f"Invalid Service Certificate, {error}"}
),
400,
return make_response(
"Error",
f"Invalid Service Certificate, {error}",
http_status=400,
)
except SignatureMismatch:
return (
jsonify(
{
"status": 400,
"message": "Signature Validation failed on the Service Certificate, rejecting",
}
),
400,
return make_response(
"Error",
"Signature Validation failed on the Service Certificate, rejecting",
http_status=400,
)
return (
jsonify(
{
"status": 200,
"message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
"data": {
"provider_id": provider_id,
},
}
),
200,
return make_response(
"Success",
f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
{"provider_id": provider_id},
http_status=200,
)
@ -292,45 +281,25 @@ def remote_cdm_widevine_set_service_certificate(device):
"/remotecdm/widevine/<device>/get_service_certificate", methods=["POST"]
)
def remote_cdm_widevine_get_service_certificate(device):
"""Handle the remote device Widevine get service certificate."""
body = request.get_json()
for required_field in ("session_id",):
if not body.get(required_field):
return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
missing_field = check_required_fields(body, ("session_id",))
if missing_field:
return missing_field
session_id = bytes.fromhex(body["session_id"])
cdm = current_app.config["CDM"]
if not cdm:
return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
try:
service_certificate = cdm.get_service_certificate(session_id)
except InvalidSession:
return (
jsonify(
{
"status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
if service_certificate:
service_certificate_b64 = base64.b64encode(
@ -338,17 +307,11 @@ def remote_cdm_widevine_get_service_certificate(device):
).decode()
else:
service_certificate_b64 = None
return (
jsonify(
{
"status": 200,
"message": "Successfully got the Service Certificate",
"data": {
"service_certificate": service_certificate_b64,
},
}
),
200,
return make_response(
"Success",
"Successfully got the Service Certificate",
{"service_certificate": service_certificate_b64},
http_status=200,
)
@ -357,42 +320,23 @@ def remote_cdm_widevine_get_service_certificate(device):
methods=["POST"],
)
def remote_cdm_widevine_get_license_challenge(device, license_type):
"""Handle the remote device Widevine get license challenge."""
body = request.get_json()
for required_field in ("session_id", "init_data"):
if not body.get(required_field):
return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
missing_field = check_required_fields(body, ("session_id", "init_data"))
if missing_field:
return missing_field
session_id = bytes.fromhex(body["session_id"])
privacy_mode = body.get("privacy_mode", True)
cdm = current_app.config["CDM"]
if not cdm:
return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
if current_app.config.get("force_privacy_mode"):
privacy_mode = True
if not cdm.get_service_certificate(session_id):
return (
jsonify(
{
"status": 403,
"message": "No Service Certificate set but Privacy Mode is Enforced.",
}
),
403,
return make_response(
"Error",
"No Service Certificate set but Privacy Mode is Enforced.",
http_status=403,
)
current_app.config["pssh"] = body["init_data"]
@ -406,97 +350,72 @@ def remote_cdm_widevine_get_license_challenge(device, license_type):
privacy_mode=privacy_mode,
)
except InvalidSession:
return (
jsonify(
{
"status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
except InvalidInitData as error:
return jsonify({"status": 400, "message": f"Invalid Init Data, {error}"}), 400
except InvalidLicenseType:
return (
jsonify({"status": 400, "message": f"Invalid License Type {license_type}"}),
400,
return make_response(
"Error",
f"Invalid Init Data, {error}",
http_status=400,
)
return (
jsonify(
{
"status": 200,
"message": "Success",
"data": {"challenge_b64": base64.b64encode(license_request).decode()},
}
),
200,
except InvalidLicenseType:
return make_response(
"Error",
f"Invalid License Type {license_type}",
http_status=400,
)
return make_response(
"Success",
"Successfully got the License Challenge",
{"challenge_b64": base64.b64encode(license_request).decode()},
http_status=200,
)
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/parse_license", methods=["POST"])
def remote_cdm_widevine_parse_license(device):
"""Handle the remote device Widevine parse license."""
body = request.get_json()
for required_field in ("session_id", "license_message"):
if not body.get(required_field):
return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
missing_field = check_required_fields(body, ("session_id", "license_message"))
if missing_field:
return missing_field
session_id = bytes.fromhex(body["session_id"])
cdm = current_app.config["CDM"]
if not cdm:
return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
try:
cdm.parse_license(session_id, body["license_message"])
except InvalidLicenseMessage as error:
return (
jsonify({"status": 400, "message": f"Invalid License Message, {error}"}),
400,
return make_response(
"Error",
f"Invalid License Message, {error}",
http_status=400,
)
except InvalidContext as error:
return jsonify({"status": 400, "message": f"Invalid Context, {error}"}), 400
return make_response(
"Error",
f"Invalid Context, {error}",
http_status=400,
)
except InvalidSession:
return (
jsonify(
{
"status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
except SignatureMismatch:
return (
jsonify(
{
"status": 400,
"message": f"Signature Validation failed on the License Message, rejecting.",
}
),
400,
return make_response(
"Error",
"Signature Validation failed on the License Message, rejecting.",
http_status=400,
)
return (
jsonify(
{
"status": 200,
"message": "Successfully parsed and loaded the Keys from the License message.",
}
),
200,
return make_response(
"Success",
"Successfully parsed and loaded the Keys from the License message.",
http_status=200,
)
@ -504,54 +423,30 @@ def remote_cdm_widevine_parse_license(device):
"/remotecdm/widevine/<device>/get_keys/<key_type>", methods=["POST"]
)
def remote_cdm_widevine_get_keys(device, key_type):
"""Handle the remote device Widevine get keys."""
body = request.get_json()
for required_field in ("session_id",):
if not body.get(required_field):
return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
missing_field = check_required_fields(body, ("session_id",))
if missing_field:
return missing_field
session_id = bytes.fromhex(body["session_id"])
key_type: Optional[str] = key_type
if key_type == "ALL":
key_type = None
cdm = current_app.config["CDM"]
if not cdm:
return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
try:
keys = cdm.get_keys(session_id, key_type)
except InvalidSession:
return (
jsonify(
{
"status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
except ValueError as error:
return (
jsonify(
{
"status": 400,
"message": f'The Key Type value "{key_type}" is invalid, {error}',
}
),
400,
return make_response(
"Error",
f'The Key Type value "{key_type}" is invalid, {error}',
http_status=400,
)
keys_json = [
{
@ -564,10 +459,6 @@ def remote_cdm_widevine_get_keys(device, key_type):
if not key_type or key.type == key_type
]
for entry in keys_json:
if config["database_type"].lower() != "mariadb":
from custom_functions.database.cache_to_db_sqlite import cache_to_db
elif config["database_type"].lower() == "mariadb":
from custom_functions.database.cache_to_db_mariadb import cache_to_db
if entry["type"] != "SIGNING":
cache_to_db(
pssh=str(current_app.config["pssh"]),
@ -575,7 +466,9 @@ def remote_cdm_widevine_get_keys(device, key_type):
key=entry["key"],
)
return (
jsonify({"status": 200, "message": "Success", "data": {"keys": keys_json}}),
200,
return make_response(
"Success",
"Successfully got the Keys",
{"keys": keys_json},
http_status=200,
)