# Import dependencies
import base64
import os
import sys
from pathlib import Path
from typing import Any, Optional, Union

import yaml
from colorama import Fore
from flask import Flask, request, jsonify, current_app, Response
from google.protobuf.message import DecodeError
from InquirerPy import inquirer
from pywidevine import __version__, Device
from pywidevine.cdm import Cdm
from pywidevine.exceptions import (
    InvalidContext,
    InvalidInitData,
    InvalidLicenseMessage,
    InvalidLicenseType,
    InvalidSession,
    SignatureMismatch,
    TooManySessions,
)
from pywidevine.pssh import PSSH


# Define a function to choose a WVD
def choose_wvd():
    """Ask the user which ``.wvd`` file from ``<cwd>/WVDs`` should be served.

    Returns:
        The value returned by the InquirerPy select prompt, i.e. one of the
        ``.wvd`` file names found in the ``WVDs`` folder (name only, no path).

    Any failure (folder missing, prompt error, ...) is printed in red and the
    process exits with status 1.
    """
    try:
        files_in_wvds_folder = os.listdir(f'{os.getcwd()}/WVDs')
        wvds = [file for file in files_in_wvds_folder if file.endswith('.wvd')]
        return inquirer.select(
            message='\nChoose the WVD you wish to serve:',
            choices=wvds,
        ).execute()
    except Exception as error:
        # Interactive top-level boundary: report and quit rather than show a
        # traceback. Reset the colour so the shell is not left printing red.
        print(f'{Fore.RED}An error occurred!\n\n{error}{Fore.RESET}')
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S`` or frozen builds).
        sys.exit(1)


def _error_response(message, status=400):
    """Return the JSON error envelope used by every endpoint.

    Only the JSON body carries ``status``; the HTTP status code is left at
    Flask's default, exactly as the endpoints did before this helper existed.
    """
    return jsonify({
        'status': status,
        'message': message
    })


def _invalid_session_response(session_id):
    """Return the error envelope for a session id the Cdm rejected."""
    return _error_response(f'Invalid Session ID "{session_id.hex()}", it may have expired.')


def _no_cdm_response():
    """Return the error envelope used when no Cdm has been stored by /open yet."""
    return _error_response('No Cdm session has been opened yet.')


def _missing_field_response(body, required_fields, presence_only=()):
    """Return an error envelope for the first missing field, or ``None``.

    Args:
        body: Parsed JSON request body.
        required_fields: Field names that must be present and truthy.
        presence_only: Subset of ``required_fields`` that only has to exist as
            a key; its value may be empty or ``None``.
    """
    for required_field in required_fields:
        if required_field in presence_only:
            has_field = required_field in body
        else:
            has_field = body.get(required_field)
        if not has_field:
            return _error_response(f'Missing required field "{required_field}" in JSON body')
    return None


# Define function to start flask app
def start_cdrm_keys_api():
    """Prompt for a WVD, then serve it through remote-CDM style HTTP endpoints.

    Builds a Flask app whose routes live under ``/remote_cdm`` and starts it
    with ``app.run()``. The ``<device>`` path segment is accepted on every
    route but not inspected: the WVD chosen at start-up is always used.

    The Cdm created by ``/open`` is stored in ``current_app.config['cdm']``
    and read back by all other endpoints.
    """
    choice = choose_wvd()
    app = Flask("CDRM-Keys API")

    @app.route('/remote_cdm', methods=['HEAD', 'GET'])
    def remote_cdm():
        """HEAD: advertise the server version. GET: describe the remote CDM."""
        if request.method == 'HEAD':
            response = Response(status=200)
            response.headers.update({
                'Server': f'https://github.com/devine-dl/pywidevine serve v{__version__}'
            })
            return response

        # Only GET can reach this point (the route allows HEAD and GET only).
        with open(f'{os.getcwd()}/Config.yaml', 'r') as file:
            config = yaml.safe_load(file)
        device = Device.load(f'{os.getcwd()}/WVDs/{choice}')
        remote_cdm_details = {
            'name': 'TPD-Keys-API',
            'device_type': device.type.name,
            'system_id': device.system_id,
            'security_level': device.security_level,
            'host': f'{config["Remote_CDM_API_FQDN"]}/remote_cdm',
            'secret': 'CDRM-Keys',
        }
        return jsonify(remote_cdm_details)

    @app.route('/remote_cdm/<device>/open', methods=['GET'])
    def device_open(device):
        """Load the chosen WVD into a new Cdm and open a session on it."""
        # Flask adds HEAD to GET routes automatically, so this guard is live.
        if request.method != 'GET':
            return jsonify({"message": "Method not allowed"})

        wvd_device = Device.load(f'{os.getcwd()}/WVDs/{choice}')
        # NOTE(review): a fresh Cdm replaces the stored one on every call, so
        # sessions opened through an earlier Cdm are presumably no longer
        # reachable by the other endpoints - confirm whether that is intended.
        cdm = current_app.config['cdm'] = Cdm.from_device(wvd_device)
        try:
            session_id = cdm.open()
        except TooManySessions as error:
            return _error_response(str(error))
        return jsonify({
            'status': 200,
            'message': 'Success',
            'data': {
                'session_id': session_id.hex(),
                'device': {
                    'system_id': cdm.system_id,
                    'security_level': cdm.security_level
                }
            }
        })

    @app.route('/remote_cdm/<device>/close/<session_id>', methods=['GET'])
    def device_close(device, session_id):
        """Close the session whose hex id is given in the URL."""
        # Flask adds HEAD to GET routes automatically, so this guard is live.
        if request.method != 'GET':
            return jsonify({"message": "Method not allowed"})

        cdm = current_app.config.get('cdm')
        if cdm is None:
            return _no_cdm_response()
        session_id = bytes.fromhex(session_id)
        try:
            cdm.close(session_id)
        except InvalidSession:
            return _invalid_session_response(session_id)
        return jsonify({
            'status': 200,
            'message': f'Successfully closed Session "{session_id.hex()}"',
        })

    @app.route('/remote_cdm/<device>/set_service_certificate', methods=['POST'])
    def set_service_certificate(device):
        """Set, or unset when the value is empty, the session's service certificate."""
        body = request.get_json()
        # "certificate" only needs to be present: an empty value means "unset".
        missing = _missing_field_response(
            body, ("session_id", "certificate"), presence_only=("certificate",)
        )
        if missing is not None:
            return missing

        session_id = bytes.fromhex(body['session_id'])
        cdm = current_app.config.get('cdm')
        if cdm is None:
            return _no_cdm_response()
        certificate = body.get('certificate')
        try:
            provider_id = cdm.set_service_certificate(session_id, certificate)
        except InvalidSession:
            return _invalid_session_response(session_id)
        except DecodeError as error:
            return _error_response(f'Invalid Service Certificate, {error}')
        except SignatureMismatch:
            return _error_response('Signature Validation failed on the Service Certificate, rejecting.')
        action = "unset" if not certificate else "set"
        return jsonify({
            'status': 200,
            'message': f'Successfully {action} the Service Certificate.',
            'data': {
                'provider_id': provider_id,
            }
        })

    @app.route('/remote_cdm/<device>/get_service_certificate', methods=['POST'])
    def get_service_certificate(device):
        """Return the session's service certificate as base64, or null if none is set."""
        body = request.get_json()
        missing = _missing_field_response(body, ("session_id",))
        if missing is not None:
            return missing

        session_id = bytes.fromhex(body['session_id'])
        cdm = current_app.config.get('cdm')
        if cdm is None:
            return _no_cdm_response()
        try:
            service_certificate = cdm.get_service_certificate(session_id)
        except InvalidSession:
            return _invalid_session_response(session_id)
        if service_certificate:
            service_certificate_b64 = base64.b64encode(service_certificate.SerializeToString()).decode()
        else:
            service_certificate_b64 = None
        return jsonify({
            'status': 200,
            'message': 'Successfully got the Service Certificate.',
            'data': {
                'service_certificate': service_certificate_b64,
            }
        })

    @app.route('/remote_cdm/<device>/get_license_challenge/<license_type>', methods=['POST'])
    def get_license_challenge(device, license_type):
        """Build a license challenge for the posted init data and return it as base64."""
        body = request.get_json()
        missing = _missing_field_response(body, ("session_id", "init_data"))
        if missing is not None:
            return missing

        session_id = bytes.fromhex(body['session_id'])
        privacy_mode = body.get('privacy_mode', True)
        cdm = current_app.config.get('cdm')
        if cdm is None:
            return _no_cdm_response()
        init_data = PSSH(body['init_data'])
        try:
            # Privacy mode is switched off when the session has no service
            # certificate. This lookup is inside the try so that an unknown
            # session yields the JSON error instead of an unhandled exception.
            if not cdm.get_service_certificate(session_id):
                privacy_mode = False
            license_request = cdm.get_license_challenge(
                session_id=session_id,
                pssh=init_data,
                license_type=license_type,
                privacy_mode=privacy_mode
            )
        except InvalidSession:
            return _invalid_session_response(session_id)
        except InvalidInitData as error:
            return _error_response(f'Invalid Init Data, {error}')
        except InvalidLicenseType:
            return _error_response(f'Invalid License Type "{license_type}"')
        return jsonify({
            'status': 200,
            'message': 'Success',
            'data': {
                'challenge_b64': base64.b64encode(license_request).decode()
            }
        })

    @app.route('/remote_cdm/<device>/parse_license', methods=['POST'])
    def parse_license(device):
        """Hand the posted license message to the Cdm so it can load the keys."""
        body = request.get_json()
        missing = _missing_field_response(body, ("session_id", "license_message"))
        if missing is not None:
            return missing

        session_id = bytes.fromhex(body['session_id'])
        cdm = current_app.config.get('cdm')
        if cdm is None:
            return _no_cdm_response()
        try:
            cdm.parse_license(session_id, body['license_message'])
        except InvalidSession:
            return _invalid_session_response(session_id)
        except InvalidLicenseMessage as error:
            return _error_response(f'Invalid License Message, {error}')
        except InvalidContext as error:
            return _error_response(f'Invalid Context "{error}"')
        except SignatureMismatch:
            return _error_response('Signature Validation failed on the License Message, rejecting.')
        return jsonify({
            'status': 200,
            'message': 'Successfully parsed and loaded the Keys from the License message.',
        })

    @app.route('/remote_cdm/<device>/get_keys/<key_type>', methods=['POST'])
    def get_keys(device, key_type):
        """Return the session's keys, filtered by ``key_type`` (``ALL`` = no filter)."""
        body = request.get_json()
        missing = _missing_field_response(body, ("session_id",))
        if missing is not None:
            return missing

        session_id = bytes.fromhex(body['session_id'])
        key_type = str(key_type)
        if key_type == 'ALL':
            # None is passed to cdm.get_keys in place of a concrete key type.
            key_type = None
        cdm = current_app.config.get('cdm')
        if cdm is None:
            return _no_cdm_response()
        try:
            keys = cdm.get_keys(session_id, key_type)
        except InvalidSession:
            return _invalid_session_response(session_id)
        except ValueError as error:
            return _error_response(f'The Key Type value "{key_type}" is invalid, {error}')
        keys_json = [
            {
                'key_id': key.kid.hex,
                'key': key.key.hex(),
                'type': key.type,
                'permissions': key.permissions,
            }
            for key in keys
        ]
        return jsonify({
            'status': 200,
            'message': 'Success',
            'data': {
                'keys': keys_json
            }
        })

    app.run()