CDRM-Keys/CDRM_Modules/CDRM_Keys_API.py
2024-09-16 13:28:01 -04:00

321 lines
12 KiB
Python

# Import dependencies
import os
import yaml
from colorama import Fore
from InquirerPy import inquirer
from flask import Flask, request, jsonify, current_app, Response
import base64
import sys
from pathlib import Path
from typing import Any, Optional, Union
from google.protobuf.message import DecodeError
from pywidevine.pssh import PSSH
from pywidevine import __version__, Device
from pywidevine.cdm import Cdm
from pywidevine.exceptions import (InvalidContext, InvalidInitData, InvalidLicenseMessage, InvalidLicenseType, InvalidSession, SignatureMismatch, TooManySessions)
# Define a function to choose a WVD
def choose_wvd():
try:
files_in_wvds_folder = os.listdir(f'{os.getcwd()}/WVDs')
wvds = [file for file in files_in_wvds_folder if file.endswith('.wvd')]
choice = inquirer.select(
message=f'\nChoose the WVD you wish to serve:',
choices=wvds,
).execute()
return choice
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
exit(1)
# Define function to start flask app
def start_cdrm_keys_api():
choice = choose_wvd()
app = Flask("CDRM-Keys API")
@app.route('/remote_cdm', methods=['HEAD', 'GET'])
def remote_cdm():
if request.method == 'HEAD':
response = Response(status=200)
response.headers.update({
'Server': f'https://github.com/devine-dl/pywidevine serve v{__version__}'
})
return response
elif request.method == 'GET':
with open(f'{os.getcwd()}/Config.yaml', 'r') as file:
config = yaml.safe_load(file)
device = Device.load(f'{os.getcwd()}/WVDs/{choice}')
remote_cdm_details = {
'name': 'TPD-Keys-API',
'device_type': device.type.name,
'system_id': device.system_id,
'security_level': device.security_level,
'host': f'{config["Remote_CDM_API_FQDN"]}/remote_cdm',
'secret': 'CDRM-Keys',
}
return jsonify(remote_cdm_details)
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/open', methods=['GET'])
def device_open(device):
if request.method == 'GET':
device = Device.load(f'{os.getcwd()}/WVDs/{choice}')
cdm = current_app.config['cdm'] = Cdm.from_device(device)
try:
session_id = cdm.open()
except TooManySessions as error:
return jsonify({
'status': 400,
'message': str(error)
})
return jsonify({
'status': 200,
'message': 'Success',
'data': {
'session_id': session_id.hex(),
'device': {
'system_id': cdm.system_id,
'security_level': cdm.security_level
}
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/close/<session_id>', methods=['GET'])
def device_close(device, session_id):
if request.method == 'GET':
cdm = current_app.config['cdm']
session_id = bytes.fromhex(session_id)
try:
cdm.close(session_id)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
return jsonify({
'status': 200,
'message': f'Successfully closed Session "{session_id.hex()}"',
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/set_service_certificate', methods=['POST'])
def set_service_certificate(device):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id", "certificate"):
if required_field == "certificate":
has_field = required_field in body
else:
has_field = body.get(required_field)
if not has_field:
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
cdm = current_app.config['cdm']
certificate = body.get('certificate')
try:
provider_id = cdm.set_service_certificate(session_id, certificate)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except DecodeError as error:
return jsonify({
'status': 400,
'message': f'Invalid Service Certificate, {error}'
})
except SignatureMismatch:
return jsonify({
'status': 400,
'message': f'Signature Validation failed on the Service Certificate, rejecting.'
})
return jsonify({
'status': 200,
'message': f'Successfully {["set", "unset"][not certificate]} the Service Certificate.',
'data': {
'provider_id': provider_id,
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/get_service_certificate', methods=['POST'])
def get_service_certificate(device):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id",):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
cdm = current_app.config['cdm']
try:
service_certificate = cdm.get_service_certificate(session_id)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
if service_certificate:
service_certificate_b64 = base64.b64encode(service_certificate.SerializeToString()).decode()
else:
service_certificate_b64 = None
return jsonify({
'status': 200,
'message': 'Successfully got the Service Certificate.',
'data': {
'service_certificate': service_certificate_b64,
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/get_license_challenge/<license_type>', methods=['POST'])
def get_license_challenge(device, license_type):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id", "init_data"):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
privacy_mode = body.get('privacy_mode', True)
cdm = current_app.config['cdm']
if not cdm.get_service_certificate(session_id):
privacy_mode = False
init_data = PSSH(body['init_data'])
try:
license_request = cdm.get_license_challenge(
session_id=session_id,
pssh=init_data,
license_type=license_type,
privacy_mode=privacy_mode
)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except InvalidInitData as error:
return jsonify({
'status': 400,
'message': f'Invalid Init Data, {error}'
})
except InvalidLicenseType:
return jsonify({
'status': 400,
'message': f'Invalid License Type "{license_type}"'
})
return jsonify({
'status': 200,
'message': 'Success',
'data': {
'challenge_b64': base64.b64encode(license_request).decode()
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/parse_license', methods=['POST'])
def parse_license(device):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id", "license_message"):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
cdm = current_app.config['cdm']
try:
cdm.parse_license(session_id, body['license_message'])
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except InvalidLicenseMessage as error:
return jsonify({
'status': 400,
'message': f'Invalid License Message, {error}'
})
except InvalidContext as error:
return jsonify({
'status': 400,
'message': f'Invalid Context "{error}"'
})
except SignatureMismatch:
return jsonify({
'status': 400,
'message': f'Signature Validation failed on the License Message, rejecting.'
})
return jsonify({
'status': 200,
'message': 'Successfully parsed and loaded the Keys from the License message.',
})
@app.route('/remote_cdm/<device>/get_keys/<key_type>', methods=['POST'])
def get_keys(device, key_type):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id",):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
key_type = str(key_type)
if key_type == 'ALL':
key_type = None
cdm = current_app.config['cdm']
try:
keys = cdm.get_keys(session_id, key_type)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except ValueError as error:
return jsonify({
'status': 400,
'message': f'They Key Type value "{key_type}" is invalid, {error}'
})
keys_json = [
{
'key_id': key.kid.hex,
'key': key.key.hex(),
'type': key.type,
'permissions': key.permissions,
}
for key in keys
]
return jsonify({
'status': 200,
'message': 'Success',
'data': {
'keys': keys_json
}
})
app.run()