219 lines
8.1 KiB
Python
219 lines
8.1 KiB
Python
import httpx
|
|
from pywidevine.cdm import Cdm
|
|
from pywidevine.device import Device
|
|
from pywidevine.pssh import PSSH
|
|
from pywidevine.remotecdm import RemoteCdm
|
|
import yaml
|
|
import os
|
|
from colorama import Fore
|
|
import re
|
|
from . import CDM_Selector
|
|
from . import Get_PSSH
|
|
from . import Database
|
|
from InquirerPy import inquirer
|
|
|
|
def load_license_curl():
    """Load optional request headers and cookies from ``License_cURL.py``.

    The file is looked up in the current working directory and executed as
    Python source. Whatever it binds to the names ``headers`` and
    ``cookies`` is picked up.

    Returns:
        An object exposing ``headers`` and ``cookies`` attributes. Each one
        is ``None`` when the file does not define that name, or when the
        file could not be read or executed (the error is printed, not
        raised).
    """

    class LicenseCURL:
        """Plain holder for the values extracted from the file."""

        def __init__(self, headers, cookies):
            self.headers = headers
            self.cookies = cookies

    # Bound before the ``try`` so that a missing or unreadable file still
    # leaves an (empty) namespace to query below instead of an unbound name.
    local_namespace = {}
    try:
        with open(f'{os.getcwd()}/License_cURL.py', 'r') as file:
            # SECURITY: this runs arbitrary Python taken from the working
            # directory. Only use a License_cURL.py that you trust.
            exec(file.read(), globals(), local_namespace)
    except Exception as error:
        print(f'{Fore.RED}An error occurred.\n\n{error}')

    return LicenseCURL(
        headers=local_namespace.get('headers'),
        cookies=local_namespace.get('cookies'),
    )
|
|
|
|
def check_if_url(string):
    """Tell whether *string* is shaped like an http(s)/ftp(s) URL.

    The host part may be a dotted domain name, ``localhost``, a dotted-quad
    IPv4 address or an IPv6-looking literal, optionally followed by a port
    and a path or query without whitespace. Matching is case-insensitive.

    Returns:
        ``True`` if the whole string matches, ``False`` otherwise.
    """
    host_alternatives = (
        # domain name
        r'(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)',
        # localhost
        r'localhost',
        # IPv4
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
        # IPv6
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?',
    )
    pattern_text = (
        r'^(?:http|ftp)s?://'                      # scheme
        + '(?:' + '|'.join(host_alternatives) + ')'  # host
        + r'(?::\d+)?'                             # optional port
        + r'(?:/?|[/?]\S+)$'                       # optional path
    )
    url_regex = re.compile(pattern_text, re.IGNORECASE)
    return bool(url_regex.match(string))
|
|
|
|
def decrypt_local_cdm():
    """Interactively obtain content keys using a local Widevine device file.

    Steps, in order:

    1. Let the user pick a device via ``CDM_Selector.select_local_cdm()``,
       load it from ``<cwd>/WVDs/`` and open a CDM session.
    2. Prompt for an MPD link or a PSSH. A URL is handed to
       ``Get_PSSH.get_pssh``; if several PSSHs come back the user chooses
       one. Anything that is not a URL is treated as the PSSH itself.
    3. Prompt for the license URL and load headers/cookies with
       ``load_license_curl()``.
    4. Try to fetch and install a service certificate from the license URL;
       on failure, continue without privacy mode.
    5. Send the license challenge, parse the response, store every
       ``CONTENT`` key through ``Database.upsert_key`` and print it as
       ``kid:key``.

    Any failure in steps 1, 2 or 5's request/parse stage prints the error
    and terminates the process with exit status 1. The CDM session is
    closed on every path once it has been opened.
    """
    cdm_choice = CDM_Selector.select_local_cdm()

    try:
        device = Device.load(f'{os.getcwd()}/WVDs/{cdm_choice}')
        cdm = Cdm.from_device(device)
        session_id = cdm.open()
    except Exception as error:
        print(f'{Fore.RED}An error occurred.\n\n{error}')
        exit(1)

    try:
        try:
            pssh_or_mpd_input = input("MPD Link or PSSH: ")
            if check_if_url(pssh_or_mpd_input):
                pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
                if not pssh_list:
                    # Previously this case fell through with ``pssh`` never
                    # assigned and crashed later with an unbound-name error.
                    raise ValueError('No PSSH found in the provided MPD.')
                if len(pssh_list) > 1:
                    db_pssh = inquirer.select(
                        message="Select PSSH:",
                        choices=pssh_list,
                    ).execute()
                else:
                    db_pssh = pssh_list[0]
                db_mpd = pssh_or_mpd_input
            else:
                # Not a URL: the input is taken to be the PSSH itself.
                db_pssh = pssh_or_mpd_input
                db_mpd = None
            pssh = PSSH(db_pssh)
        except Exception as error:
            print(f'{Fore.RED}An error occurred.\n\n{error}')
            exit(1)

        user_license_url = input("License URL: ")
        db_url = user_license_url
        print()

        license_curl = load_license_curl()
        db_headers = license_curl.headers
        db_cookies = license_curl.cookies

        # Best effort: privacy mode is only enabled if the certificate was
        # actually fetched and accepted by the CDM.
        service_cert_set_success = False
        try:
            service_certificate = httpx.post(
                url=user_license_url,
                headers=license_curl.headers,
                cookies=license_curl.cookies,
                data=cdm.service_certificate_challenge
            ).content
            cdm.set_service_certificate(session_id=session_id, certificate=service_certificate)
            service_cert_set_success = True
        except Exception as error:
            print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')

        challenge = cdm.get_license_challenge(
            session_id=session_id,
            pssh=pssh,
            privacy_mode=service_cert_set_success,
        )
        db_data = challenge

        try:
            license_response = httpx.post(
                url=user_license_url,
                headers=license_curl.headers,
                cookies=license_curl.cookies,
                data=challenge
            ).content
            cdm.parse_license(session_id=session_id, license_message=license_response)
        except Exception as error:
            print(f'{Fore.RED}An error occurred.\n\n{error}')
            exit(1)

        for key in cdm.get_keys(session_id):
            if key.type == 'CONTENT':
                Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
                print(f"{key.kid.hex}:{key.key.hex()}")
    finally:
        # Release the session on every path, including the exit(1) ones
        # (SystemExit still runs ``finally``).
        cdm.close(session_id)
|
|
|
|
def decrypt_remote_cdm():
    """Interactively obtain content keys using a remote CDM.

    Steps, in order:

    1. Let the user pick an entry via ``CDM_Selector.select_remote_cdm()``,
       read that entry from ``Remote_CDMs`` in ``<cwd>/Config.yaml``, build
       a ``RemoteCdm`` from it and open a session.
    2. Prompt for an MPD link or a PSSH. A URL is handed to
       ``Get_PSSH.get_pssh``; if several PSSHs come back the user chooses
       one. Anything that is not a URL is treated as the PSSH itself.
    3. Prompt for the license URL and load headers/cookies with
       ``load_license_curl()``.
    4. Try to fetch and install a service certificate from the license URL;
       on failure, continue without privacy mode.
    5. Send the license challenge, parse the response, store every
       ``CONTENT`` key through ``Database.upsert_key`` and print it as
       ``kid:key``.

    Any failure in steps 1, 2 or 5's request/parse stage prints the error
    and terminates the process with exit status 1. The CDM session is
    closed on every path once it has been opened.
    """
    remote_cdm_choice = CDM_Selector.select_remote_cdm()

    try:
        with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile:
            config = yaml.safe_load(ymlfile)
        remote = config['Remote_CDMs'][remote_cdm_choice]
        cdm = RemoteCdm(
            device_type=remote['device_type'],
            system_id=remote['system_id'],
            security_level=remote['security_level'],
            host=remote['host'],
            secret=remote['secret'],
            device_name=remote['name'],
        )
        session_id = cdm.open()
    except Exception as error:
        print(f'{Fore.RED}An error occurred.\n\n{error}')
        exit(1)

    try:
        try:
            pssh_or_mpd_input = input("MPD Link or PSSH: ")
            if check_if_url(pssh_or_mpd_input):
                pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
                if not pssh_list:
                    # Previously this case fell through with ``pssh`` never
                    # assigned and crashed later with an unbound-name error.
                    raise ValueError('No PSSH found in the provided MPD.')
                if len(pssh_list) > 1:
                    db_pssh = inquirer.select(
                        message="Select PSSH:",
                        choices=pssh_list,
                    ).execute()
                else:
                    db_pssh = pssh_list[0]
                db_mpd = pssh_or_mpd_input
            else:
                # Not a URL: the input is taken to be the PSSH itself.
                db_pssh = pssh_or_mpd_input
                db_mpd = None
            pssh = PSSH(db_pssh)
        except Exception as error:
            print(f'{Fore.RED}An error occurred.\n\n{error}')
            exit(1)

        user_license_url = input("License URL: ")
        db_url = user_license_url
        print()

        license_curl = load_license_curl()
        db_headers = license_curl.headers
        db_cookies = license_curl.cookies

        # Best effort: privacy mode is only enabled if the certificate was
        # actually fetched and accepted by the CDM. The flag must carry the
        # same name here as where it is read below; the original initialised
        # a differently named variable, so a failed certificate request left
        # the flag unbound.
        service_cert_set_success = False
        try:
            service_certificate = httpx.post(
                url=user_license_url,
                headers=license_curl.headers,
                cookies=license_curl.cookies,
                data=cdm.service_certificate_challenge
            ).content
            cdm.set_service_certificate(session_id=session_id, certificate=service_certificate)
            service_cert_set_success = True
        except Exception as error:
            print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')

        challenge = cdm.get_license_challenge(
            session_id=session_id,
            pssh=pssh,
            privacy_mode=service_cert_set_success,
        )
        db_data = challenge

        try:
            license_response = httpx.post(
                url=user_license_url,
                headers=license_curl.headers,
                cookies=license_curl.cookies,
                data=challenge
            ).content
            cdm.parse_license(session_id=session_id, license_message=license_response)
        except Exception as error:
            print(f'{Fore.RED}An error occurred.\n\n{error}')
            exit(1)

        for key in cdm.get_keys(session_id):
            if key.type == 'CONTENT':
                Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
                print(f"{key.kid.hex}:{key.key.hex()}")
    finally:
        # Release the session on every path, including the exit(1) ones
        # (SystemExit still runs ``finally``).
        cdm.close(session_id)