import base64 from random import random import requests import httpx from pywidevine.cdm import Cdm from pywidevine.device import Device from pywidevine.pssh import PSSH from pywidevine.remotecdm import RemoteCdm import yaml import os import random from colorama import Fore import re from . import CDM_Selector from . import Get_PSSH from . import Database from InquirerPy import inquirer def load_license_curl(): class LicenseCURL: def __init__(self): self.headers = headers self.cookies = cookies self.data = data self.json_data = json_data try: with open(f'{os.getcwd()}/License_cURL.py', 'r') as file: local_namespace = {} exec(file.read(), globals(), local_namespace) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') if 'headers' in local_namespace: headers = local_namespace['headers'] else: headers = None if 'cookies' in local_namespace: cookies = local_namespace['cookies'] else: cookies = None if 'data' in local_namespace: data = local_namespace['data'] else: data = None if 'json_data' in local_namespace: json_data = local_namespace['json_data'] else: json_data = None return LicenseCURL() def check_if_url(string): # Regular expression for validating a URL url_pattern = re.compile( r'^(?:http|ftp)s?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4 r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', # optional path re.IGNORECASE) return re.match(url_pattern, string) is not None def replace_widevine_challenge(dictionary, challenge): for key, value in dictionary.items(): if 'widevine' in key.lower() or 'license' in key.lower(): dictionary[key] = challenge elif isinstance(value, dict): replace_widevine_challenge(value, challenge) def find_widevine_license(dictionary): for key, value in dictionary.items(): if 'widevine' in key.lower() or 'license' in key.lower(): license = dictionary[key].replace("-", "+").replace("_", "/") return license elif isinstance(value, dict): find_widevine_license(value) def get_proxy(country_code): params = { 'request': 'displayproxies', 'protocol': 'http', 'timeout': '10000', 'country': f'{country_code}', 'ssl': 'all', 'anonymity': 'all', } response = requests.get('https://api.proxyscrape.com/v2/', params=params) lines = response.text.splitlines() random_line = random.choice(lines) proxies = { 'http://': f'http://{random_line}', 'https://': f'http://{random_line}' } return proxies def decrypt_local_cdm(): cdm_choice = CDM_Selector.select_local_cdm() try: device = Device.load(f'{os.getcwd()}/WVDs/{cdm_choice}') except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) try: cdm = Cdm.from_device(device) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) try: session_id = cdm.open() except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) try: pssh_or_mpd_input = input("MPD Link or PSSH: ") if check_if_url(pssh_or_mpd_input): pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input) if len(pssh_list) > 1: choice = inquirer.select( message="Select PSSH:", choices=pssh_list, ).execute() db_pssh = choice db_mpd = pssh_or_mpd_input pssh = PSSH(choice) else: if pssh_list: db_pssh = pssh_list[0] db_mpd = pssh_or_mpd_input pssh = PSSH(pssh_list[0]) else: db_pssh = pssh_or_mpd_input db_mpd = None pssh = PSSH(pssh_or_mpd_input) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) user_license_url = input("License URL: ") db_url = user_license_url Proxy = None with open('Config.yaml', 'r') as ymlfile: config = yaml.safe_load(ymlfile) if 'Proxy' in config: if len(config['Proxy']) == 2: Proxy = get_proxy(config['Proxy']) print(f'Using Proxy {Proxy}') print() license_curl = load_license_curl() db_headers = license_curl.headers db_cookies = license_curl.cookies service_cert_set_success = False try: service_certificate = httpx.post( url=user_license_url, headers=license_curl.headers, cookies=license_curl.cookies, data=cdm.service_certificate_challenge, proxies=Proxy ) try: cdm.set_service_certificate(session_id=session_id, certificate=service_certificate.content) service_cert_set_success = True except Exception as error: try: cdm.set_service_certificate(session_id=session_id, certificate=find_widevine_license(service_certificate.json())) service_cert_set_success = True except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate') except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate') challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=service_cert_set_success) db_data = challenge if license_curl.data: try: replace_widevine_challenge(license_curl.data, base64.b64encode(challenge).decode()) lic_data = license_curl.data db_data = license_curl.data except: pass else: lic_data = None if license_curl.json_data: try: replace_widevine_challenge(license_curl.json_data, base64.b64encode(challenge).decode()) json_data = license_curl.json_data db_data = license_curl.json_data except: pass else: json_data = None try: license_response = httpx.post( url=user_license_url, headers=license_curl.headers, cookies=license_curl.cookies, json=json_data, data=challenge if json_data is None and lic_data is None else lic_data, proxies=Proxy ) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) try: cdm.parse_license(session_id=session_id, license_message=license_response.content) except Exception as error: try: cdm.parse_license(session_id=session_id, license_message=find_widevine_license(license_response.json())) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) for key in cdm.get_keys(session_id): if key.type == 'CONTENT': Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}') print(f"{key.kid.hex}:{key.key.hex()}") return def decrypt_remote_cdm(): remote_cdm_choice = CDM_Selector.select_remote_cdm() try: with open(f'{os.getcwd()}/Config.yaml', 'r') as ymlfile: config = yaml.safe_load(ymlfile) cdm = RemoteCdm( device_type=config['Remote_CDMs'][remote_cdm_choice]['device_type'], system_id=config['Remote_CDMs'][remote_cdm_choice]['system_id'], security_level=config['Remote_CDMs'][remote_cdm_choice]['security_level'], host=config['Remote_CDMs'][remote_cdm_choice]['host'], secret=config['Remote_CDMs'][remote_cdm_choice]['secret'], device_name=config['Remote_CDMs'][remote_cdm_choice]['name'], ) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) try: session_id = cdm.open() except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) try: pssh_or_mpd_input = input("MPD Link or PSSH: ") if check_if_url(pssh_or_mpd_input): pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input) if len(pssh_list) > 1: choice = inquirer.select( message="Select PSSH:", choices=pssh_list, ).execute() db_pssh = choice db_mpd = pssh_or_mpd_input pssh = PSSH(choice) else: if pssh_list: db_pssh = pssh_list[0] db_mpd = pssh_or_mpd_input pssh = PSSH(pssh_list[0]) else: db_pssh = pssh_or_mpd_input db_mpd = None pssh = PSSH(pssh_or_mpd_input) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) user_license_url = input("License URL: ") db_url = user_license_url Proxy = None with open('Config.yaml', 'r') as ymlfile: config = yaml.safe_load(ymlfile) if 'Proxy' in config: if len(config['Proxy']) == 2: Proxy = get_proxy(config['Proxy']) print(f'Using Proxy {Proxy}') print() license_curl = load_license_curl() db_headers = license_curl.headers db_cookies = license_curl.cookies set_service_certificate_success = False try: service_certificate = httpx.post( url=user_license_url, headers=license_curl.headers, cookies=license_curl.cookies, data=cdm.service_certificate_challenge, proxies=Proxy ) try: cdm.set_service_certificate(session_id=session_id, certificate=service_certificate.content) set_service_certificate_success = True except: try: cdm.set_service_certificate(session_id=session_id, certificate=find_widevine_license(service_certificate.json())) set_service_certificate_success = True except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate') except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate') challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=set_service_certificate_success) if license_curl.data: try: replace_widevine_challenge(license_curl.data, base64.b64encode(challenge).decode()) lic_data = license_curl.data db_data = license_curl.data except: pass else: lic_data = None if license_curl.json_data: try: replace_widevine_challenge(license_curl.json_data, base64.b64encode(challenge).decode()) json_data = license_curl.json_data db_data = license_curl.json_data except: pass else: json_data = None if json_data is None and lic_data is None: db_data = challenge try: license_response = httpx.post( url=user_license_url, headers=license_curl.headers, cookies=license_curl.cookies, json=json_data, data=challenge if json_data is None and lic_data is None else lic_data, proxies=Proxy ) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) try: cdm.parse_license(session_id=session_id, license_message=license_response.content) except Exception as error: try: cdm.parse_license(session_id=session_id, license_message=find_widevine_license(license_response.json())) except Exception as error: print(f'{Fore.RED}An error occurred.\n\n{error}') exit(1) for key in cdm.get_keys(session_id): if key.type == 'CONTENT': Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}') print(f"{key.kid.hex}:{key.key.hex()}") return