This commit is contained in:
CDM-Project 2024-09-08 20:13:39 -04:00
commit 0e04cb1285
38 changed files with 2432 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
.idea
/databases/key_cache.db
/databases/WVDs/*
/static/css/tailwind.css
/package.json
/package-lock.json
/tailwind.config.js
/databases/users.db
*.pyc
/databases/devine.db

28
README.md Normal file
View File

@ -0,0 +1,28 @@
## CDRM-Project
![forthebadge](https://forthebadge.com/images/badges/uses-html.svg) ![forthebadge](https://forthebadge.com/images/badges/uses-css.svg) ![forthebadge](https://forthebadge.com/images/badges/uses-javascript.svg) ![forthebadge](https://forthebadge.com/images/badges/made-with-python.svg)
## What is this?
An open source web application written in python to decrypt Widevine protected content.
## Prerequisites
- [Python](https://www.python.org/downloads/) with PIP installed
> Python 3.12 was used at the time of writing
## Installation
- Open your terminal and navigate to where you'd like to store the application
- Create a new python virtual environment using `python -m venv CDRM-Project`
- Change directory into the new `CDRM-Project` folder
- Activate the virtual environment
> Windows - change directory into the `Scripts` directory then `activate.bat`
>
> Linux - `source bin/activate`
- Extract CDRM-Project 2.0 git contents into the newly created `CDRM-Project` folder
- Install python dependencies `pip install -r requirements.txt`
- (Optional) Place your .WVD file into `/databases/WVDs`
- Run the application `python main.py`

420
main.py Normal file
View File

@ -0,0 +1,420 @@
# Import dependencies
from flask import Flask, render_template, request, jsonify, session, redirect, send_file, Response, current_app
import requests
import scripts
import uuid
import json
import base64
from pywidevine import __version__
from pywidevine.pssh import PSSH
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine import RemoteCdm
# Create database if it doesn't exist
scripts.create_database.create_database()
# Check for .WVD file and assign it a variable
WVD = scripts.wvd_check.check_for_wvd()
# If no WVD found, exit
if WVD is None:
WVD = 'Remote'
rcdm = RemoteCdm(
device_type='ANDROID',
system_id=int(requests.post(url='https://cdrm-project.com/devine').content),
security_level=3,
host='https://cdrm-project.com/devine',
secret='CDRM-Project',
device_name='CDM'
)
# Define Flask app object, give template and static arguments.
app = Flask(__name__, template_folder='templates', static_folder='static', static_url_path='/')
# Create a secret key for logins
app.secret_key = str(uuid.uuid4())
# Route for root '/'
@app.route("/", methods=['GET', 'POST'])
def main_page():
if request.method == 'GET':
return render_template('index.html')
elif request.method == 'POST':
# Get the JSON data from the request
data = json.loads(request.data.decode())
# Get the proxy
proxy = data['Proxy']
if proxy == '':
proxy = None
decrypt_response = scripts.decrypt.decrypt_content(
in_pssh=request.json['PSSH'],
license_url=request.json['License URL'],
headers=request.json['Headers'],
json_data=request.json['JSON'],
cookies_data=request.json['Cookies'],
input_data=request.json['Data'],
wvd=WVD,
proxy=proxy,
)
return jsonify(decrypt_response)
# Route for '/cache'
@app.route("/cache", methods=['GET', 'POST'])
def cache_page():
if request.method == 'GET':
cache_page_key_count = scripts.key_count.count_keys()
return render_template('cache.html', cache_page_key_count=cache_page_key_count)
if request.method == 'POST':
results = scripts.vault_check.check_database(pssh=request.json['PSSH'])
message = {
'Message': results
}
return jsonify(message)
@app.route("/key_count", methods=['GET'])
def key_count():
if request.method == 'GET':
results = scripts.key_count.count_keys()
results = 'Total Keys: ' + str(results)
message = {
'Message': results
}
return jsonify(message)
# Route for '/faq'
@app.route("/faq", methods=['GET'])
def faq_page():
if request.method == 'GET':
return render_template('faq.html')
@app.route("/api", methods=['GET', 'POST'])
def api_page():
if request.method == 'GET':
return render_template('api.html')
elif request.method == 'POST':
return
@app.route("/extension", methods=['GET', 'POST'])
def extension_page():
if request.method == 'GET':
return render_template('extension.html')
elif request.method == 'POST':
# Get the JSON data from the request
data = json.loads(request.data.decode())
# Get the MPD url
json_data = data['JSON']
if json_data:
try:
json_data = base64.b64decode(json_data).decode()
json_data = json.loads(json_data)
except:
json_data = json_data
# Get the proxy
proxy = data['Proxy']
if proxy == '':
proxy = None
try:
keys = scripts.extension_decrypt.decrypt_content(in_pssh=data['PSSH'], license_url=data['License URL'], headers=json.loads(data['Headers']),
wvd=WVD, scheme=data['Scheme'], proxy=proxy, json_data=json_data)
return {'Message': f'{keys}'}
except Exception as error:
return {"Message": [f'{error}']}
@app.route("/download-extension", methods=['GET', 'POST'])
def download_extension_page():
if request.method == 'GET':
file_path = 'static/assets/wvg-next-cdrm.zip'
return send_file(file_path, as_attachment=True)
elif request.method == 'POST':
version = {
'Version': '1.16'
}
return jsonify(version)
# Route for '/login'
@app.route("/login", methods=['GET', 'POST'])
def login_page():
if request.method == 'GET':
if session.get('logged_in'):
return redirect('/profile')
else:
return render_template('login.html')
if request.method == 'POST':
username = request.json['Username']
if scripts.check_user.check_username_exist(username=username.lower()):
if scripts.check_user.check_password(username=username.lower(), password=request.json['Password']):
session['logged_in'] = True
return {
'Message': 'Success'
}
else:
return {
'Message': 'Failed to Login'
}
else:
return {
'Message': 'Username does not exist'
}
# Route for '/register'
@app.route("/register", methods=['POST'])
def register():
if request.method == 'POST':
username = request.json['Username']
if username == '':
return {
'Message': 'Username cannot be empty'
}
if not scripts.check_user.check_username_exist(username=username.lower()):
scripts.check_user.insert_user(username=username.lower(), password=request.json['Password'])
session['logged_in'] = True
return {
'Message': 'Success'
}
else:
return {
'Message': 'Username already taken'
}
# Route for '/profile'
@app.route("/profile", methods=['GET'])
def profile():
if request.method == 'GET':
return render_template('profile.html')
# Route for '/devine'
@app.route("/devine", methods=['GET', 'POST', 'HEAD'])
def devine_page():
if request.method == 'GET':
if WVD != 'Remote':
cdm = Cdm.from_device(device=Device.load(WVD))
cdm_version = cdm.system_id
else:
cdm = rcdm
cdm_version = cdm.system_id
devine_service_count = scripts.key_count.get_service_count_devine()
devine_key_count = scripts.key_count.count_keys_devine()
return render_template('devine.html', cdm_version=cdm_version, devine_service_count=devine_service_count, devine_key_count=devine_key_count)
if request.method == 'POST':
if WVD != 'Remote':
cdm = Cdm.from_device(device=Device.load(WVD))
cdm_version = cdm.system_id
else:
cdm = rcdm
cdm_version = cdm.system_id
return str(cdm_version)
if request.method == 'HEAD':
response = Response(status=200)
response.headers.update({
"Server": f"https://github.com/devine-dl/pywidevine serve v{__version__}"
})
return response
# Route for '/{device}/open'
@app.route("/devine/<device>/open", methods=['GET'])
def device_open(device):
if request.method == 'GET':
if WVD != 'Remote':
cdm_device = Device.load(WVD)
cdm = current_app.config['cdms'] = Cdm.from_device(cdm_device)
else:
cdm = current_app.config['cdms'] = rcdm
session_id = cdm.open()
response_data = {
"status": 200,
"message": "Success",
"data": {
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level
}
}
}
return jsonify(response_data)
# Route for '/{device}/set_service_certificate'
@app.route("/devine/<device>/set_service_certificate", methods=['POST'])
def set_cert(device):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
# get session id
session_id = bytes.fromhex(body["session_id"])
# set service certificate
certificate = body.get("certificate")
provider_id = cdm.set_service_certificate(session_id, certificate)
response_data = {
"status": 200,
"message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
"data": {
"provider_id": provider_id
}
}
return jsonify(response_data)
@app.route("/devine/<device>/get_license_challenge/<licensetype>", methods=['POST'])
def get_license_challenge_page(device, licensetype):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
session_id = bytes.fromhex(body["session_id"])
privacy_mode = body.get("privacy_mode", True)
current_app.config['PSSH'] = body['init_data']
init_data = PSSH(body["init_data"])
license_request = cdm.get_license_challenge(
session_id=session_id,
pssh=init_data,
license_type=licensetype,
privacy_mode=privacy_mode
)
results = {
"status": 200,
"message": "Success",
"data": {
"challenge_b64": base64.b64encode(license_request).decode()
}
}
return jsonify(results)
@app.route("/devine/<device>/parse_license", methods=['POST'])
def parse_license(device):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
session_id = bytes.fromhex(body["session_id"])
cdm.parse_license(session_id, body["license_message"])
results = {
"status": 200,
"message": "Successfully parsed and loaded the Keys from the License message."
}
return jsonify(results)
@app.route("/devine/<device>/get_keys/<key_type>", methods=['POST'])
def get_keys(device, key_type):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
session_id = bytes.fromhex(body["session_id"])
if key_type == "ALL":
key_type = None
keys = cdm.get_keys(session_id, key_type)
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
keys_json = [
{
"key_id": key.kid.hex,
"key": key.key.hex(),
"type": key.type,
"permissions": key.permissions,
}
for key in keys
if not key_type or key.type == key_type
]
results = {
"status": 200,
"message": "Success",
"data": {
"keys": keys_json
}
}
scripts.key_cache.cache_keys(pssh=current_app.config['PSSH'], keys=returned_keys)
return jsonify(results)
@app.route("/devine/<device>/close/<session_id>", methods=['GET'])
def close_session(device, session_id):
if request.method == 'GET':
cdm = current_app.config["cdms"]
session_id = bytes.fromhex(session_id)
cdm.close(session_id)
results = {
"status": 200,
"message": f"Successfully closed Session '{session_id.hex()}'."
}
return jsonify(results)
@app.route("/devine/vault/<service>", methods=['POST'])
def add_keys_devine_vault(service):
data = request.json['content_keys']
replaced = 0
inserted = 0
for key_id, key in data.items():
result = scripts.key_cache.cache_keys_devine(service=service, kid=key_id, key=key)
if result == 'inserted':
inserted += 1
if result == 'replaced':
replaced += 1
message = {
"code": 0,
"added": inserted,
"updated": replaced
}
return jsonify(message)
@app.route("/devine/vault/<service>/<kid>", methods=['GET'])
def get_key_devine_vault(service, kid):
key = scripts.vault_check.get_key_by_kid_and_service(service=service, kid=kid)
message = {
"code": 0,
"content_key": key
}
return jsonify(message)
# If the script is called directly, start the flask app.
if __name__ == '__main__':
app.run(debug=True)

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
Flask
pywidevine
requests
tls-client
typing-extensions

9
scripts/__init__.py Normal file
View File

@ -0,0 +1,9 @@
from . import create_database
from . import wvd_check
from . import decrypt
from . import vault_check
from . import key_cache
from . import check_user
from . import gen_hash
from . import key_count
from . import extension_decrypt

65
scripts/check_user.py Normal file
View File

@ -0,0 +1,65 @@
import sqlite3
import os
from . import gen_hash
def check_username_exist(username):
# Connect to the SQLite database
conn = sqlite3.connect(f'{os.getcwd()}/databases/users.db')
cursor = conn.cursor()
# Execute a SELECT query to check if the username exists
cursor.execute("SELECT * FROM DATABASE WHERE username=?", (username,))
# Fetch the result
result = cursor.fetchone()
# Close the database connection
conn.close()
# If result is not None, username exists in the database, return True
if result:
return True
else:
return False
def check_password(username, password):
# Connect to the SQLite database
conn = sqlite3.connect(f'{os.getcwd()}/databases/users.db')
cursor = conn.cursor()
# Execute a SELECT query to retrieve the password for the given username
cursor.execute("SELECT hashedpassword FROM DATABASE WHERE username=?", (username,))
# Fetch the result
result = cursor.fetchone()
# Close the database connection
conn.close()
# If result is not None, username exists in the database, return the password
if result[0]:
if result[0] == gen_hash.generate_md5(username.lower(), password):
return True
else:
return False
def insert_user(username, password):
# Connect to the SQLite database
conn = sqlite3.connect(f'{os.getcwd()}/databases/users.db')
cursor = conn.cursor()
try:
# Execute an INSERT query to insert the username and password into the users table
cursor.execute("INSERT INTO DATABASE (username, hashedpassword) VALUES (?, ?)", (username.lower(), gen_hash.generate_md5(username.lower(), password)))
# Commit the transaction
conn.commit()
except sqlite3.Error as e:
print(e)
# Rollback the transaction in case of any error
conn.rollback()
finally:
# Close the database connection
conn.close()

View File

@ -0,0 +1,54 @@
# import dependencies
import sqlite3
import os
# Check to see if the database already exists, if not create a databases folder, and create the database.
def create_database():
# Check to see if the "databases" directory exists, if not creates it
if "databases" not in os.listdir():
os.makedirs('databases')
# Change to the databases directory
os.chdir("databases")
# Check to see if a database exists in databases directory, if not create it
if not os.path.isfile("key_cache.db"):
# Connect / Create "key_cache.db"
dbconnection = sqlite3.connect("key_cache.db")
# Create cursor
dbcursor = dbconnection.cursor()
# Create table through the cursor
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
# Close the connection
dbconnection.close()
# Check to see if a database exists in databases directory, if not create it
if not os.path.isfile("users.db"):
# Connect / Create "key_cache.db"
dbconnection = sqlite3.connect("users.db")
# Create cursor
dbcursor = dbconnection.cursor()
# Create table through the cursor
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "username" TEXT, "hashedpassword" TEXT, '
'"cached_keys" TEXT, PRIMARY KEY("username") )')
# Close the connection
dbconnection.close()
# Check to see if a database exists in databases directory, if not create it
if not os.path.isfile("devine.db"):
dbconnection = sqlite3.connect("devine.db")
dbcursor = dbconnection.cursor()
dbcursor.execute('CREATE TABLE IF NOT EXISTS "vault" ( "service" TEXT, "kid" TEXT, '
'"key" TEXT, PRIMARY KEY("kid") )')
# Change back to root directory
os.chdir(os.path.join(os.getcwd(), ".."))

220
scripts/decrypt.py Normal file
View File

@ -0,0 +1,220 @@
# import dependencies
import base64
import json
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
from pywidevine import RemoteCdm
import requests
import ast
from . import key_cache
# Define a function to clean dictionary values sent as string from the post request
def clean_my_dict(dirty_dict: str = None):
header_string = f"'''" \
f"{dirty_dict}" \
f"'''"
cleaned_string = '\n'.join(line for line in header_string.split('\n') if not line.strip().startswith('#'))
clean_dict = ast.literal_eval(cleaned_string)
return clean_dict
# Defining decrypt function
def decrypt_content(in_pssh: str = None, license_url: str = None,
headers: str = None, json_data: str = None, cookies_data: str = None, input_data: str = None, wvd: str = None, proxy: str = None,):
# prepare pssh
try:
pssh = PSSH(in_pssh)
except Exception as error:
return {
'Message': str(error)
}
if wvd != 'Remote':
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
else:
cdm = RemoteCdm(
device_type='ANDROID',
system_id=int(requests.post(url='https://cdrm-project.com/devine').content),
security_level=3,
host='https://cdrm-project.com/devine',
secret='CDRM-Project',
device_name='CDM'
)
# open CDM session
session_id = cdm.open()
# Generate the challenge
challenge = cdm.get_license_challenge(session_id, pssh)
challenge_not_in_data = False
extra_data_challenge = False
if headers != '':
try:
headers = ast.literal_eval(clean_my_dict(dirty_dict=headers))
# Iterate through the dictionary
for key, value in headers.items():
# Check if the value is '!Challenge'
if value == '!Challenge':
# Replace the value with something else
headers[key] = base64.b64encode(challenge).decode()
challenge_not_in_data = True
except:
return {
'Message': 'Headers could not be loaded correctly, please make sure they are formatted in python dictionary'
}
if json_data != '':
try:
json_data = ast.literal_eval(clean_my_dict(dirty_dict=json_data))
# Iterate through the dictionary
for key, value in json_data.items():
# Check if the value is '!Challenge'
if value == '!Challenge':
# Replace the value with something else
json_data[key] = base64.b64encode(challenge).decode()
challenge_not_in_data = True
except:
return {
'Message': 'JSON could not be loaded correctly, please make sure they are formatted in python dictionary format'
}
if cookies_data != '':
try:
cookies_data = ast.literal_eval(clean_my_dict(dirty_dict=cookies_data))
# Iterate through the dictionary
for key, value in cookies_data.items():
# Check if the value is '!Challenge'
if value == '!Challenge':
# Replace the value with something else
cookies_data[key] = base64.b64encode(challenge).decode()
challenge_not_in_data = True
except:
return {
'Message': 'Cookies could not be loaded correctly, please make sure they are formatted in python dictionary format'
}
if input_data != '':
try:
input_data = ast.literal_eval(clean_my_dict(dirty_dict=input_data))
# Iterate through the dictionary
for key, value in input_data.items():
# Check if the value is '!Challenge'
if value == '!Challenge':
# Replace the value with something else
input_data[key] = base64.b64encode(challenge).decode()
extra_data_challenge = True
except:
return {
'Message': 'Data could not be loaded correctly, please make sure they are formatted in python dictionary format'
}
# Try statement here, probably the most common point of failure
try:
if extra_data_challenge == False:
if challenge_not_in_data == False:
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
cookies=cookies_data,
data=challenge,
proxies={
'http': proxy,
}
)
else:
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
cookies=cookies_data,
proxies={
'http': proxy,
}
)
else:
print("Extra challenge!!")
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
cookies=cookies_data,
data=input_data,
proxies={
'http': proxy,
}
)
if license.status_code != 200:
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
cookies=cookies_data,
data=json.dumps(input_data),
proxies={
'http': proxy,
}
)
except Exception as error:
return {
'Message': f'An error occured {error}\n\n{license.content}'
}
# Another try statement to parse licenses
try:
cdm.parse_license(session_id, license.content)
except:
try:
cdm.parse_license(session_id, license.json().get('license'))
except:
try:
replaced_license = license.json()["license"].replace("-", "+").replace("_", "/")
cdm.parse_license(session_id, replaced_license)
except:
try:
cdm.parse_license(session_id, license.json().get('licenseData'))
except:
try:
cdm.parse_license(session_id, license.json().get('widevine2License'))
except:
try:
cdm.parse_license(session_id, license.json().get('license')[0])
except Exception as error:
return {
'Message': f'An error occured {error}\n\n{license.content}'
}
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=returned_keys)
# Return the keys
return {
'Message': returned_keys
}

View File

@ -0,0 +1,389 @@
# import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
from pywidevine import RemoteCdm
import requests
from . import key_cache
import base64
import tls_client
from xml.etree import ElementTree as ET
# Defining decrypt function
def decrypt_content(in_pssh: str = None, license_url: str = None, headers: dict = None, json_data: dict = None, wvd: str = None, scheme: str = None, proxy: str = None,):
# prepare pssh
pssh = PSSH(in_pssh)
if wvd != 'Remote':
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
else:
cdm = RemoteCdm(
device_type='ANDROID',
system_id=int(requests.post(url='https://cdrm-project.com/devine').content),
security_level=3,
host='https://cdrm-project.com/devine',
secret='CDRM-Project',
device_name='CDM'
)
# open CDM session
session_id = cdm.open()
# Generate the challenge
challenge = cdm.get_license_challenge(session_id, pssh)
if scheme == 'CommonWV':
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
data=challenge,
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.content)
except:
# Exception, try to find by regex via json
try:
cdm.parse_license(session_id, license.json()['license'])
except:
try:
cdm.parse_license(session_id, license.json()['licenseData'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'Amazon':
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
data={
'widevine2Challenge': f'{base64.b64encode(challenge).decode()}',
'includeHdcpTestKeyInLicense': 'true',
},
proxies={
'http': proxy,
}
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['widevine2License']['license'])
except Exception as error:
return [f'{error}\n\n{license.content}']
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'YouTube':
json_data['licenseRequest'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
}
)
# Extract license from json dict
license = license.json()["license"].replace("-", "+").replace("_", "/")
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license)
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'RTE':
json_data['getWidevineLicense']['widevineChallenge'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
}
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['getWidevineLicenseResponse']['license'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'Canal+':
try:
json_data['ServiceRequest']['InData']['ChallengeInfo'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data
)
# Parse the license
try:
cdm.parse_license(session_id, license.json()['ServiceResponse']['OutData']['LicenseInfo'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
except:
try:
session = tls_client.Session()
response = session.post(
url=license_url,
headers=headers,
data=f'{base64.b64encode(challenge).decode()}'
).content.decode()
# Define the namespace
namespace = {'ns': 'http://www.canal-plus.com/DRM/V1'}
# Parse the XML string
root = ET.fromstring(response)
# Find the license element using the namespace
license_element = root.find('.//ns:license', namespace)
# Extract the text content of the license element
license_content = license_element.text
# Parse the license
try:
cdm.parse_license(session_id, license_content)
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
except Exception as error:
return [f'{error}\n\n{license.content}']
if scheme == 'NosTV':
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json={
'challenge': base64.b64encode(challenge).decode(),
},
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['license'][0])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'AstroGo':
# Insert the challenge
json_data['licenseChallenge'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.content)
except:
try:
cdm.parse_license(session_id, license.json()['licenseData'])
except:
try:
cdm.parse_license(session_id, license.json()['licenseData'][0])
except:
try:
cdm.parse_license(session_id, str(license.json()['licenseData'][0]))
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'Vodafone':
print(json_data)
# Find the challenge field
for request in json_data['requests']:
if request['action'] == 'license':
request['params']['challenge'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['license'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys

11
scripts/gen_hash.py Normal file
View File

@ -0,0 +1,11 @@
import hashlib
def generate_md5(user_login, user_pass):
# Concatenate username and password
data = user_login.encode() + user_pass.encode()
# Create MD5 hash
user_md5_hash = hashlib.md5(data).hexdigest()
return user_md5_hash

50
scripts/key_cache.py Normal file
View File

@ -0,0 +1,50 @@
# Import dependencies
import sqlite3
import os
# Define cache function
def cache_keys(pssh: str, keys: str):
# Connect to database
dbconnection = sqlite3.connect(f"{os.getcwd()}/databases/key_cache.db")
# Initialize a cursor
dbcursor = dbconnection.cursor()
# Insert PSSH and keys
dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, keys))
# Commit the changes
dbconnection.commit()
# Close the connection
dbconnection.close()
def cache_keys_devine(service: str, kid: str, key: str):
# Connect to database
dbconnection = sqlite3.connect(f"{os.getcwd()}/databases/devine.db")
# Initialize a cursor
dbcursor = dbconnection.cursor()
# Check if the key already exists
dbcursor.execute("SELECT COUNT(*) FROM vault WHERE service = ? AND kid = ?", (service, kid))
row = dbcursor.fetchone()
if row[0] > 0:
# Key already exists, perform REPLACE operation
dbcursor.execute("REPLACE INTO vault VALUES (?, ?, ?)", (service, kid, key))
operation = "replaced"
else:
# Key does not exist, perform INSERT operation
dbcursor.execute("INSERT INTO vault VALUES (?, ?, ?)", (service, kid, key))
operation = "inserted"
# Commit the changes
dbconnection.commit()
# Close the connection
dbconnection.close()
return operation

67
scripts/key_count.py Normal file
View File

@ -0,0 +1,67 @@
# Import dependencies
import sqlite3
import os
# Define cache function
def count_keys():
# Connect to database
dbconnection = sqlite3.connect(f"{os.getcwd()}/databases/key_cache.db")
# Initialize a cursor
dbcursor = dbconnection.cursor()
# Get the count
dbcursor.execute("SELECT COUNT (*) FROM database")
# Get the result
result = list(dbcursor)
# Convert it into a string
formatted_result = str(result)
# Strip special characters
stripped_result = formatted_result.strip("[](),")
# return the result
return stripped_result
def count_keys_devine():
# Connect to the database
connection = sqlite3.connect(f"{os.getcwd()}/databases/devine.db")
cursor = connection.cursor()
# Execute the query to count total number of keys
cursor.execute("SELECT COUNT(kid) FROM vault")
# Fetch the result
total_keys = cursor.fetchone()[0]
# Close the cursor and connection
cursor.close()
connection.close()
# return the key count
return total_keys
def get_service_count_devine():
# Connect to the database
connection = sqlite3.connect(f"{os.getcwd()}/databases/devine.db")
cursor = connection.cursor()
# Execute the query to count unique services
cursor.execute("SELECT COUNT(DISTINCT service) FROM vault")
# Fetch the result
count = cursor.fetchone()[0]
# Close the cursor and connection
cursor.close()
connection.close()
# return the count
return count

68
scripts/vault_check.py Normal file
View File

@ -0,0 +1,68 @@
# Import dependencies
import sqlite3
import os
# Define check database function
def check_database(pssh: str):
# Connect to DB
dbconnection = sqlite3.connect(f"{os.getcwd()}/databases/key_cache.db")
# Initialize a cursor object
dbcursor = dbconnection.cursor()
# Find PSSH
dbcursor.execute("SELECT keys FROM database WHERE pssh = :pssh", {"pssh": pssh})
# Fetch all results
vaultkeys = dbcursor.fetchall()
# If any found
if vaultkeys:
# Assign variable
vaultkey = str(vaultkeys[0])
# Strip of sqlite special characters
stripped_vault_key = vaultkey.strip(",'()")
# Remove double \\ for single \
formatted_vault_key = stripped_vault_key.replace('\\n', '\n')
# Close the connections
dbconnection.close()
return formatted_vault_key
# If no keys found
else:
# Close the connection
dbconnection.close()
# Return not found
return "Not found"
def get_key_by_kid_and_service(service: str, kid: str):
# Connect to the database
dbconnection = sqlite3.connect(f"{os.getcwd()}/databases/devine.db")
dbcursor = dbconnection.cursor()
# Execute the SELECT query
dbcursor.execute("SELECT key FROM vault WHERE service = ? AND kid = ?", (service, kid))
# Fetch the result
result = dbcursor.fetchone()
# Close the connection
dbconnection.close()
# If result is None, no matching key found
if result is None:
return None
else:
return result[0] # Returning the key

22
scripts/wvd_check.py Normal file
View File

@ -0,0 +1,22 @@
# Import dependencies
import os
import glob
# Define WVD device check
def check_for_wvd():
try:
# Use glob to get the name of the .wvd
extracted_device = glob.glob(f'{os.getcwd()}/databases/WVDs/*.wvd')[0]
# Return the device path
return extracted_device
except:
# Check to see if the WVDs folder exist, if not create it
if 'WVDs' not in os.listdir(fr'{os.getcwd()}/databases'):
os.makedirs(f'{os.getcwd()}/databases/WVDs')
# Stop the program and print out instructions
return None

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.3 MiB

Binary file not shown.

33
static/css/api.css Normal file
View File

@ -0,0 +1,33 @@
main {
display: grid;
grid-template-rows: auto auto 2fr;
margin-top: 5%;
}
h1 {
color: white;
justify-self: center;
}
h2 {
color: white;
justify-self: center;
border-bottom-style: solid;
border-bottom-width: thin;
}
div {
display: grid;
}
pre {
color: white;
justify-self: center;
border-style: solid;
text-align: left;
border-radius: 2%;
border-color: rgba(0, 0, 0, 0.3);
background-color: rgba(0, 0, 0, 0.1);
padding: 1%;
width: auto;
}