CDRM-Keys/CDRM_Modules/Decrypt.py

276 lines
10 KiB
Python
Raw Normal View History

import base64
2024-09-16 17:28:01 +00:00
import httpx
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
from pywidevine.remotecdm import RemoteCdm
import yaml
import os
from colorama import Fore
import re
2024-09-16 17:28:01 +00:00
from . import CDM_Selector
from . import Get_PSSH
from . import Database
from InquirerPy import inquirer
2024-09-16 17:28:01 +00:00
def load_license_curl():
class LicenseCURL:
def __init__(self):
self.headers = headers
self.cookies = cookies
self.data = data
2024-09-16 17:28:01 +00:00
try:
with open(f'{os.getcwd()}/License_cURL.py', 'r') as file:
local_namespace = {}
exec(file.read(), globals(), local_namespace)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
if 'headers' in local_namespace:
headers = local_namespace['headers']
else:
headers = None
if 'cookies' in local_namespace:
cookies = local_namespace['cookies']
else:
cookies = None
if 'data' in local_namespace:
data = local_namespace['data']
else:
data = None
2024-09-16 17:28:01 +00:00
return LicenseCURL()
def check_if_url(string):
# Regular expression for validating a URL
url_pattern = re.compile(
r'^(?:http|ftp)s?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4
r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', # optional path
re.IGNORECASE)
return re.match(url_pattern, string) is not None
def replace_widevine_challenge(dictionary, challenge):
for key, value in dictionary.items():
if 'widevine' in key.lower():
dictionary[key] = challenge
elif isinstance(value, dict):
replace_widevine_value(value, challenge_type)
def find_widevine_license(dictionary):
for key, value in dictionary.items():
if 'widevine' in key.lower() or 'license' in key.lower():
license = dictionary[key]
return license
elif isinstance(value, dict):
replace_widevine_value(value, challenge_type)
2024-09-16 17:28:01 +00:00
def decrypt_local_cdm():
cdm_choice = CDM_Selector.select_local_cdm()
try:
device = Device.load(f'{os.getcwd()}/WVDs/{cdm_choice}')
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm = Cdm.from_device(device)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
session_id = cdm.open()
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
pssh_or_mpd_input = input("MPD Link or PSSH: ")
if check_if_url(pssh_or_mpd_input):
pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
if len(pssh_list) > 1:
choice = inquirer.select(
message="Select PSSH:",
choices=pssh_list,
).execute()
db_pssh = choice
db_mpd = pssh_or_mpd_input
pssh = PSSH(choice)
else:
if pssh_list:
db_pssh = pssh_list[0]
db_mpd = pssh_or_mpd_input
pssh = PSSH(pssh_list[0])
else:
db_pssh = pssh_or_mpd_input
db_mpd = None
pssh = PSSH(pssh_or_mpd_input)
2024-09-16 17:28:01 +00:00
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
user_license_url = input("License URL: ")
db_url = user_license_url
2024-09-16 17:28:01 +00:00
print()
license_curl = load_license_curl()
db_headers = license_curl.headers
db_cookies = license_curl.cookies
service_cert_set_success = False
2024-09-16 17:28:01 +00:00
try:
service_certificate = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=cdm.service_certificate_challenge
)
try:
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate.content)
service_cert_set_success = True
except Exception as error:
try:
cdm.set_service_certificate(session_id=session_id, certificate=find_widevine_license(service_certificate.json()))
service_cert_set_success = True
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
2024-09-16 17:28:01 +00:00
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=service_cert_set_success)
db_data = challenge
if license_curl.data:
try:
lic_data = replace_widevine_value(license_curl.data, base64.b64encode(challenge).decode())
except:
pass
else:
lic_data = challenge
2024-09-16 17:28:01 +00:00
try:
license_response = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=lic_data
)
2024-09-16 17:28:01 +00:00
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm.parse_license(session_id=session_id, license_message=license_response.content)
2024-09-16 17:28:01 +00:00
except Exception as error:
try:
cdm.parse_license(session_id=session_id, license_message=find_widevine_license(license_response.json()))
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
2024-09-16 17:28:01 +00:00
for key in cdm.get_keys(session_id):
if key.type == 'CONTENT':
Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
2024-09-16 17:28:01 +00:00
print(f"{key.kid.hex}:{key.key.hex()}")
return
def decrypt_remote_cdm():
remote_cdm_choice = CDM_Selector.select_remote_cdm()
try:
with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile:
config = yaml.safe_load(ymlfile)
cdm = RemoteCdm(
device_type=config['Remote_CDMs'][remote_cdm_choice]['device_type'],
system_id=config['Remote_CDMs'][remote_cdm_choice]['system_id'],
security_level=config['Remote_CDMs'][remote_cdm_choice]['security_level'],
host=config['Remote_CDMs'][remote_cdm_choice]['host'],
secret=config['Remote_CDMs'][remote_cdm_choice]['secret'],
device_name=config['Remote_CDMs'][remote_cdm_choice]['name'],
)
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
session_id = cdm.open()
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
pssh_or_mpd_input = input("MPD Link or PSSH: ")
if check_if_url(pssh_or_mpd_input):
pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
if len(pssh_list) > 1:
choice = inquirer.select(
message="Select PSSH:",
choices=pssh_list,
).execute()
db_pssh = choice
db_mpd = pssh_or_mpd_input
pssh = PSSH(choice)
else:
if pssh_list:
db_pssh = pssh_list[0]
db_mpd = pssh_or_mpd_input
pssh = PSSH(pssh_list[0])
else:
db_pssh = pssh_or_mpd_input
db_mpd = None
pssh = PSSH(pssh_or_mpd_input)
2024-09-16 17:28:01 +00:00
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
user_license_url = input("License URL: ")
db_url = user_license_url
2024-09-16 17:28:01 +00:00
print()
license_curl = load_license_curl()
db_headers = license_curl.headers
db_cookies = license_curl.cookies
set_service_certificate_success = False
2024-09-16 17:28:01 +00:00
try:
service_certificate = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=cdm.service_certificate_challenge
)
try:
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate.content)
set_service_certificate_success = True
except:
try:
cdm.set_service_certificate(session_id=session_id, certificate=find_widevine_license(service_certificate.json()))
set_service_certificate_success = True
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
2024-09-16 17:28:01 +00:00
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=service_cert_set_success)
db_data = challenge
if license_curl.data:
try:
lic_data = replace_widevine_value(license_curl.data, base64.b64encode(challenge).decode())
except:
pass
else:
lic_data = challenge
2024-09-16 17:28:01 +00:00
try:
license_response = httpx.post(
url=user_license_url,
headers=license_curl.headers,
cookies=license_curl.cookies,
data=lic_data
)
2024-09-16 17:28:01 +00:00
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
try:
cdm.parse_license(session_id=session_id, license_message=license_response.content)
2024-09-16 17:28:01 +00:00
except Exception as error:
try:
cdm.parse_license(session_id=session_id, license_message=find_widevine_license(license_response.json()))
except Exception as error:
print(f'{Fore.RED}An error occurred.\n\n{error}')
exit(1)
2024-09-16 17:28:01 +00:00
for key in cdm.get_keys(session_id):
if key.type == 'CONTENT':
Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
2024-09-16 17:28:01 +00:00
print(f"{key.kid.hex}:{key.key.hex()}")
return