CDRM-Project/routes/remote_device_wv.py

475 lines
15 KiB
Python
Raw Permalink Normal View History

"""Module to handle the remote device Widevine."""
2025-04-24 17:06:14 -04:00
import os
import base64
import re
from pathlib import Path
import yaml
from flask import Blueprint, jsonify, request, current_app, Response
2025-04-24 17:06:14 -04:00
from google.protobuf.message import DecodeError
from pywidevine.pssh import PSSH as widevinePSSH
from pywidevine import __version__
from pywidevine.cdm import Cdm as widevineCDM
from pywidevine.device import Device as widevineDevice
from pywidevine.exceptions import (
InvalidContext,
InvalidInitData,
InvalidLicenseMessage,
InvalidLicenseType,
InvalidSession,
SignatureMismatch,
)
2025-04-24 17:06:14 -04:00
from custom_functions.database.user_db import fetch_username_by_api_key
from custom_functions.database.unified_db_ops import cache_to_db
2025-04-30 20:11:17 -04:00
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device
2025-04-24 17:06:14 -04:00
remotecdm_wv_bp = Blueprint("remotecdm_wv", __name__)
with open(
os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
) as file:
2025-04-24 17:06:14 -04:00
config = yaml.safe_load(file)
def make_response(status, message, data=None, http_status=200):
"""Make a response."""
resp = {"status": status, "message": message}
if data is not None:
resp["data"] = data
return jsonify(resp), http_status
def check_required_fields(body, required_fields):
"""Return a response if a required field is missing, else None."""
for field in required_fields:
if not body.get(field):
return make_response(
"Error",
f'Missing required field "{field}" in JSON body',
http_status=400,
)
return None
@remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"])
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine():
"""Handle the remote device Widevine."""
if request.method == "GET":
return make_response(
"Success",
f"{config['fqdn'].upper()} Remote Widevine CDM.",
http_status=200,
)
if request.method == "HEAD":
2025-04-24 17:06:14 -04:00
response = Response(status=200)
response.headers["Server"] = (
f"https://github.com/devine-dl/pywidevine serve v{__version__}"
)
2025-04-24 17:06:14 -04:00
return response
return make_response(
"Error",
"Invalid request method",
http_status=405,
)
2025-04-24 17:06:14 -04:00
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo", methods=["GET"])
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_deviceinfo():
"""Handle the remote device Widevine device info."""
base_name = config["default_wv_cdm"]
if not base_name.endswith(".wvd"):
base_name = base_name + ".wvd"
device = widevineDevice.load(
os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name)
)
cdm = widevineCDM.from_device(device)
return make_response(
"Success",
"Successfully got the Widevine CDM device info",
{
"device_type": cdm.device_type.name,
"system_id": cdm.system_id,
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f'{config["remote_cdm_secret"]}',
"device_name": Path(base_name).stem,
},
http_status=200,
)
def sanitize_username(username):
"""Sanitize the username."""
return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo/<device>", methods=["GET"])
2025-05-07 01:04:21 -04:00
def remote_cdm_widevine_deviceinfo_specific(device):
"""Handle the remote device Widevine device info specific."""
base_name = Path(device).with_suffix(".wvd").name
api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key)
safe_username = sanitize_username(username)
device = widevineDevice.load(
os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"WV",
base_name,
)
)
cdm = widevineCDM.from_device(device)
return make_response(
"Success",
"Successfully got the Widevine CDM device info (by user)",
{
"device_type": cdm.device_type.name,
"system_id": cdm.system_id,
"security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f"{api_key}",
"device_name": Path(base_name).stem,
},
http_status=200,
)
def load_widevine_device(device_name, api_key=None):
"""Load a Widevine device, either default or user-uploaded."""
try:
if device_name.lower() == config["default_wv_cdm"].lower():
path = os.path.join(
os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd"
)
else:
if not api_key:
return None
username = fetch_username_by_api_key(api_key)
if not username or not user_allowed_to_use_device(
device=device_name, username=username
):
return None
safe_username = sanitize_username(username)
path = os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"WV",
device_name + ".wvd",
)
return widevineDevice.load(path)
except (FileNotFoundError, ValueError):
return None
def get_cdm_or_error(device):
"""Get the CDM or return an error response."""
cdm = current_app.config.get("CDM")
if not cdm:
return make_response(
"Error",
f'No CDM session for "{device}" has been opened yet. No session to use',
http_status=400,
)
return cdm
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/open", methods=["GET"])
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_open(device):
"""Handle the remote device Widevine open."""
api_key = request.headers.get("X-Secret-Key")
wv_device = load_widevine_device(device, api_key)
if not wv_device:
return make_response(
"Error",
f"Device '{device}' is not found or you are not authorized to use it.",
http_status=403,
)
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open()
return make_response(
"Success",
"Successfully opened the Widevine CDM session",
{
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level,
},
},
http_status=200,
)
2025-04-30 20:11:17 -04:00
2025-04-24 17:06:14 -04:00
@remotecdm_wv_bp.route(
"/remotecdm/widevine/<device>/close/<session_id>", methods=["GET"]
)
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_close(device, session_id):
"""Handle the remote device Widevine close."""
session_id = bytes.fromhex(session_id)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
try:
cdm.close(session_id)
except InvalidSession:
return make_response(
"Error",
f'Invalid session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
return make_response(
"Success",
f'Successfully closed Session "{session_id.hex()}".',
http_status=200,
)
@remotecdm_wv_bp.route(
"/remotecdm/widevine/<device>/set_service_certificate", methods=["POST"]
)
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_set_service_certificate(device):
"""Handle the remote device Widevine set service certificate."""
2025-04-30 20:11:17 -04:00
body = request.get_json()
missing_field = check_required_fields(body, ("session_id", "certificate"))
if missing_field:
return missing_field
2025-04-24 17:06:14 -04:00
2025-04-30 20:11:17 -04:00
session_id = bytes.fromhex(body["session_id"])
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
2025-04-30 20:11:17 -04:00
certificate = body["certificate"]
try:
provider_id = cdm.set_service_certificate(session_id, certificate)
except InvalidSession:
return make_response(
"Error",
f'Invalid session id: "{session_id.hex()}", it may have expired',
http_status=400,
)
2025-04-30 20:11:17 -04:00
except DecodeError as error:
return make_response(
"Error",
f"Invalid Service Certificate, {error}",
http_status=400,
)
2025-04-30 20:11:17 -04:00
except SignatureMismatch:
return make_response(
"Error",
"Signature Validation failed on the Service Certificate, rejecting",
http_status=400,
)
return make_response(
"Success",
f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
{"provider_id": provider_id},
http_status=200,
)
2025-04-24 17:06:14 -04:00
@remotecdm_wv_bp.route(
"/remotecdm/widevine/<device>/get_service_certificate", methods=["POST"]
)
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_get_service_certificate(device):
"""Handle the remote device Widevine get service certificate."""
2025-04-30 20:11:17 -04:00
body = request.get_json()
missing_field = check_required_fields(body, ("session_id",))
if missing_field:
return missing_field
2025-04-24 17:06:14 -04:00
2025-04-30 20:11:17 -04:00
session_id = bytes.fromhex(body["session_id"])
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
2025-04-30 20:11:17 -04:00
try:
service_certificate = cdm.get_service_certificate(session_id)
except InvalidSession:
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
2025-04-30 20:11:17 -04:00
if service_certificate:
service_certificate_b64 = base64.b64encode(
service_certificate.SerializeToString()
).decode()
2025-04-30 20:11:17 -04:00
else:
service_certificate_b64 = None
return make_response(
"Success",
"Successfully got the Service Certificate",
{"service_certificate": service_certificate_b64},
http_status=200,
)
2025-04-24 17:06:14 -04:00
@remotecdm_wv_bp.route(
"/remotecdm/widevine/<device>/get_license_challenge/<license_type>",
methods=["POST"],
)
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_get_license_challenge(device, license_type):
"""Handle the remote device Widevine get license challenge."""
2025-04-30 20:11:17 -04:00
body = request.get_json()
missing_field = check_required_fields(body, ("session_id", "init_data"))
if missing_field:
return missing_field
2025-04-30 20:11:17 -04:00
session_id = bytes.fromhex(body["session_id"])
privacy_mode = body.get("privacy_mode", True)
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
2025-04-30 20:11:17 -04:00
if current_app.config.get("force_privacy_mode"):
privacy_mode = True
if not cdm.get_service_certificate(session_id):
return make_response(
"Error",
"No Service Certificate set but Privacy Mode is Enforced.",
http_status=403,
)
2025-04-24 17:06:14 -04:00
current_app.config["pssh"] = body["init_data"]
init_data = widevinePSSH(body["init_data"])
2025-04-24 17:06:14 -04:00
2025-04-30 20:11:17 -04:00
try:
license_request = cdm.get_license_challenge(
session_id=session_id,
pssh=init_data,
license_type=license_type,
privacy_mode=privacy_mode,
2025-04-30 20:11:17 -04:00
)
except InvalidSession:
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
2025-04-30 20:11:17 -04:00
except InvalidInitData as error:
return make_response(
"Error",
f"Invalid Init Data, {error}",
http_status=400,
)
2025-04-30 20:11:17 -04:00
except InvalidLicenseType:
return make_response(
"Error",
f"Invalid License Type {license_type}",
http_status=400,
)
return make_response(
"Success",
"Successfully got the License Challenge",
{"challenge_b64": base64.b64encode(license_request).decode()},
http_status=200,
)
2025-04-24 17:06:14 -04:00
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/parse_license", methods=["POST"])
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_parse_license(device):
"""Handle the remote device Widevine parse license."""
2025-04-30 20:11:17 -04:00
body = request.get_json()
missing_field = check_required_fields(body, ("session_id", "license_message"))
if missing_field:
return missing_field
2025-04-30 20:11:17 -04:00
session_id = bytes.fromhex(body["session_id"])
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
2025-04-30 20:11:17 -04:00
try:
cdm.parse_license(session_id, body["license_message"])
2025-04-30 20:11:17 -04:00
except InvalidLicenseMessage as error:
return make_response(
"Error",
f"Invalid License Message, {error}",
http_status=400,
)
2025-04-30 20:11:17 -04:00
except InvalidContext as error:
return make_response(
"Error",
f"Invalid Context, {error}",
http_status=400,
)
2025-04-30 20:11:17 -04:00
except InvalidSession:
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
2025-04-30 20:11:17 -04:00
except SignatureMismatch:
return make_response(
"Error",
"Signature Validation failed on the License Message, rejecting.",
http_status=400,
)
return make_response(
"Success",
"Successfully parsed and loaded the Keys from the License message.",
http_status=200,
)
@remotecdm_wv_bp.route(
"/remotecdm/widevine/<device>/get_keys/<key_type>", methods=["POST"]
)
2025-04-24 17:06:14 -04:00
def remote_cdm_widevine_get_keys(device, key_type):
"""Handle the remote device Widevine get keys."""
2025-04-30 20:11:17 -04:00
body = request.get_json()
missing_field = check_required_fields(body, ("session_id",))
if missing_field:
return missing_field
2025-04-30 20:11:17 -04:00
session_id = bytes.fromhex(body["session_id"])
if key_type == "ALL":
2025-04-30 20:11:17 -04:00
key_type = None
cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response
return cdm
2025-04-30 20:11:17 -04:00
try:
keys = cdm.get_keys(session_id, key_type)
except InvalidSession:
return make_response(
"Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400,
)
2025-04-30 20:11:17 -04:00
except ValueError as error:
return make_response(
"Error",
f'The Key Type value "{key_type}" is invalid, {error}',
http_status=400,
)
2025-04-30 20:11:17 -04:00
keys_json = [
{
"key_id": key.kid.hex,
"key": key.key.hex(),
"type": key.type,
"permissions": key.permissions,
2025-04-30 20:11:17 -04:00
}
for key in keys
if not key_type or key.type == key_type
]
for entry in keys_json:
if entry["type"] != "SIGNING":
cache_to_db(
pssh=str(current_app.config["pssh"]),
kid=entry["key_id"],
key=entry["key"],
)
return make_response(
"Success",
"Successfully got the Keys",
{"keys": keys_json},
http_status=200,
)