CDRM-Keys/CDRM_Modules/Decrypt.py
2024-09-17 01:41:54 -04:00

301 lines
11 KiB
Python

import base64
import httpx
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
from pywidevine.remotecdm import RemoteCdm
import yaml
import os
from colorama import Fore
import re
from . import CDM_Selector
from . import Get_PSSH
from . import Database
from InquirerPy import inquirer
def load_license_curl():
class LicenseCURL:
def __init__(self):
self.headers = headers
self.cookies = cookies
self.data = data
self.json_data = json_data
try:
with open(f'{os.getcwd()}/License_cURL.py', 'r') as file:
local_namespace = {}
exec(file.read(), globals(), local_namespace)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
if 'headers' in local_namespace:
headers = local_namespace['headers']
else:
headers = None
if 'cookies' in local_namespace:
cookies = local_namespace['cookies']
else:
cookies = None
if 'data' in local_namespace:
data = local_namespace['data']
else:
data = None
if 'json_data' in local_namespace:
json_data = local_namespace['json_data']
else:
json_data = None
return LicenseCURL()
def check_if_url(string):
# Regular expression for validating a URL
url_pattern = re.compile(
r'^(?:http|ftp)s?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4
r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', # optional path
re.IGNORECASE)
return re.match(url_pattern, string) is not None
def replace_widevine_challenge(dictionary, challenge):
for key, value in dictionary.items():
if 'widevine' in key.lower() or 'license' in key.lower():
dictionary[key] = challenge
elif isinstance(value, dict):
replace_widevine_challenge(value, challenge)
def find_widevine_license(dictionary):
for key, value in dictionary.items():
if 'widevine' in key.lower() or 'license' in key.lower():
license = dictionary[key].replace("-", "+").replace("_", "/")
return license
elif isinstance(value, dict):
find_widevine_license(value)
def decrypt_local_cdm():
cdm_choice = CDM_Selector.select_local_cdm()
try:
device = Device.load(f'{os.getcwd()}/WVDs/{cdm_choice}')
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm = Cdm.from_device(device)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
session_id = cdm.open()
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
pssh_or_mpd_input = input("MPD Link or PSSH: ")
if check_if_url(pssh_or_mpd_input):
pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
if len(pssh_list) > 1:
choice = inquirer.select(
message="Select PSSH:",
choices=pssh_list,
).execute()
db_pssh = choice
db_mpd = pssh_or_mpd_input
pssh = PSSH(choice)
else:
if pssh_list:
db_pssh = pssh_list[0]
db_mpd = pssh_or_mpd_input
pssh = PSSH(pssh_list[0])
else:
db_pssh = pssh_or_mpd_input
db_mpd = None
pssh = PSSH(pssh_or_mpd_input)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
user_license_url = input("License URL: ")
db_url = user_license_url
print()
license_curl = load_license_curl()
db_headers = license_curl.headers
db_cookies = license_curl.cookies
service_cert_set_success = False
try:
service_certificate = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=cdm.service_certificate_challenge
)
try:
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate.content)
service_cert_set_success = True
except Exception as error:
try:
cdm.set_service_certificate(session_id=session_id, certificate=find_widevine_license(service_certificate.json()))
service_cert_set_success = True
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=service_cert_set_success)
db_data = challenge
if license_curl.data:
try:
replace_widevine_challenge(license_curl.data, base64.b64encode(challenge).decode())
lic_data = license_curl.data
except:
pass
else:
lic_data = None
if license_curl.json_data:
try:
replace_widevine_challenge(license_curl.json_data, base64.b64encode(challenge).decode())
json_data = license_curl.json_data
except:
pass
else:
json_data = None
try:
license_response = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
json=json_data,
data=challenge if json_data is None and lic_data is None else lic_data
)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm.parse_license(session_id=session_id, license_message=license_response.content)
except Exception as error:
try:
cdm.parse_license(session_id=session_id, license_message=find_widevine_license(license_response.json()))
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
for key in cdm.get_keys(session_id):
if key.type == 'CONTENT':
Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
print(f"{key.kid.hex}:{key.key.hex()}")
return
def decrypt_remote_cdm():
remote_cdm_choice = CDM_Selector.select_remote_cdm()
try:
with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile:
config = yaml.safe_load(ymlfile)
cdm = RemoteCdm(
device_type=config['Remote_CDMs'][remote_cdm_choice]['device_type'],
system_id=config['Remote_CDMs'][remote_cdm_choice]['system_id'],
security_level=config['Remote_CDMs'][remote_cdm_choice]['security_level'],
host=config['Remote_CDMs'][remote_cdm_choice]['host'],
secret=config['Remote_CDMs'][remote_cdm_choice]['secret'],
device_name=config['Remote_CDMs'][remote_cdm_choice]['name'],
)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
session_id = cdm.open()
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
pssh_or_mpd_input = input("MPD Link or PSSH: ")
if check_if_url(pssh_or_mpd_input):
pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
if len(pssh_list) > 1:
choice = inquirer.select(
message="Select PSSH:",
choices=pssh_list,
).execute()
db_pssh = choice
db_mpd = pssh_or_mpd_input
pssh = PSSH(choice)
else:
if pssh_list:
db_pssh = pssh_list[0]
db_mpd = pssh_or_mpd_input
pssh = PSSH(pssh_list[0])
else:
db_pssh = pssh_or_mpd_input
db_mpd = None
pssh = PSSH(pssh_or_mpd_input)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
user_license_url = input("License URL: ")
db_url = user_license_url
print()
license_curl = load_license_curl()
db_headers = license_curl.headers
db_cookies = license_curl.cookies
set_service_certificate_success = False
try:
service_certificate = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=cdm.service_certificate_challenge
)
try:
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate.content)
set_service_certificate_success = True
except:
try:
cdm.set_service_certificate(session_id=session_id, certificate=find_widevine_license(service_certificate.json()))
set_service_certificate_success = True
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=set_service_certificate_success)
db_data = challenge
if license_curl.data:
try:
replace_widevine_challenge(license_curl.data, base64.b64encode(challenge).decode())
lic_data = license_curl.data
except:
pass
else:
lic_data = None
if license_curl.json_data:
try:
replace_widevine_challenge(license_curl.json_data, base64.b64encode(challenge).decode())
json_data = license_curl.json_data
except:
pass
else:
json_data = None
try:
license_response = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
json=json_data,
data=challenge if json_data is None and lic_data is None else lic_data
)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm.parse_license(session_id=session_id, license_message=license_response.content)
except Exception as error:
try:
cdm.parse_license(session_id=session_id, license_message=find_widevine_license(license_response.json()))
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
for key in cdm.get_keys(session_id):
if key.type == 'CONTENT':
Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
print(f"{key.kid.hex}:{key.key.hex()}")
return