CDRM-Project/scripts/decrypt.py
2024-09-08 20:13:39 -04:00

221 lines
7.4 KiB
Python

# import dependencies
import base64
import json
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
from pywidevine import RemoteCdm
import requests
import ast
from . import key_cache
# Define a function to clean dictionary values sent as string from the post request
def clean_my_dict(dirty_dict: str = None) -> str:
    """Strip whole-line ``#`` comments from a dict literal received as text.

    The text is split on newlines, every line whose first non-blank character
    is ``#`` is dropped, and the remainder is round-tripped through a
    triple-quoted string literal with ``ast.literal_eval``. The round trip
    means backslash escape sequences in the text are interpreted once here.

    Args:
        dirty_dict: Text expected to contain a Python dict literal. It is
            formatted with an f-string first, so ``None`` becomes the text
            ``'None'``.

    Returns:
        The cleaned text (still a string; callers evaluate it themselves).

    Raises:
        SyntaxError: If the text cannot be embedded in a ``'''``-quoted
            literal, e.g. it contains ``'''`` or ends with a single quote.
    """
    # Filter BEFORE adding the quotes: wrapping first glues the closing quotes
    # onto the last line, so a trailing comment line used to take the string
    # terminator with it, and a leading comment line was never recognised
    # because it was prefixed by the opening quotes.
    kept_lines = [
        line
        for line in f"{dirty_dict}".split('\n')
        if not line.strip().startswith('#')
    ]
    quoted = "'''" + '\n'.join(kept_lines) + "'''"
    return ast.literal_eval(quoted)
# Defining decrypt function
def decrypt_content(in_pssh: str = None, license_url: str = None,
                    headers: str = None, json_data: str = None, cookies_data: str = None,
                    input_data: str = None, wvd: str = None, proxy: str = None,) -> dict:
    """Request a license for ``in_pssh`` and return the keys it yields.

    Args:
        in_pssh: Value handed to ``PSSH(...)``.
        license_url: URL the license challenge is POSTed to.
        headers: Text holding a Python dict literal used as request headers.
        json_data: Text holding a Python dict literal sent as the JSON body.
        cookies_data: Text holding a Python dict literal used as cookies.
        input_data: Text holding a Python dict literal sent as the ``data``
            body, but only when one of its values is the ``'!Challenge'``
            placeholder.
        wvd: ``'Remote'`` selects a ``RemoteCdm``; any other value is passed
            to ``Device.load``.
        proxy: Proxy passed to ``requests`` under the ``'http'`` scheme.

    In each of the four dict arguments, any value equal to ``'!Challenge'``
    is replaced by the base64-encoded license challenge. ``''`` and ``None``
    both mean "not supplied" and are forwarded to ``requests`` unchanged.

    Returns:
        A dict with a single ``'Message'`` key. On success it holds one
        ``kid:key`` line (newline-terminated) per non-SIGNING key; on a
        handled failure it holds the error text.
    """
    # prepare pssh
    try:
        pssh = PSSH(in_pssh)
    except Exception as error:
        return {
            'Message': str(error)
        }

    if wvd != 'Remote':
        # load the device, then build the CDM from it
        cdm = Cdm.from_device(Device.load(wvd))
    else:
        cdm = RemoteCdm(
            device_type='ANDROID',
            system_id=int(requests.post(url='https://cdrm-project.com/devine').content),
            security_level=3,
            host='https://cdrm-project.com/devine',
            secret='CDRM-Project',
            device_name='CDM'
        )

    # open CDM session
    session_id = cdm.open()
    try:
        succeeded, message = _exchange_license(
            cdm, session_id, pssh, license_url,
            headers, json_data, cookies_data, input_data, proxy,
        )
    except BaseException:
        # Do not leave the session open when something unexpected escapes.
        _close_session_quietly(cdm, session_id)
        raise

    if not succeeded:
        # Failure paths previously returned without closing the session.
        _close_session_quietly(cdm, session_id)
        return {
            'Message': message
        }

    # close session, disposes of session data
    cdm.close(session_id)
    # Cache the keys
    key_cache.cache_keys(pssh=in_pssh, keys=message)
    # Return the keys
    return {
        'Message': message
    }


def _close_session_quietly(cdm, session_id):
    """Close a CDM session on a failure path without masking the real error."""
    try:
        cdm.close(session_id)
    except Exception:
        # Deliberate best effort: the caller is already reporting a failure,
        # and a secondary error while closing must not replace that report.
        pass


def _fill_challenge(raw, challenge):
    """Parse ``raw`` into a dict and substitute ``'!Challenge'`` values.

    Returns ``(mapping, replaced)`` where ``replaced`` tells whether at least
    one placeholder was found. Any parsing problem propagates as an exception.
    """
    mapping = ast.literal_eval(clean_my_dict(dirty_dict=raw))
    replaced = False
    # Only existing keys are reassigned, so the dict size never changes
    # while it is being iterated.
    for key, value in mapping.items():
        if value == '!Challenge':
            mapping[key] = base64.b64encode(challenge).decode()
            replaced = True
    return mapping, replaced


def _exchange_license(cdm, session_id, pssh, license_url, headers, json_data,
                      cookies_data, input_data, proxy):
    """Run the challenge/license round trip for an open CDM session.

    Returns ``(True, keys_text)`` on success and ``(False, error_text)`` for
    every failure the original code reported through its ``'Message'`` value.
    """
    # Generate the challenge
    challenge = cdm.get_license_challenge(session_id, pssh)

    request_parts = {'headers': headers, 'json': json_data, 'cookies': cookies_data}
    parse_failures = {
        'headers': 'Headers could not be loaded correctly, please make sure they are formatted in python dictionary',
        'json': 'JSON could not be loaded correctly, please make sure they are formatted in python dictionary format',
        'cookies': 'Cookies could not be loaded correctly, please make sure they are formatted in python dictionary format',
    }

    # True once the challenge was placed in headers, JSON or cookies, in which
    # case it is not additionally sent as the raw request body.
    challenge_not_in_data = False
    for part, raw in list(request_parts.items()):
        # The parameters default to None, so None is treated like '' here
        # instead of failing with a misleading "could not be loaded" message.
        if raw in ('', None):
            continue
        try:
            request_parts[part], replaced = _fill_challenge(raw, challenge)
        except Exception:
            return False, parse_failures[part]
        challenge_not_in_data = challenge_not_in_data or replaced

    # True once the challenge was placed inside the form/body data.
    extra_data_challenge = False
    if input_data not in ('', None):
        try:
            input_data, extra_data_challenge = _fill_challenge(input_data, challenge)
        except Exception:
            return False, 'Data could not be loaded correctly, please make sure they are formatted in python dictionary format'

    def post(data):
        # NOTE(review): only the 'http' scheme is mapped, so requests will not
        # route https:// license URLs through the proxy - confirm this is intended.
        return requests.post(
            url=license_url,
            data=data,
            proxies={
                'http': proxy,
            },
            **request_parts,
        )

    # Try statement here, probably the most common point of failure
    response = None
    try:
        if extra_data_challenge:
            print("Extra challenge!!")
            response = post(input_data)
            # NOTE(review): the retry is assumed to belong to this branch only
            # (it re-sends input_data as a JSON string); the flattened source
            # does not show its nesting - confirm.
            if response.status_code != 200:
                response = post(json.dumps(input_data))
        elif challenge_not_in_data:
            response = post(None)
        else:
            # send license challenge
            response = post(challenge)
    except Exception as error:
        # The response may not exist yet when the very first POST raised;
        # formatting it unconditionally used to raise UnboundLocalError here.
        message = f'An error occured {error}'
        if response is not None:
            message += f'\n\n{response.content}'
        return False, message

    # The license may be the raw body or sit under one of several JSON keys;
    # try each known location in order and stop at the first that parses.
    candidates = (
        lambda: response.content,
        lambda: response.json().get('license'),
        lambda: response.json()["license"].replace("-", "+").replace("_", "/"),
        lambda: response.json().get('licenseData'),
        lambda: response.json().get('widevine2License'),
        lambda: response.json().get('license')[0],
    )
    last_error = None
    for extract in candidates:
        try:
            cdm.parse_license(session_id, extract())
        except Exception as error:
            last_error = error
        else:
            break
    else:
        return False, f'An error occured {last_error}\n\n{response.content}'

    # assign variable for returned keys
    keys = "".join(
        f"{key.kid.hex}:{key.key.hex()}\n"
        for key in cdm.get_keys(session_id)
        if key.type != "SIGNING"
    )
    return True, keys