import os from flask import Blueprint, jsonify, request, current_app, Response import base64 from typing import Any, Optional, Union from google.protobuf.message import DecodeError from pywidevine.pssh import PSSH as widevinePSSH from pywidevine import __version__ from pywidevine.cdm import Cdm as widevineCDM from pywidevine.device import Device as widevineDevice from pywidevine.exceptions import (InvalidContext, InvalidInitData, InvalidLicenseMessage, InvalidLicenseType, InvalidSession, SignatureMismatch, TooManySessions) import yaml remotecdm_wv_bp = Blueprint('remotecdm_wv', __name__) with open(f'{os.getcwd()}/configs/config.yaml', 'r') as file: config = yaml.safe_load(file) @remotecdm_wv_bp.route('/remotecdm/widevine', methods=['GET', 'HEAD']) def remote_cdm_widevine(): if request.method == 'GET': return jsonify({ 'status': 200, 'message': f"{config['fqdn'].upper()} Remote Widevine CDM." }) if request.method == 'HEAD': response = Response(status=200) response.headers['Server'] = f'https://github.com/devine-dl/pywidevine serve v{__version__}' return response @remotecdm_wv_bp.route('/remotecdm/widevine/deviceinfo', methods=['GET']) def remote_cdm_widevine_deviceinfo(): if request.method == 'GET': base_name = config["default_wv_cdm"] if not base_name.endswith(".wvd"): full_file_name = (base_name + ".wvd") device = widevineDevice.load(f'{os.getcwd()}/configs/CDMs/WV/{full_file_name}') cdm = widevineCDM.from_device(device) return jsonify({ 'device_type': cdm.device_type.name, 'system_id': cdm.system_id, 'security_level': cdm.security_level, 'host': f'{config["fqdn"]}/remotecdm/widevine', 'secret': f'{config["remote_cdm_secret"]}', 'device_name': f'{base_name}' }) @remotecdm_wv_bp.route('/remotecdm/widevine//open', methods=['GET']) def remote_cdm_widevine_open(device): if str(device).lower() == config['default_wv_cdm'].lower(): wv_device = widevineDevice.load(f'{os.getcwd()}/configs/CDMs/WV/{config["default_wv_cdm"]}.wvd') cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) session_id = cdm.open() return jsonify({ 'status': 200, 'message': 'Success', 'data': { 'session_id': session_id.hex(), 'device': { 'system_id': cdm.system_id, 'security_level': cdm.security_level, } } }) else: return jsonify({ 'status': 400, 'message': 'Unauthorized' }) @remotecdm_wv_bp.route('/remotecdm/widevine//close/', methods=['GET']) def remote_cdm_widevine_close(device, session_id): if str(device).lower() == config['default_wv_cdm'].lower(): session_id = bytes.fromhex(session_id) cdm = current_app.config["CDM"] if not cdm: return jsonify({ 'status': 400, 'message': f'No CDM for "{device}" has been opened yet. No session to close' }) try: cdm.close(session_id) except InvalidSession: return jsonify({ 'status': 400, 'message': f'Invalid session ID "{session_id.hex()}", it may have expired' }) return jsonify({ 'status': 200, 'message': f'Successfully closed Session "{session_id.hex()}".', }) else: return jsonify({ 'status': 400, 'message': f'Unauthorized' }) @remotecdm_wv_bp.route('/remotecdm/widevine//set_service_certificate', methods=['POST']) def remote_cdm_widevine_set_service_certificate(device): if str(device).lower() == config['default_wv_cdm'].lower(): body = request.get_json() for required_field in ("session_id", "certificate"): if required_field == "certificate": has_field = required_field in body # it needs the key, but can be empty/null else: has_field = body.get(required_field) if not has_field: return jsonify({ 'status': 400, 'message': f'Missing required field "{required_field}" in JSON body' }) session_id = bytes.fromhex(body["session_id"]) cdm = current_app.config["CDM"] if not cdm: return jsonify({ 'status': 400, 'message': f'No CDM session for "{device}" has been opened yet. No session to use' }) certificate = body["certificate"] try: provider_id = cdm.set_service_certificate(session_id, certificate) except InvalidSession: return jsonify({ 'status': 400, 'message': f'Invalid session id: "{session_id.hex()}", it may have expired' }) except DecodeError as error: return jsonify({ 'status': 400, 'message': f'Invalid Service Certificate, {error}' }) except SignatureMismatch: return jsonify({ 'status': 400, 'message': 'Signature Validation failed on the Service Certificate, rejecting' }) return jsonify({ 'status': 200, 'message': f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.", 'data': { 'provider_id': provider_id, } }) else: return jsonify({ 'status': 400, 'message': f'Unauthorized' }) @remotecdm_wv_bp.route('/remotecdm/widevine//get_service_certificate', methods=['POST']) def remote_cdm_widevine_get_service_certificate(device): if str(device).lower() == config['default_wv_cdm'].lower(): body = request.get_json() for required_field in ("session_id",): if not body.get(required_field): return jsonify({ 'status': 400, 'message': f'Missing required field "{required_field}" in JSON body' }) session_id = bytes.fromhex(body["session_id"]) cdm = current_app.config["CDM"] if not cdm: return jsonify({ 'status': 400, 'message': f'No CDM session for "{device}" has been opened yet. No session to use' }) try: service_certificate = cdm.get_service_certificate(session_id) except InvalidSession: return jsonify({ 'status': 400, 'message': f'Invalid Session ID "{session_id.hex()}", it may have expired' }) if service_certificate: service_certificate_b64 = base64.b64encode(service_certificate.SerializeToString()).decode() else: service_certificate_b64 = None return jsonify({ 'status': 200, 'message': 'Successfully got the Service Certificate', 'data': { 'service_certificate': service_certificate_b64, } }) else: return jsonify({ 'status': 400, 'message': f'Unauthorized' }) @remotecdm_wv_bp.route('/remotecdm/widevine//get_license_challenge/', methods=['POST']) def remote_cdm_widevine_get_license_challenge(device, license_type): if str(device).lower() == config['default_wv_cdm'].lower(): body = request.get_json() for required_field in ("session_id", "init_data"): if not body.get(required_field): return jsonify({ 'status': 400, 'message': f'Missing required field "{required_field}" in JSON body' }) session_id = bytes.fromhex(body["session_id"]) privacy_mode = body.get("privacy_mode", True) cdm = current_app.config["CDM"] if not cdm: return jsonify({ 'status': 400, 'message': f'No CDM session for "{device}" has been opened yet. No session to use' }) if current_app.config.get("force_privacy_mode"): privacy_mode = True if not cdm.get_service_certificate(session_id): return jsonify({ 'status': 403, 'message': 'No Service Certificate set but Privacy Mode is Enforced.' }) current_app.config['pssh'] = body['init_data'] init_data = widevinePSSH(body['init_data']) try: license_request = cdm.get_license_challenge( session_id=session_id, pssh=init_data, license_type=license_type, privacy_mode=privacy_mode ) except InvalidSession: return jsonify({ 'status': 400, 'message': f'Invalid Session ID "{session_id.hex()}", it may have expired' }) except InvalidInitData as error: return jsonify({ 'status': 400, 'message': f'Invalid Init Data, {error}' }) except InvalidLicenseType: return jsonify({ 'status': 400, 'message': f'Invalid License Type {license_type}' }) return jsonify({ 'status': 200, 'message': 'Success', 'data': { 'challenge_b64': base64.b64encode(license_request).decode() } }) else: return jsonify({ 'status': 400, 'message': f'Unauthorized' }) @remotecdm_wv_bp.route('/remotecdm/widevine//parse_license', methods=['POST']) def remote_cdm_widevine_parse_license(device): if str(device).lower() == config['default_wv_cdm'].lower(): body = request.get_json() for required_field in ("session_id", "license_message"): if not body.get(required_field): return jsonify({ 'status': 400, 'message': f'Missing required field "{required_field}" in JSON body' }) session_id = bytes.fromhex(body["session_id"]) cdm = current_app.config["CDM"] if not cdm: return jsonify({ 'status': 400, 'message': f'No CDM session for "{device}" has been opened yet. No session to use' }) try: cdm.parse_license(session_id, body['license_message']) except InvalidLicenseMessage as error: return jsonify({ 'status': 400, 'message': f'Invalid License Message, {error}' }) except InvalidContext as error: return jsonify({ 'status': 400, 'message': f'Invalid Context, {error}' }) except InvalidSession: return jsonify({ 'status': 400, 'message': f'Invalid Session ID "{session_id.hex()}", it may have expired' }) except SignatureMismatch: return jsonify({ 'status': 400, 'message': f'Signature Validation failed on the License Message, rejecting.' }) return jsonify({ 'status': 200, 'message': 'Successfully parsed and loaded the Keys from the License message.', }) else: return jsonify({ 'status': 400, 'message': 'Unauthorized' }) @remotecdm_wv_bp.route('/remotecdm/widevine//get_keys/', methods=['POST']) def remote_cdm_widevine_get_keys(device, key_type): if str(device).lower() == config['default_wv_cdm'].lower(): body = request.get_json() for required_field in ("session_id",): if not body.get(required_field): return jsonify({ 'status': 400, 'message': f'Missing required field "{required_field}" in JSON body' }) session_id = bytes.fromhex(body["session_id"]) key_type: Optional[str] = key_type if key_type == 'ALL': key_type = None cdm = current_app.config["CDM"] if not cdm: return jsonify({ 'status': 400, 'message': f'No CDM session for "{device}" has been opened yet. No session to use' }) try: keys = cdm.get_keys(session_id, key_type) except InvalidSession: return jsonify({ 'status': 400, 'message': f'Invalid Session ID "{session_id.hex()}", it may have expired' }) except ValueError as error: return jsonify({ 'status': 400, 'message': f'The Key Type value "{key_type}" is invalid, {error}' }) keys_json = [ { "key_id": key.kid.hex, "key": key.key.hex(), "type": key.type, "permissions": key.permissions } for key in keys if not key_type or key.type == key_type ] for entry in keys_json: if config['database_type'].lower() != 'mariadb': from custom_functions.database.cache_to_db_sqlite import cache_to_db elif config['database_type'].lower() == 'mariadb': from custom_functions.database.cache_to_db_mariadb import cache_to_db if entry['type'] != 'SIGNING': cache_to_db(pssh=str(current_app.config['pssh']), kid=entry['key_id'], key=entry['key']) return jsonify({ 'status': 200, 'message': 'Success', 'data': { 'keys': keys_json } })