CDRM-Project/main.py
2024-09-08 20:13:39 -04:00

421 lines
12 KiB
Python

# Import dependencies
from flask import Flask, render_template, request, jsonify, session, redirect, send_file, Response, current_app
import requests
import scripts
import uuid
import json
import base64
from pywidevine import __version__
from pywidevine.pssh import PSSH
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine import RemoteCdm
# Create database if it doesn't exist
scripts.create_database.create_database()
# Check for .WVD file and assign it a variable
WVD = scripts.wvd_check.check_for_wvd()
# If no WVD found, exit
if WVD is None:
WVD = 'Remote'
rcdm = RemoteCdm(
device_type='ANDROID',
system_id=int(requests.post(url='https://cdrm-project.com/devine').content),
security_level=3,
host='https://cdrm-project.com/devine',
secret='CDRM-Project',
device_name='CDM'
)
# Define Flask app object, give template and static arguments.
app = Flask(__name__, template_folder='templates', static_folder='static', static_url_path='/')
# Create a secret key for logins
app.secret_key = str(uuid.uuid4())
# Route for root '/'
@app.route("/", methods=['GET', 'POST'])
def main_page():
if request.method == 'GET':
return render_template('index.html')
elif request.method == 'POST':
# Get the JSON data from the request
data = json.loads(request.data.decode())
# Get the proxy
proxy = data['Proxy']
if proxy == '':
proxy = None
decrypt_response = scripts.decrypt.decrypt_content(
in_pssh=request.json['PSSH'],
license_url=request.json['License URL'],
headers=request.json['Headers'],
json_data=request.json['JSON'],
cookies_data=request.json['Cookies'],
input_data=request.json['Data'],
wvd=WVD,
proxy=proxy,
)
return jsonify(decrypt_response)
# Route for '/cache'
@app.route("/cache", methods=['GET', 'POST'])
def cache_page():
if request.method == 'GET':
cache_page_key_count = scripts.key_count.count_keys()
return render_template('cache.html', cache_page_key_count=cache_page_key_count)
if request.method == 'POST':
results = scripts.vault_check.check_database(pssh=request.json['PSSH'])
message = {
'Message': results
}
return jsonify(message)
@app.route("/key_count", methods=['GET'])
def key_count():
if request.method == 'GET':
results = scripts.key_count.count_keys()
results = 'Total Keys: ' + str(results)
message = {
'Message': results
}
return jsonify(message)
# Route for '/faq'
@app.route("/faq", methods=['GET'])
def faq_page():
if request.method == 'GET':
return render_template('faq.html')
@app.route("/api", methods=['GET', 'POST'])
def api_page():
if request.method == 'GET':
return render_template('api.html')
elif request.method == 'POST':
return
@app.route("/extension", methods=['GET', 'POST'])
def extension_page():
if request.method == 'GET':
return render_template('extension.html')
elif request.method == 'POST':
# Get the JSON data from the request
data = json.loads(request.data.decode())
# Get the MPD url
json_data = data['JSON']
if json_data:
try:
json_data = base64.b64decode(json_data).decode()
json_data = json.loads(json_data)
except:
json_data = json_data
# Get the proxy
proxy = data['Proxy']
if proxy == '':
proxy = None
try:
keys = scripts.extension_decrypt.decrypt_content(in_pssh=data['PSSH'], license_url=data['License URL'], headers=json.loads(data['Headers']),
wvd=WVD, scheme=data['Scheme'], proxy=proxy, json_data=json_data)
return {'Message': f'{keys}'}
except Exception as error:
return {"Message": [f'{error}']}
@app.route("/download-extension", methods=['GET', 'POST'])
def download_extension_page():
if request.method == 'GET':
file_path = 'static/assets/wvg-next-cdrm.zip'
return send_file(file_path, as_attachment=True)
elif request.method == 'POST':
version = {
'Version': '1.16'
}
return jsonify(version)
# Route for '/login'
@app.route("/login", methods=['GET', 'POST'])
def login_page():
if request.method == 'GET':
if session.get('logged_in'):
return redirect('/profile')
else:
return render_template('login.html')
if request.method == 'POST':
username = request.json['Username']
if scripts.check_user.check_username_exist(username=username.lower()):
if scripts.check_user.check_password(username=username.lower(), password=request.json['Password']):
session['logged_in'] = True
return {
'Message': 'Success'
}
else:
return {
'Message': 'Failed to Login'
}
else:
return {
'Message': 'Username does not exist'
}
# Route for '/register'
@app.route("/register", methods=['POST'])
def register():
if request.method == 'POST':
username = request.json['Username']
if username == '':
return {
'Message': 'Username cannot be empty'
}
if not scripts.check_user.check_username_exist(username=username.lower()):
scripts.check_user.insert_user(username=username.lower(), password=request.json['Password'])
session['logged_in'] = True
return {
'Message': 'Success'
}
else:
return {
'Message': 'Username already taken'
}
# Route for '/profile'
@app.route("/profile", methods=['GET'])
def profile():
if request.method == 'GET':
return render_template('profile.html')
# Route for '/devine'
@app.route("/devine", methods=['GET', 'POST', 'HEAD'])
def devine_page():
if request.method == 'GET':
if WVD != 'Remote':
cdm = Cdm.from_device(device=Device.load(WVD))
cdm_version = cdm.system_id
else:
cdm = rcdm
cdm_version = cdm.system_id
devine_service_count = scripts.key_count.get_service_count_devine()
devine_key_count = scripts.key_count.count_keys_devine()
return render_template('devine.html', cdm_version=cdm_version, devine_service_count=devine_service_count, devine_key_count=devine_key_count)
if request.method == 'POST':
if WVD != 'Remote':
cdm = Cdm.from_device(device=Device.load(WVD))
cdm_version = cdm.system_id
else:
cdm = rcdm
cdm_version = cdm.system_id
return str(cdm_version)
if request.method == 'HEAD':
response = Response(status=200)
response.headers.update({
"Server": f"https://github.com/devine-dl/pywidevine serve v{__version__}"
})
return response
# Route for '/{device}/open'
@app.route("/devine/<device>/open", methods=['GET'])
def device_open(device):
if request.method == 'GET':
if WVD != 'Remote':
cdm_device = Device.load(WVD)
cdm = current_app.config['cdms'] = Cdm.from_device(cdm_device)
else:
cdm = current_app.config['cdms'] = rcdm
session_id = cdm.open()
response_data = {
"status": 200,
"message": "Success",
"data": {
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level
}
}
}
return jsonify(response_data)
# Route for '/{device}/set_service_certificate'
@app.route("/devine/<device>/set_service_certificate", methods=['POST'])
def set_cert(device):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
# get session id
session_id = bytes.fromhex(body["session_id"])
# set service certificate
certificate = body.get("certificate")
provider_id = cdm.set_service_certificate(session_id, certificate)
response_data = {
"status": 200,
"message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
"data": {
"provider_id": provider_id
}
}
return jsonify(response_data)
@app.route("/devine/<device>/get_license_challenge/<licensetype>", methods=['POST'])
def get_license_challenge_page(device, licensetype):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
session_id = bytes.fromhex(body["session_id"])
privacy_mode = body.get("privacy_mode", True)
current_app.config['PSSH'] = body['init_data']
init_data = PSSH(body["init_data"])
license_request = cdm.get_license_challenge(
session_id=session_id,
pssh=init_data,
license_type=licensetype,
privacy_mode=privacy_mode
)
results = {
"status": 200,
"message": "Success",
"data": {
"challenge_b64": base64.b64encode(license_request).decode()
}
}
return jsonify(results)
@app.route("/devine/<device>/parse_license", methods=['POST'])
def parse_license(device):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
session_id = bytes.fromhex(body["session_id"])
cdm.parse_license(session_id, body["license_message"])
results = {
"status": 200,
"message": "Successfully parsed and loaded the Keys from the License message."
}
return jsonify(results)
@app.route("/devine/<device>/get_keys/<key_type>", methods=['POST'])
def get_keys(device, key_type):
if request.method == 'POST':
cdm = current_app.config["cdms"]
body = request.json
session_id = bytes.fromhex(body["session_id"])
if key_type == "ALL":
key_type = None
keys = cdm.get_keys(session_id, key_type)
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
keys_json = [
{
"key_id": key.kid.hex,
"key": key.key.hex(),
"type": key.type,
"permissions": key.permissions,
}
for key in keys
if not key_type or key.type == key_type
]
results = {
"status": 200,
"message": "Success",
"data": {
"keys": keys_json
}
}
scripts.key_cache.cache_keys(pssh=current_app.config['PSSH'], keys=returned_keys)
return jsonify(results)
@app.route("/devine/<device>/close/<session_id>", methods=['GET'])
def close_session(device, session_id):
if request.method == 'GET':
cdm = current_app.config["cdms"]
session_id = bytes.fromhex(session_id)
cdm.close(session_id)
results = {
"status": 200,
"message": f"Successfully closed Session '{session_id.hex()}'."
}
return jsonify(results)
@app.route("/devine/vault/<service>", methods=['POST'])
def add_keys_devine_vault(service):
data = request.json['content_keys']
replaced = 0
inserted = 0
for key_id, key in data.items():
result = scripts.key_cache.cache_keys_devine(service=service, kid=key_id, key=key)
if result == 'inserted':
inserted += 1
if result == 'replaced':
replaced += 1
message = {
"code": 0,
"added": inserted,
"updated": replaced
}
return jsonify(message)
@app.route("/devine/vault/<service>/<kid>", methods=['GET'])
def get_key_devine_vault(service, kid):
key = scripts.vault_check.get_key_by_kid_and_service(service=service, kid=kid)
message = {
"code": 0,
"content_key": key
}
return jsonify(message)
# If the script is called directly, start the flask app.
if __name__ == '__main__':
app.run(debug=True)