v3.00-alpha

This commit is contained in:
CDM-Project 2024-09-16 13:28:01 -04:00
commit 3eac4d7706
9 changed files with 761 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.pyc

8
CDRM-Keys.py Normal file
View File

@ -0,0 +1,8 @@
# Import dependencies
import CDRM_Modules
# Run startup checks
CDRM_Modules.Startup_Checks.run_startup_checks(startup=True)
# Run main menu
CDRM_Modules.Main_Menu.main_menu()

View File

@ -0,0 +1,27 @@
#Import dependencies
from distutils.command.config import config
import yaml
from InquirerPy import inquirer
import os
# Define function to choose local CDM
def select_local_cdm():
files_in_wvds_folder = os.listdir(f'{os.getcwd()}/WVDs')
wvds = [file for file in files_in_wvds_folder if file.endswith('.wvd')]
choice = inquirer.select(
message=f'\nChoose the WVD you wish to use:',
choices=wvds,
).execute()
return choice
# Define function to choose remote CDM
def select_remote_cdm():
with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile:
config = yaml.safe_load(ymlfile)
remote_cdms = [remotecdm for remotecdm in config['Remote_CDMs']]
choice = inquirer.select(
message=f'\nChoose the Remote CDM you wish to use:',
choices=remote_cdms,
).execute()
return choice

View File

@ -0,0 +1,321 @@
# Import dependencies
import os
import yaml
from colorama import Fore
from InquirerPy import inquirer
from flask import Flask, request, jsonify, current_app, Response
import base64
import sys
from pathlib import Path
from typing import Any, Optional, Union
from google.protobuf.message import DecodeError
from pywidevine.pssh import PSSH
from pywidevine import __version__, Device
from pywidevine.cdm import Cdm
from pywidevine.exceptions import (InvalidContext, InvalidInitData, InvalidLicenseMessage, InvalidLicenseType, InvalidSession, SignatureMismatch, TooManySessions)
# Define a function to choose a WVD
def choose_wvd():
try:
files_in_wvds_folder = os.listdir(f'{os.getcwd()}/WVDs')
wvds = [file for file in files_in_wvds_folder if file.endswith('.wvd')]
choice = inquirer.select(
message=f'\nChoose the WVD you wish to serve:',
choices=wvds,
).execute()
return choice
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
exit(1)
# Define function to start flask app
def start_cdrm_keys_api():
choice = choose_wvd()
app = Flask("CDRM-Keys API")
@app.route('/remote_cdm', methods=['HEAD', 'GET'])
def remote_cdm():
if request.method == 'HEAD':
response = Response(status=200)
response.headers.update({
'Server': f'https://github.com/devine-dl/pywidevine serve v{__version__}'
})
return response
elif request.method == 'GET':
with open(f'{os.getcwd()}/Config.yaml', 'r') as file:
config = yaml.safe_load(file)
device = Device.load(f'{os.getcwd()}/WVDs/{choice}')
remote_cdm_details = {
'name': 'TPD-Keys-API',
'device_type': device.type.name,
'system_id': device.system_id,
'security_level': device.security_level,
'host': f'{config["Remote_CDM_API_FQDN"]}/remote_cdm',
'secret': 'CDRM-Keys',
}
return jsonify(remote_cdm_details)
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/open', methods=['GET'])
def device_open(device):
if request.method == 'GET':
device = Device.load(f'{os.getcwd()}/WVDs/{choice}')
cdm = current_app.config['cdm'] = Cdm.from_device(device)
try:
session_id = cdm.open()
except TooManySessions as error:
return jsonify({
'status': 400,
'message': str(error)
})
return jsonify({
'status': 200,
'message': 'Success',
'data': {
'session_id': session_id.hex(),
'device': {
'system_id': cdm.system_id,
'security_level': cdm.security_level
}
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/close/<session_id>', methods=['GET'])
def device_close(device, session_id):
if request.method == 'GET':
cdm = current_app.config['cdm']
session_id = bytes.fromhex(session_id)
try:
cdm.close(session_id)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
return jsonify({
'status': 200,
'message': f'Successfully closed Session "{session_id.hex()}"',
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/set_service_certificate', methods=['POST'])
def set_service_certificate(device):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id", "certificate"):
if required_field == "certificate":
has_field = required_field in body
else:
has_field = body.get(required_field)
if not has_field:
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
cdm = current_app.config['cdm']
certificate = body.get('certificate')
try:
provider_id = cdm.set_service_certificate(session_id, certificate)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except DecodeError as error:
return jsonify({
'status': 400,
'message': f'Invalid Service Certificate, {error}'
})
except SignatureMismatch:
return jsonify({
'status': 400,
'message': f'Signature Validation failed on the Service Certificate, rejecting.'
})
return jsonify({
'status': 200,
'message': f'Successfully {["set", "unset"][not certificate]} the Service Certificate.',
'data': {
'provider_id': provider_id,
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/get_service_certificate', methods=['POST'])
def get_service_certificate(device):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id",):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
cdm = current_app.config['cdm']
try:
service_certificate = cdm.get_service_certificate(session_id)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
if service_certificate:
service_certificate_b64 = base64.b64encode(service_certificate.SerializeToString()).decode()
else:
service_certificate_b64 = None
return jsonify({
'status': 200,
'message': 'Successfully got the Service Certificate.',
'data': {
'service_certificate': service_certificate_b64,
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/get_license_challenge/<license_type>', methods=['POST'])
def get_license_challenge(device, license_type):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id", "init_data"):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
privacy_mode = body.get('privacy_mode', True)
cdm = current_app.config['cdm']
if not cdm.get_service_certificate(session_id):
privacy_mode = False
init_data = PSSH(body['init_data'])
try:
license_request = cdm.get_license_challenge(
session_id=session_id,
pssh=init_data,
license_type=license_type,
privacy_mode=privacy_mode
)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except InvalidInitData as error:
return jsonify({
'status': 400,
'message': f'Invalid Init Data, {error}'
})
except InvalidLicenseType:
return jsonify({
'status': 400,
'message': f'Invalid License Type "{license_type}"'
})
return jsonify({
'status': 200,
'message': 'Success',
'data': {
'challenge_b64': base64.b64encode(license_request).decode()
}
})
else:
return jsonify({"message": "Method not allowed"})
@app.route('/remote_cdm/<device>/parse_license', methods=['POST'])
def parse_license(device):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id", "license_message"):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
cdm = current_app.config['cdm']
try:
cdm.parse_license(session_id, body['license_message'])
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except InvalidLicenseMessage as error:
return jsonify({
'status': 400,
'message': f'Invalid License Message, {error}'
})
except InvalidContext as error:
return jsonify({
'status': 400,
'message': f'Invalid Context "{error}"'
})
except SignatureMismatch:
return jsonify({
'status': 400,
'message': f'Signature Validation failed on the License Message, rejecting.'
})
return jsonify({
'status': 200,
'message': 'Successfully parsed and loaded the Keys from the License message.',
})
@app.route('/remote_cdm/<device>/get_keys/<key_type>', methods=['POST'])
def get_keys(device, key_type):
if request.method == 'POST':
body = request.get_json()
for required_field in ("session_id",):
if not body.get(required_field):
return jsonify({
'status': 400,
'message': f'Missing required field "{required_field}" in JSON body'
})
session_id = bytes.fromhex(body['session_id'])
key_type = str(key_type)
if key_type == 'ALL':
key_type = None
cdm = current_app.config['cdm']
try:
keys = cdm.get_keys(session_id, key_type)
except InvalidSession:
return jsonify({
'status': 400,
'message': f'Invalid Session ID "{session_id.hex()}", it may have expired.'
})
except ValueError as error:
return jsonify({
'status': 400,
'message': f'They Key Type value "{key_type}" is invalid, {error}'
})
keys_json = [
{
'key_id': key.kid.hex,
'key': key.key.hex(),
'type': key.type,
'permissions': key.permissions,
}
for key in keys
]
return jsonify({
'status': 200,
'message': 'Success',
'data': {
'keys': keys_json
}
})
app.run()

150
CDRM_Modules/Decrypt.py Normal file
View File

@ -0,0 +1,150 @@
import httpx
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
from pywidevine.remotecdm import RemoteCdm
import yaml
import os
from colorama import Fore
from . import CDM_Selector
def load_license_curl():
class LicenseCURL:
def __init__(self):
self.headers = headers
self.cookies = cookies
try:
with open(f'{os.getcwd()}/License_cURL.py', 'r') as file:
local_namespace = {}
exec(file.read(), globals(), local_namespace)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
if 'headers' in local_namespace:
headers = local_namespace['headers']
else:
headers = None
if 'cookies' in local_namespace:
cookies = local_namespace['cookies']
else:
cookies = None
return LicenseCURL()
def decrypt_local_cdm():
cdm_choice = CDM_Selector.select_local_cdm()
try:
device = Device.load(f'{os.getcwd()}/WVDs/{cdm_choice}')
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm = Cdm.from_device(device)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
session_id = cdm.open()
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
pssh = PSSH(input("\nPSSH: "))
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
user_license_url = input("License URL: ")
print()
license_curl = load_license_curl()
try:
service_certificate = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=cdm.service_certificate_challenge
).content
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh)
try:
license_response = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=challenge
).content
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm.parse_license(session_id=session_id, license_message=license_response)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
for key in cdm.get_keys(session_id):
if key.type == 'CONTENT':
print(f"{key.kid.hex}:{key.key.hex()}")
return
def decrypt_remote_cdm():
remote_cdm_choice = CDM_Selector.select_remote_cdm()
try:
with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile:
config = yaml.safe_load(ymlfile)
cdm = RemoteCdm(
device_type=config['Remote_CDMs'][remote_cdm_choice]['device_type'],
system_id=config['Remote_CDMs'][remote_cdm_choice]['system_id'],
security_level=config['Remote_CDMs'][remote_cdm_choice]['security_level'],
host=config['Remote_CDMs'][remote_cdm_choice]['host'],
secret=config['Remote_CDMs'][remote_cdm_choice]['secret'],
device_name=config['Remote_CDMs'][remote_cdm_choice]['name'],
)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
session_id = cdm.open()
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
pssh = PSSH(input("\nPSSH: "))
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
user_license_url = input("License URL: ")
print()
license_curl = load_license_curl()
try:
service_certificate = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=cdm.service_certificate_challenge
).content
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh)
try:
license_response = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=challenge
).content
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm.parse_license(session_id=session_id, license_message=license_response)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
for key in cdm.get_keys(session_id):
if key.type == 'CONTENT':
print(f"{key.kid.hex}:{key.key.hex()}")
return

23
CDRM_Modules/Main_Menu.py Normal file
View File

@ -0,0 +1,23 @@
from InquirerPy import inquirer
from . import Startup_Checks
from . import Decrypt
from . import CDRM_Keys_API
def main_menu():
local_cdms, remote_cdms = Startup_Checks.check_for_wvds()
choices = []
if local_cdms:
choices.append('Local CDM Decryption')
choices.append('CDRM-Keys API Mode')
if remote_cdms:
choices.append('Remote CDM Decryption')
choice = inquirer.select(
message='CDRM-Keys:',
choices=choices,
).execute()
if choice == 'Local CDM Decryption':
Decrypt.decrypt_local_cdm()
if choice == 'Remote CDM Decryption':
Decrypt.decrypt_remote_cdm()
if choice == 'CDRM-Keys API Mode':
CDRM_Keys_API.start_cdrm_keys_api()

View File

@ -0,0 +1,218 @@
# Import dependencies
import os
import time
import requests
import yaml
import sqlite3
from colorama import Fore
import sys
import json
# Define a function to check if script is running from a virtual environment
def check_is_venv():
if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
print(f'{Fore.GREEN}Virtual Environment [✓]')
return
else:
print(f'{Fore.RED}Not running Virtual Environment!\n\nPlease run CDRM-Keys in a Python Virtual Environment')
exit(1)
# Define a check to see if config.yaml exists
def check_if_config_exists():
if os.path.isfile(f'{os.getcwd()}/Config.yaml'):
print(f'{Fore.GREEN}Config.yaml [✓]')
return
else:
initial_config_data = {
'First_Run': 'True',
'Remote_CDM_API_FQDN': 'http://127.0.0.1:5000'
}
create_input = input(f'\n{Fore.YELLOW}Config.yaml not found, would you like to create a Config.yaml? [(Y)es/(N)o]: ')
if create_input:
if create_input[0].lower() == 'y':
try:
with open(f'{os.getcwd()}/Config.yaml', 'w') as file:
yaml.dump(initial_config_data, file)
print(f'{Fore.GREEN}Config.yaml created!')
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
exit(1)
return
else:
print(f'{Fore.YELLOW}Exiting.')
exit(1)
# Define a function to see if keys.db exists
def check_if_keys_db_exists():
if os.path.isfile(f'{os.getcwd()}/Keys.db'):
print(f'{Fore.GREEN}Keys.db [✓]')
return
else:
try:
create_input = input(f'\n{Fore.YELLOW}Keys.db not found, would you like to create a Keys.db? [(Y)es/(N)o]: ')
if create_input:
if create_input[0].lower() == 'y':
keys_db_connection = sqlite3.connect(f'{os.getcwd()}/Keys.db')
keys_db_cursor = keys_db_connection.cursor()
keys_db_cursor.execute('''
CREATE TABLE IF NOT EXISTS keys (
MPD TEXT,
PSSH TEXT,
KID TEXT,
KEY TEXT,
License_URL TEXT,
Headers TEXT,
Cookies TEXT,
Data TEXT
)
''')
keys_db_connection.commit()
keys_db_connection.close()
print(f'{Fore.GREEN}Keys.db created!')
return
else:
print(f'{Fore.YELLOW}Exiting.')
exit(1)
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
exit(1)
# Define a function to check if License_cURL.py exists
def check_if_license_curl_exists():
if os.path.isfile(f'{os.getcwd()}/License_cURL.py'):
print(f'{Fore.GREEN}License_cURL.py [✓]')
return
else:
try:
create_input = input(f'\n{Fore.YELLOW}License_cURL.py not found, would you like to create a License_cURL.py? [(Y)es/(N)o]: ')
if create_input:
if create_input[0].lower() == 'y':
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:130.0) Gecko/20100101 Firefox/130.0',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
# 'Accept-Encoding': 'gzip, deflate, br, zstd',
'Origin': 'https://bitmovin.com',
'DNT': '1',
'Sec-GPC': '1',
'Connection': 'keep-alive',
'Referer': 'https://bitmovin.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'Content-Type': 'application/x-www-form-urlencoded',
}
with open(f'{os.getcwd()}/License_cURL.py', 'w') as file:
file.write("# This file contains a sample license URL curl from BitMovin (https://bitmovin.com/demos/drm)\n")
file.write("headers = ")
json.dump(headers, file, indent=4)
file.write("\n")
print(f'{Fore.GREEN}License_cURL.py created!')
return
else:
print(f'{Fore.YELLOW}Exiting.')
exit(1)
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
exit(1)
# Define a function to check if WVDs folder exists
def check_if_wvd_folder_exists():
if os.path.isdir(f'{os.getcwd()}/WVDs'):
print(f'{Fore.GREEN}WVDs Folder [✓]')
return
else:
create_input = input(f'\n{Fore.YELLOW}WVDs folder not found, would you like to create a WVDs folder? [(Y)es/(N)o]: ')
try:
if create_input:
if create_input[0].lower() == 'y':
os.mkdir(f'{os.getcwd()}/WVDs')
print(f'{Fore.GREEN}WVDs folder created!')
return
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
exit(1)
# Define a function to check for CDMs
def check_for_wvds(startup: bool = False):
try:
cmds_exist = False
remote_cdms_exist = False
files_in_wvds_folder = os.listdir(f'{os.getcwd()}/WVDs')
wvds = [file for file in files_in_wvds_folder if file.endswith('.wvd')]
with open('Config.yaml', 'r') as ymlfile:
config = yaml.safe_load(ymlfile)
if wvds:
if startup:
print(f'{Fore.GREEN}WVDs [✓]')
cmds_exist = True
if 'Remote_CDMs' in config:
if startup:
print(f'{Fore.GREEN}Remote CDMs [✓]')
remote_cdms_exist = True
elif 'Remote_CDMs' not in config and startup:
if config['First_Run'] == 'True':
create_input = input(
f'\n{Fore.YELLOW}No remote CDM found, would you like to use CDRM-Project\'s? [(Y)es/(N)o]: ')
if create_input:
if create_input[0].lower() == 'y':
try:
cdrm_project_remote_cdm_info = requests.get(
url='https://remote-cdm.cdrm-project.com/remote_cdm'
).json()
device_details = {
'Remote_CDMs': {
'CDRM_Project_CDM_API': {
'name': cdrm_project_remote_cdm_info['name'],
'host': cdrm_project_remote_cdm_info['host'],
'device_type': cdrm_project_remote_cdm_info['device_type'],
'security_level': cdrm_project_remote_cdm_info['security_level'],
'system_id': cdrm_project_remote_cdm_info['system_id'],
'secret': cdrm_project_remote_cdm_info['secret']
}
}
}
with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile:
config = yaml.safe_load(ymlfile)
config.update(device_details)
with open(f'{os.getcwd()}/Config.yaml', 'w') as ymlfile:
yaml.safe_dump(config, ymlfile)
print(f'{Fore.GREEN}CDRM-Project RemoteCDM added')
remote_cdms_exist = True
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
if not cmds_exist and not remote_cdms_exist:
print(f'{Fore.YELLOW}No WVDs available! Exiting.')
exit(1)
return cmds_exist, remote_cdms_exist
except Exception as error:
print(f'{Fore.RED}An error occurred!\n\n{error}')
exit(1)
# Define a function to run all startup checks
def run_startup_checks(startup: bool = False):
check_is_venv()
check_if_license_curl_exists()
check_if_config_exists()
check_if_keys_db_exists()
check_if_wvd_folder_exists()
check_for_wvds(startup=startup)
time.sleep(1)
with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile:
config = yaml.safe_load(ymlfile)
config['First_Run'] = 'False'
with open(f'{os.getcwd()}/Config.yaml', 'w') as ymlfile:
yaml.safe_dump(config, ymlfile)
if os.name == 'nt': # For Windows
os.system('cls')
else: # For Unix-based systems (Linux, macOS)
os.system('clear')
return

5
CDRM_Modules/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from . import Startup_Checks
from . import CDRM_Keys_API
from . import Decrypt
from . import CDM_Selector
from . import Main_Menu

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
pyyaml
colorama
pywidevine
flask
google
inquirerpy
requests
httpx