diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ccaa126 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +WVDs/ +venv/ +keys/ +Config/ +download/ +binaries/ +*.pyc \ No newline at end of file diff --git a/Helpers/__init__.py b/Helpers/__init__.py new file mode 100644 index 0000000..1961d81 --- /dev/null +++ b/Helpers/__init__.py @@ -0,0 +1,8 @@ +from . import wvd_check +from . import api_check +from . import capability_check +from . import database_check +from . import cache_key +from . import mpd_parse +from . import download +from . import binary_check \ No newline at end of file diff --git a/Helpers/api_check.py b/Helpers/api_check.py new file mode 100644 index 0000000..184f9dc --- /dev/null +++ b/Helpers/api_check.py @@ -0,0 +1,21 @@ +# Import dependencies +import os + + +# Define api key check +def api_check(): + # Create Config directory if it doesn't exist + if 'Config' not in os.listdir(fr'{os.getcwd()}'): + os.makedirs(f'{os.getcwd()}/Config') + # Create api-key.txt if it doesn't exist + if not os.path.isfile(f'{os.getcwd()}/Config/api-key.txt'): + with open(f'{os.getcwd()}/Config/api-key.txt', 'w') as api_key_text: + api_key_text.write("Place your API key on this line") + return "First run" + # Grab API Key from the text file + with open(f'{os.getcwd()}/Config/api-key.txt') as API_Key_Text: + api_key = API_Key_Text.readline() + if api_key != "Place your API key on this line": + return api_key + else: + return None diff --git a/Helpers/binary_check.py b/Helpers/binary_check.py new file mode 100644 index 0000000..23e15a9 --- /dev/null +++ b/Helpers/binary_check.py @@ -0,0 +1,161 @@ +import os +import zipfile +import shutil +import requests +from tqdm import tqdm + + +# Create / Check folders function +def create_folders(): + # Check for required directories + for directory in ['binaries', 'download']: + # Create them if they don't exist + if directory not in os.listdir(os.getcwd()): + os.makedirs(f'{os.getcwd()}/{directory}') + # If they do exist, check for temp, create if it doesn't exist + if 'temp' not in f'{os.listdir(os.getcwd())}/download': + os.makedirs(f'{os.getcwd()}/download/temp') + + +# Create / Check binaries function +def create_binaries(): + # Check if the required binaries exist, if not, download them. + + # Iterate through required binaries + for binary in ["n_m3u8dl-re.exe", "mp4decrypt.exe", "ffmpeg.exe", "yt-dlp.exe"]: + + # Perform checks for each binary + if not os.path.isfile(f"{os.getcwd()}/binaries/{binary}"): + + # FFmpeg + if binary == "ffmpeg.exe": + + # Download windows zip file for FFmpeg + ffmpeg_download = requests.get( + "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip", + stream=True) + total_size = int(ffmpeg_download.headers.get('content-length', 0)) + with open(f"{os.getcwd()}/download/temp/ffmpeg.zip", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading ffmpeg.zip") as progress_bar: + for data in ffmpeg_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) + + # Unzip FFmpeg + with zipfile.ZipFile(f"{os.getcwd()}/download/temp/ffmpeg.zip", "r") as ffmpeg_zip: + file_count = len(ffmpeg_zip.infolist()) + with tqdm(total=file_count, unit='file', desc="Extracting ffmpeg.zip") as unzip_progress_bar: + for file in ffmpeg_zip.infolist(): + ffmpeg_zip.extract(file, path=f"{os.getcwd()}/download/temp") + unzip_progress_bar.update(1) + + # Copy ffmpeg binary to binaries + shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl/bin/ffmpeg.exe", + f"{os.getcwd()}/binaries") + + # Remove the zip + os.remove(f"{os.getcwd()}/download/temp/ffmpeg.zip") + + # Remove the folder + shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl") + + # Print a new line + print() + + # MP4 Decrypt + elif binary == "mp4decrypt.exe": + + # Download mp4decrypt zip file + mp4decrypt_download = requests.get( + "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32.zip", stream=True) + total_size = int(mp4decrypt_download.headers.get('content-length', 0)) + with open(f"{os.getcwd()}/download/temp/mp4decrypt.zip", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading mp4decrypt.zip") as progress_bar: + for data in mp4decrypt_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) + + # Unzip mp4decrypt + with zipfile.ZipFile(f"{os.getcwd()}/download/temp/mp4decrypt.zip", "r") as mp4decrypt_zip: + file_count = len(mp4decrypt_zip.infolist()) + with tqdm(total=file_count, unit='file', desc="Extracting mp4decrypt.zip") as unzip_progress_bar: + for file in mp4decrypt_zip.infolist(): + mp4decrypt_zip.extract(file, path=f"{os.getcwd()}/download/temp") + unzip_progress_bar.update(1) + + # Copy mp4decrypt binary to binaries + shutil.copy2( + f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32/bin/mp4decrypt.exe", + f"{os.getcwd()}/binaries") + + # Deleting the zip file + os.remove(f"{os.getcwd()}/download/temp/mp4decrypt.zip") + + # Deleting the directory + shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32") + + # Print a new line + print() + + # n_m3u8dl-re + elif binary == "n_m3u8dl-re.exe": + + # Download n_m3u8dl-re zip file + n_m3u8dl_re_download = requests.get( + "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_win-x64_20230628.zip", + stream=True) + total_size = int(n_m3u8dl_re_download.headers.get('content-length', 0)) + with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading n_m3u8dl-re.zip") as progress_bar: + for data in n_m3u8dl_re_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) + + # Unzip n_m3u8dl-re + with zipfile.ZipFile(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", "r") as nm3u8dl_re_zip: + file_count = len(nm3u8dl_re_zip.infolist()) + with tqdm(total=file_count, unit='file', desc="Extracting n_m3u8dl-re.zip") as unzip_progress_bar: + for file in nm3u8dl_re_zip.infolist(): + nm3u8dl_re_zip.extract(file, path=f"{os.getcwd()}/download/temp") + unzip_progress_bar.update(1) + + # Copy n_m3u8dl-re binary to binaries + shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64/N_m3u8DL-RE.exe", + f"{os.getcwd()}/binaries") + + # Delete zip file + os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip") + + # Delete directory + shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64") + + # Print a new line + print() + + # YT-DLP + elif binary == "yt-dlp.exe": + + # Download yt-dlp exe + yt_dlp_download = requests.get( + "https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_x86.exe", + stream=True) + total_size = int(yt_dlp_download.headers.get('content-length', 0)) + with open(f"{os.getcwd()}/download/yt-dlp.exe", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading yt-dlp") as progress_bar: + for data in yt_dlp_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) + + # Copy yt-dlp binary to binaries + shutil.copy2(f"{os.getcwd()}/download/yt-dlp.exe", + f"{os.getcwd()}/binaries") + + # Remove binary from download folder + os.remove(f"{os.getcwd()}/download/yt-dlp.exe") + + # Print a new line + print() diff --git a/Helpers/cache_key.py b/Helpers/cache_key.py new file mode 100644 index 0000000..9eba312 --- /dev/null +++ b/Helpers/cache_key.py @@ -0,0 +1,13 @@ +# Import dependencies + +import sqlite3 +import os + + +# Define cache function +def cache_keys(pssh: str, keys: str): + dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db") + dbcursor = dbconnection.cursor() + dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, keys)) + dbconnection.commit() + dbconnection.close() \ No newline at end of file diff --git a/Helpers/capability_check.py b/Helpers/capability_check.py new file mode 100644 index 0000000..c5c663c --- /dev/null +++ b/Helpers/capability_check.py @@ -0,0 +1,27 @@ +# Import dependencies +import Helpers +import os + + +def capability_check(): + # Check for .WVD and API Key, exit program if neither exist. + Device = Helpers.wvd_check.wvd_check() + if Device is None: + API_Key = Helpers.api_check.api_check() + if API_Key == "First run" or API_Key == None: + exit(f"No CDM or API key found, please place a CDM in {os.getcwd()}/WVDs or an API key in {os.getcwd()}/Config/api-key.txt") + else: + print("No local device found, remote decryption only.") + print(f'Using API Key: {API_Key}\n') + return None, API_Key + elif Device is not None: + API_Key = Helpers.api_check.api_check() + if API_Key == "First run" or API_Key == None: + print("No API key found, local decryption only.") + print(f'Using device at {Device}\n') + return Device, None + else: + print(f'Local and remote decryption available.') + print(f'Using device at {Device}') + print(f'Using API Key: {API_Key}\n') + return Device, API_Key diff --git a/Helpers/database_check.py b/Helpers/database_check.py new file mode 100644 index 0000000..d50f4c9 --- /dev/null +++ b/Helpers/database_check.py @@ -0,0 +1,19 @@ +# Import dependencies + +import os +import sqlite3 + + +# Check to see if the database already exists, if not create a keys folder, and create the database. +def database_check(): + # Check to see if the "keys" directory exists, if not creates it + if "keys" not in os.listdir(os.getcwd()): + os.makedirs('keys') + + # Check to see if a database exists in keys directory, if not create it + if not os.path.isfile(f"{os.getcwd()}/keys/database.db"): + print(f"Creating database.\n") + dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db") + dbcursor = dbconnection.cursor() + dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )') + dbconnection.close() \ No newline at end of file diff --git a/Helpers/download.py b/Helpers/download.py new file mode 100644 index 0000000..3673d44 --- /dev/null +++ b/Helpers/download.py @@ -0,0 +1,193 @@ +import subprocess +from os import urandom +import uuid +import glob +import os +import Helpers.binary_check +import Sites.Generic +import license_curl + + +# Web Download function generic +def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False): + + # Check for folders + Helpers.binary_check.create_folders() + + # Check for binaries + Helpers.binary_check.create_binaries() + + # Create random download name + download_name = str(uuid.uuid4()) + + # Retrieve the keys + if not remote: + mp4decrypt_keys = Sites.Generic.decrypt_generic(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers) + if remote: + mp4decrypt_keys = Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers, mpd_url=mpd) + + # Define n_m3u8dl-re download parameters + n_m3u8dl_re_download = [ + f'{os.getcwd()}/binaries/n_m3u8dl-re.exe', + f'{mpd}', + '--ffmpeg-binary-path', + f'{os.getcwd()}/binaries/ffmpeg.exe', + '--decryption-binary-path', + f'{os.getcwd()}/binaries/mp4decrypt.exe', + '--tmp-dir', + f'{os.getcwd()}/download/temp', + '--save-dir', + f'{os.getcwd()}/download', + '--save-name', + f'{download_name}', + '--binary-merge', + 'True', + '--mux-after-done', + 'format=mkv' + ] + mp4decrypt_keys + + subprocess.run(n_m3u8dl_re_download) + + try: + download_name = glob.glob(f'{os.getcwd()}/download/{download_name}.*') + return download_name + except: + return f'Failed to download!' + + +# Web Download crunchyroll function +def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False): + + # Check for folders + Helpers.binary_check.create_folders() + + # Check for binaries + Helpers.binary_check.create_binaries() + + # Create random download name + download_name = str(uuid.uuid4()) + + # Retrieve the keys + if not remote: + mp4decrypt_keys = Sites.Crunchyroll.decrypt_crunchyroll(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers) + if remote: + mp4decrypt_keys = Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers, mpd_url=mpd) + + # Define n_m3u8dl-re download parameters + n_m3u8dl_re_download = [ + f'{os.getcwd()}/binaries/n_m3u8dl-re.exe', + f'--header', + f'authorization: {license_curl.headers["authorization"]}', + f'{mpd}', + '--ffmpeg-binary-path', + f'{os.getcwd()}/binaries/ffmpeg.exe', + '--decryption-binary-path', + f'{os.getcwd()}/binaries/mp4decrypt.exe', + '--tmp-dir', + f'{os.getcwd()}/download/temp', + '--save-dir', + f'{os.getcwd()}/download', + '--save-name', + f'{download_name}', + '--binary-merge', + 'True', + '--mux-after-done', + 'format=mkv' + ] + mp4decrypt_keys + + subprocess.run(n_m3u8dl_re_download) + + try: + download_name = glob.glob(f'{os.getcwd()}/download/{download_name}.*') + return download_name + except: + return f'Failed to download!' + + +# YouTube Download function generic +def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote: bool = False): + + # Check for folders + Helpers.binary_check.create_folders() + + # Check for binaries + Helpers.binary_check.create_binaries() + + # Create random download name + download_name = str(uuid.uuid4()) + + # Retrieve the keys + if not remote: + mp4decrypt_keys = Sites.YouTube.decrypt_youtube(wvd=device, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies) + if remote: + mp4decrypt_keys = Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies) + + # Define yt-dlp download parameters + yt_dlp_download = [ + f'{os.getcwd()}/binaries/yt-dlp.exe', + '-f', + 'bv*+ba/b', + '--allow-u', + '-o', + f'{os.getcwd()}/download/{download_name}.%(ext)s', + '-S', + 'ext', + '-S', + 'res:720', + f'{url}' + ] + + # Run yt-dlp + subprocess.run(yt_dlp_download) + + # Get the names of the newly downloaded files + files = glob.glob(f'{os.getcwd()}/download/{download_name}.*') + + # Declare empty list for decrypted files location to be stored + decrypted_files = [] + + # Iterate through all the files and decrypt them + for file in files: + + # Assign file name variable to be appended to decrypted files list + file_name = str(uuid.uuid4()) + + # define mp4 decrypt parameters + mp4_decrypt = [ + f'{os.getcwd()}/binaries/mp4decrypt.exe', + f'{file}', + f'{os.getcwd()}/download/{file_name}', + ] + mp4decrypt_keys + + # Run mp4decrypt + subprocess.run(mp4_decrypt) + + # Append the file to the decrypted file list + decrypted_files.append(f'{os.getcwd()}/download/{file_name}') + + # Declare a final mux variable + final_mux = str(uuid.uuid4()) + + # Define ffmpeg parameters + ffmpeg_merge = [ + f"{os.getcwd()}/binaries/ffmpeg.exe", + '-i', + f"{decrypted_files[0]}", + '-i', + f"{decrypted_files[1]}", + '-vcodec', + 'copy', + '-acodec', + 'copy', + f"{os.getcwd()}/downloads/{final_mux}.mkv", + ] + + # Run ffmpeg to merge the files + subprocess.run(ffmpeg_merge) + + # Try to get a download name and return it + download_name = glob.glob(f'{os.getcwd()}/download/{final_mux}.*') + if download_name: + return download_name + else: + return f"Couldn't complete download!" diff --git a/Helpers/mpd_parse.py b/Helpers/mpd_parse.py new file mode 100644 index 0000000..b69a391 --- /dev/null +++ b/Helpers/mpd_parse.py @@ -0,0 +1,28 @@ +import requests +import re + + +# Define MPD / m3u8 PSSH parser +def parse_pssh(manifest_url): + manifest = manifest_url + try: + response = requests.get(manifest) + except: + pssh = input("Couldn't retrieve manifest, please input PSSH: ") + return pssh + try: + matches = re.finditer(r'(.*))>(?P(.*))', response.text) + pssh_list = [] + + for match in matches: + if match.group and not match.group("pssh") in pssh_list and len(match.group("pssh")) < 300: + pssh_list.append(match.group("pssh")) + + if len(pssh_list) < 1: + matches = re.finditer(r'URI="data:text/plain;base64,(?P(.*))"', response.text) + for match in matches: + if match.group("pssh") and match.group("pssh").upper().startswith("A") and len(match.group("pssh")) < 300: + pssh_list.append(match.group("pssh")) + return f'{pssh_list[0]}' + except: + return None \ No newline at end of file diff --git a/Helpers/wvd_check.py b/Helpers/wvd_check.py new file mode 100644 index 0000000..61839fe --- /dev/null +++ b/Helpers/wvd_check.py @@ -0,0 +1,21 @@ +# Import dependencies +import os +import glob + + +# Define WVD device check +def wvd_check(): + try: + # Check to see if the WVDs folder exist, if not create it + if 'WVDs' not in os.listdir(fr'{os.getcwd()}'): + os.makedirs(f'{os.getcwd()}/WVDs') + # Use glob to get the name of the .wvd + extracted_device = glob.glob(f'{os.getcwd()}/WVDs/*.wvd')[0] + # Return the device path + return extracted_device + except: + # Check to see if the WVDs folder exist, if not create it + if 'WVDs' not in os.listdir(fr'{os.getcwd()}'): + os.makedirs(f'{os.getcwd()}/WVDs') + # Stop the program and print out instructions + return None diff --git a/License_cURL.py b/License_cURL.py index cebd5a8..e69de29 100644 --- a/License_cURL.py +++ b/License_cURL.py @@ -1,5 +0,0 @@ -headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0', - 'Accept': '*/*', - 'Accept-Language': 'en-US,en;q=0.5', -} \ No newline at end of file diff --git a/README.md b/README.md index 929adb7..fe5515c 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,23 @@ # TPD-Keys -#### Created by @TPD94, proxy function by Radziu +#### Created by @TPD94 ## Based on [pywidevine](https://cdm-project.com/Decryption-Tools/pywidevine "pywidevine") How to use: 1. Create `TPD-Keys` folder. -2. Download and extract `tpd-keys.py`, `requirements.txt` and `License_cURL.py` into the newly created `TPD-Keys` directory +2. Download and extract `TPD-Keys.py`, `requirements.txt` and `License_curl.py` into the newly created `TPD-Keys` directory 3. Install the requirements with `pip install -r requirements.txt` 4. Crete a WVD with pywidevine; `pywidevine create-device -k "/PATH/TO/device_private_key" -c "/PATH/TO/device_client_id_blob" -t "ANDROID" -l 3` -5. Place your .wvd in the root of `TPD-Keys` directory +5. Place your .wvd in `/WVDs` directory, if you do not have this directory, create it or run the program with `python TPD-Keys.py` and it will be created for you -6. Paste any needed headers into `License_cURL.py` +6. Place your API key (if wanted) in `/Config/api-key.txt` if you do not have this file or directory, create it or run the program with `python TPD-Keys.py` and it will be created for you. If you don't have an API key, you can request one via [discord](https://discord.gg/cdrm-project "CDRM-Project") -7. Run with `python tpd-keys.py` +7. Paste dictionaries from license request curl post request into `License_curl.py` -8. Make a selection +8. Run with `python tpd-keys.py` + +To view additional options you can use `python tpd-keys.py -h` \ No newline at end of file diff --git a/Sites/Crunchyroll.py b/Sites/Crunchyroll.py new file mode 100644 index 0000000..96dec30 --- /dev/null +++ b/Sites/Crunchyroll.py @@ -0,0 +1,201 @@ +# Import dependencies + +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import requests +import base64 +import os +import Helpers + + +# Defining decrypt function for Crunchyroll +def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_url: str = None): + + # Exit if no device + if wvd is None: + exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs") + + # Try getting pssh via MPD URL if web-dl + if mpd_url is not None: + input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) + if input_pssh is not None: + print(f'\nPSSH found: {input_pssh}') + else: + input_pssh = input(f"\nPSSH not found! Input PSSH: ") + + # Ask for PSSH if just keys function + if mpd_url is None: + # Ask for PSSH if web-dl not selected: + input_pssh = input(f"\nPSSH: ") + + # prepare pssh + pssh = PSSH(input_pssh) + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + # get service certificate + service_cert = requests.post( + url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine", + data=cdm.service_certificate_challenge, + headers=license_curl_headers + ) + if service_cert.status_code != 200: + print("Couldn't retrieve service cert") + else: + service_cert = service_cert.json()["license"] + cdm.set_service_certificate(session_id, service_cert) + + # generate license challenge + if service_cert: + challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True) + else: + challenge = cdm.get_license_challenge(session_id, pssh) + + # send license challenge + license = requests.post( + url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine", + data=challenge, + headers=license_curl_headers + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + license = license.json()["license"] + + # parse license challenge + cdm.parse_license(session_id, license) + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}') + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out the keys + print(f'\nKeys:\n{returned_keys}') + + # Return the keys for future ripper use. + return mp4decrypt_keys + + +# Defining remote decrypt function for Crunchyroll +def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None): + + # Exit if no device + if api_key is None: + exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt") + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Try getting pssh via MPD URL if web-dl + if mpd_url is not None: + input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) + if input_pssh is not None: + print(f'\nPSSH found: {input_pssh}') + else: + input_pssh = input(f"\nPSSH not found! Input PSSH: ") + + # Ask for PSSH if just keys function + if mpd_url is None: + # Ask for PSSH if web-dl not selected: + input_pssh = input(f"\nPSSH: ") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": input_pssh + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Send the challenge to the widevine license server + license = requests.post( + url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine", + headers=license_curl_headers, + data=challenge + ) + + # Retrieve the license message + license = license.json()["license"] + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}") + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out keys + print(f'\nKeys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) + + # return mp4decrypt keys + return mp4decrypt_keys diff --git a/Sites/Generic.py b/Sites/Generic.py new file mode 100644 index 0000000..db0e958 --- /dev/null +++ b/Sites/Generic.py @@ -0,0 +1,207 @@ +# Import dependencies + +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import requests +import base64 +import os +import Helpers + + +# Defining decrypt function for generic services +def decrypt_generic(wvd: str = None, license_curl_headers: dict = None, mpd_url: str = None): + + # Exit if no device + if wvd is None: + exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs") + + # Try getting pssh via MPD URL if web-dl + if mpd_url is not None: + input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) + if input_pssh is not None: + print(f'\nPSSH found: {input_pssh}') + else: + input_pssh = input(f"\nPSSH not found! Input PSSH: ") + + # Ask for PSSH if just keys function + if mpd_url is None: + # Ask for PSSH if web-dl not selected: + input_pssh = input(f"\nPSSH: ") + + + # prepare pssh + pssh = PSSH(input_pssh) + + # Ask for license URL + license_url = input(f"\nLicense URL: ") + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + # get service certificate + service_cert = requests.post( + url=license_url, + data=cdm.service_certificate_challenge, + headers=license_curl_headers + ) + if service_cert.status_code != 200: + print("Couldn't retrieve service cert") + else: + service_cert = service_cert.content + cdm.set_service_certificate(session_id, service_cert) + + # generate license challenge + if service_cert: + challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True) + else: + challenge = cdm.get_license_challenge(session_id, pssh) + + # send license challenge + license = requests.post( + url=license_url, + data=challenge, + headers=license_curl_headers + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + license = license.content + + # parse license challenge + cdm.parse_license(session_id, license) + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}') + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out the keys + print(f'\nKeys:\n{returned_keys}') + + # Return the keys for future ripper use. + return mp4decrypt_keys + +# Defining remote decrypt function for generic services +def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None): + + # Exit if no API key + if api_key is None: + exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt") + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Try getting pssh via MPD URL if web-dl + if mpd_url is not None: + input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) + if input_pssh is not None: + print(f'\nPSSH found: {input_pssh}') + else: + input_pssh = input(f"\nPSSH not found! Input PSSH: ") + + # Ask for PSSH if just keys function + if mpd_url is None: + # Ask for PSSH if web-dl not selected: + input_pssh = input(f"\nPSSH: ") + + # Ask for license URL + input_license_url = input(f"\nLicense URL: ") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": input_pssh + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Send the challenge to the widevine license server + license = requests.post( + url=input_license_url, + headers=license_curl_headers, + data=challenge + ) + + # Retrieve the license message + license = base64.b64encode(license.content).decode() + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}") + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out keys + print(f'\nKeys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) + + # Return mp4decrypt keys + return mp4decrypt_keys diff --git a/Sites/YouTube.py b/Sites/YouTube.py new file mode 100644 index 0000000..5c62e88 --- /dev/null +++ b/Sites/YouTube.py @@ -0,0 +1,180 @@ +# Import dependencies + +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import requests +import base64 +import os +import Helpers + + +# Defining decrypt function for YouTube +def decrypt_youtube(wvd: str = None, license_curl_headers: dict = None, license_curl_cookies: dict = None, license_curl_json: dict = None): + + # Exit if no device + if wvd is None: + exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs") + + # prepare pssh + pssh = PSSH("AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY=") + + # Ask for the license URL + license_url = input("License URL: ") + + # Print a new line between PSSH/License URL and keys + print("\n") + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + # Generate challenge + challenge = cdm.get_license_challenge(session_id, pssh) + + # Insert the challenge into the JSON data + license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode() + + # send license challenge + license = requests.post( + url=license_url, + headers=license_curl_headers, + cookies=license_curl_cookies, + json=license_curl_json + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + licence = license.json()["license"].replace("-", "+").replace("_", "/") + + # parse license challenge + cdm.parse_license(session_id, licence) + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}') + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys) + + # Print out the keys + print(f'Keys:\n{returned_keys}') + + # Return the keys for future ripper use. + return mp4decrypt_keys + + +# Defining remote decrypt function for YouTube +def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, license_curl_cookies: dict = None): + + # Exit if no API key + if api_key is None: + exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt") + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Ask for License URL + input_license_url = input("License URL: ") + + # Print a line between license URL and keys + print("\n") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": "AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY=" + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Insert the challenge into the JSON data + license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode() + + # Send the challenge to the widevine license server + license = requests.post( + url=input_license_url, + headers=license_curl_headers, + json=license_curl_json, + cookies=license_curl_cookies + ) + + # Retrieve the license message + license = license.json()["license"].replace("-", "+").replace("_", "/") + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}") + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # Cache the keys + Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys) + + # Print out keys + print(f'Keys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) + + # Return mp4decrypt keys + return mp4decrypt_keys diff --git a/Sites/__init__.py b/Sites/__init__.py new file mode 100644 index 0000000..c216588 --- /dev/null +++ b/Sites/__init__.py @@ -0,0 +1,3 @@ +from . import Crunchyroll +from . import Generic +from . import YouTube \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 70e8eca..9931c2c 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/tpd-keys.py b/tpd-keys.py index ec1967f..c6b6cc0 100644 --- a/tpd-keys.py +++ b/tpd-keys.py @@ -1,443 +1,91 @@ -# Import needed libraries - -import requests -import json -import httpx -import sqlite3 -import License_cURL +# Import dependencies import os -from os import urandom -import glob -import inquirer -import uuid -import random -import re +import Helpers +import Sites +import license_curl +import argparse + +# Get device and api key +device, api_key = Helpers.capability_check.capability_check() + +# Database check, if it doesn't exist, create it. +Helpers.database_check.database_check() + +# Initialize argparse and set variable +parser = argparse.ArgumentParser(description="Options for decryption") + +# Create mutually exclusive groups for switches +services = parser.add_mutually_exclusive_group() + +# Add switches to the mutually exclusive groups +services.add_argument('--crunchyroll', action='store_true', help="Decrypt Crunchyroll") +services.add_argument('--crunchyroll-remote', action='store_true', help="Decrypt Crunchyroll remotely") +services.add_argument('--youtube', action='store_true', help="Decrypt YouTube") +services.add_argument('--youtube-remote', action='store_true', help="Decrypt YouTube remotely") +services.add_argument('--generic-remote', action='store_true', help="Decrypt generic services remotely") + +# Add web download switch +parser.add_argument('--web-dl', help="Web download", action='store_true') + +# Assign the switches a variable +switches = parser.parse_args() -from pywidevine.cdm import Cdm -from pywidevine.device import Device -from pywidevine.pssh import PSSH -import base64 - - -# Hola proxy - -class Settings: - def __init__(self, userCountry: str = None, randomProxy: bool = False) -> None: - self.randomProxy = randomProxy - self.userCountry = userCountry - self.ccgi_url = "https://client.hola.org/client_cgi/" - self.ext_ver = self.get_ext_ver() - self.ext_browser = "chrome" - self.user_uuid = uuid.uuid4().hex - self.user_agent = "Mozilla/5.0 (X11; Fedora; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36" - self.product = "cws" - self.port_type_choice: str - self.zoneAvailable = ["AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI", - "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL", - "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"] - - def get_ext_ver(self) -> str: - about = httpx.get("https://hola.org/access/my/settings#/about").text - if 'window.pub_config.init({"ver":"' in about: - version = about.split('window.pub_config.init({"ver":"')[1].split('"')[0] - return version - - # last know working version - return "1.199.485" - - -class Engine: - def __init__(self, Settings) -> None: - self.settings = Settings - - def get_proxy(self, tunnels, tls=False) -> str: - login = f"user-uuid-{self.settings.user_uuid}" - proxies = dict(tunnels) - protocol = "https" if tls else "http" - for k, v in proxies["ip_list"].items(): - return "%s://%s:%s@%s:%d" % ( - protocol, - login, - proxies["agent_key"], - k if tls else v, - proxies["port"][self.settings.port_type_choice], - ) - - def generate_session_key(self, timeout: float = 10.0) -> json: - post_data = {"login": "1", "ver": self.settings.ext_ver} - return httpx.post( - f"{self.settings.ccgi_url}background_init?uuid={self.settings.user_uuid}", - json=post_data, - headers={"User-Agent": self.settings.user_agent}, - timeout=timeout, - ).json()["key"] - - def zgettunnels( - self, session_key: str, country: str, timeout: float = 10.0 - ) -> json: - qs = { - "country": country.lower(), - "limit": 1, - "ping_id": random.random(), - "ext_ver": self.settings.ext_ver, - "browser": self.settings.ext_browser, - "uuid": self.settings.user_uuid, - "session_key": session_key, - } - - return httpx.post( - f"{self.settings.ccgi_url}zgettunnels", params=qs, timeout=timeout - ).json() - - -class Hola: - def __init__(self, Settings) -> None: - self.myipUri: str = "https://hola.org/myip.json" - self.settings = Settings - - def get_country(self) -> str: - - if not self.settings.randomProxy and not self.settings.userCountry: - self.settings.userCountry = httpx.get(self.myipUri).json()["country"] - - if ( - not self.settings.userCountry in self.settings.zoneAvailable - or self.settings.randomProxy - ): - self.settings.userCountry = random.choice(self.settings.zoneAvailable) - - return self.settings.userCountry - - -def init_proxy(data): - settings = Settings( - data["zone"] - ) - settings.port_type_choice = data[ - "port" - ] - - hola = Hola(settings) - engine = Engine(settings) - - userCountry = hola.get_country() - session_key = engine.generate_session_key() - # time.sleep(10) - tunnels = engine.zgettunnels(session_key, userCountry) - - return engine.get_proxy(tunnels) - - -# Get current working directory -main_directory = os.getcwd() - -# Check if API key exists, if not create file -if not os.path.isfile(f"{main_directory}/api-key.txt"): - with open(f'{main_directory}/api-key.txt', 'w') as api_key: - print(f"\nIf you have an API key please place it in api-key.txt") - api_key.write("Delete this and place your API key on this line") - -# Check if API key exists -with open(f'{main_directory}/api-key.txt') as api_key: - api_key = api_key.readline() - if api_key == "Delete this and place your API key on this line": - print(f"\nNo API key found!\n") - api_key = "" - -# Create database and table for local key caching if they don't exist -if not os.path.isfile(f"{main_directory}/database.db"): - dbconnection = sqlite3.connect("database.db") - dbcursor = dbconnection.cursor() - dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )') - dbconnection.close() - - -# Define key cache function - - -def key_cache(pssh: str, db_keys: str): - dbconnection = sqlite3.connect("database.db") - dbcursor = dbconnection.cursor() - dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, db_keys)) - dbconnection.commit() - dbconnection.close() - - -# Making sure a .wvd file exists and using that as the CDM -try: - cdm = glob.glob(f'{main_directory}/*.wvd')[0] -except: - cdm = None - print(f"Please place a WVD in {main_directory}") - print(f"Use option 3 of TPD-Keys and set API key if you do not have your own.") - - -# Define key retrieval function -def retrieve_keys(proxy_used: str = None, headers: list = None, - json_data: json = None, device: str = cdm): - pssh = input("PSSH: ") - licence_url = input("License URL: ") - if proxy_used is not None: - proxy = init_proxy({"zone": proxy_used, "port": "peer"}) - proxies = { - "http": proxy - } +# Based on the selected switch within the mutually exclusive group, perform actions +if switches.crunchyroll: + # Perform action for --crunchyroll + if switches.web_dl: + mpd = input("MPD URL: ") + file = Helpers.download.web_dl_crunchyroll(mpd=mpd, device=device) + print(f'Saved at {file[0]}') else: - proxies = None - challenge_pssh = PSSH(pssh) - try: - device = Device.load(device) - except: - print(f"Please place a WVD in {main_directory}") - exit() - cdm = Cdm.from_device(device) - session_id = cdm.open() - challenge = cdm.get_license_challenge(session_id, challenge_pssh) - license = requests.post(licence_url, data=challenge, proxies=proxies, headers=headers, json=json_data) - license.raise_for_status() - cdm.parse_license(session_id, license.content) - db_keys = '' - for key in cdm.get_keys(session_id): - if key.type != 'SIGNING': - db_keys += f'{key.kid.hex}:{key.key.hex()}\n' - key_cache(pssh=pssh, db_keys=db_keys) - return db_keys - -# Define retrieve keys remotely function + Sites.Crunchyroll.decrypt_crunchyroll(wvd=device, license_curl_headers=license_curl.headers) -def retrieve_keys_remotely(proxy_used: str = None): - api_url = "https://api.cdrm-project.com" - api_device = "CDM" - pssh = input("PSSH: ") - license_url = input("License URL: ") - if proxy_used is not None: - proxy = init_proxy({"zone": proxy_used, "port": "peer"}) - proxies = { - "http": proxy - } +elif switches.crunchyroll_remote: + # Perform action for --crunchyroll-remote + if switches.web_dl: + mpd = input("MPD URL: ") + file = Helpers.download.web_dl_crunchyroll(mpd=mpd, api_key=api_key, remote=True) + print(f'Saved at {file[0]}') else: - proxies = None - x_headers = { - "X-Secret-Key": api_key - } - open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers) + Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers) - session_id = open_session.json()["data"]["session_id"] - - license_challenge_json_data = { - "session_id": session_id, - "init_data": pssh - } - - licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers, - json=license_challenge_json_data) - - license_message = licence_challenge.json()["data"]["challenge_b64"] - - license = requests.post( - headers=License_cURL.headers, - proxies=proxies, - url=license_url, - data=base64.b64decode(license_message) - ) - - parse_license_json_data = { - "session_id": session_id, - "license_message": f"{base64.b64encode(license.content).decode()}" - } - - requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data, - headers=x_headers) - - get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL", - json={"session_id": session_id}, headers=x_headers) - db_keys = '' - for key in get_keys.json()["data"]["keys"]: - if not key["type"] == "SIGNING": - db_keys += f"{key['key_id']}:{key['key']}\n" - key_cache(pssh=pssh, db_keys=db_keys) - - requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers) - - return db_keys - - -# Define retrieve keys remotely VDOCipher function -def retrieve_keys_remotely_vdocipher(proxy_used: str = None): - - # Get URL from function - url = input(f"Video URL: ") - - # Set the VDOCipher token headers - token_headers = { - 'accept': '*/*', - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36', - ## Comment this line out if using for anything other than https://www.vdocipher.com/blog/2014/12/add-text-to-videos-with-watermark/ - 'Origin': f"https://{urandom(8).hex()}.com", - } - - # Set the token response - token_response = requests.get(url, cookies=License_cURL.cookies, headers=token_headers) - try: - otp_match = re.findall(r"otp: '(.*)',", token_response.text)[0] - playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", token_response.text)[0] - except IndexError: - try: - otp_match = re.findall(r"otp=(.*)&", token_response.text)[0] - playbackinfo_match = re.findall(r"playbackInfo=(.*)", token_response.text)[0] - except IndexError: - print("\nAn error occured while getting otp/playback") - exit() - - # Set the video ID - video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"] - - # Set new token response (1) - token_response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=token_headers) - try: - license_url = token_response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0] - mpd = token_response.json()["dash"]["manifest"] - except KeyError: - print("\n An error occured while getting mpd/license url") - - # Set new token response (2) - token_response = requests.get(mpd, headers=token_headers) - - # Set API URL - api_url = "https://api.cdrm-project.com" - - # Set API Device - api_device = "CDM" - - # Retrieve PSSH - pssh = re.search(r"(.*)", token_response.text).group(1) - - # Check if proxy was used - if proxy_used is not None: - proxy = init_proxy({"zone": proxy_used, "port": "peer"}) - proxies = { - "http": proxy - } +elif switches.youtube: + # Perform action for --YouTube + if switches.web_dl: + url = input("YouTube URL: ") + file = Helpers.download.youtube_dlp(url=url, device=device) + print(f'Saved at {file}') else: - proxies = None - - # Set API headers - x_headers = { - "X-Secret-Key": api_key - } - - # Open API session - open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers) - - # Set the session ID - session_id = open_session.json()["data"]["session_id"] - - # Send json data to get license challenge - license_challenge_json_data = { - "session_id": session_id, - "init_data": pssh - } - - # Get the license challenge from PSSH - licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers, - json=license_challenge_json_data) - - # Set the final token - token = { - "otp":otp_match, - "playbackInfo":playbackinfo_match, - "href":url, - "tech":"wv", - "licenseRequest":licence_challenge.json()["data"]["challenge_b64"] - } - - # Send challenge - license = requests.post( - proxies=proxies, - url=license_url, - json={'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}'} - ) - - # Set the parsing JSON data - parse_license_json_data = { - "session_id": session_id, - "license_message": license.json()["license"] - } - - # Send the parsing JSON data - requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data, - headers=x_headers) - - # Get the keys - get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL", - json={"session_id": session_id}, headers=x_headers) - - # Cache the keys - db_keys = '' - for key in get_keys.json()["data"]["keys"]: - if not key["type"] == "SIGNING": - db_keys += f"{key['key_id']}:{key['key']}\n" - key_cache(pssh=pssh, db_keys=db_keys) - - # Close the session - requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers) - - # Return the keys - return db_keys - -# Defining service prompt function + Sites.YouTube.decrypt_youtube(wvd=device, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies) -def service_prompt(): - service_prompt = [ - inquirer.List('Service', - message="Please choose a service", - choices=['Generic', 'Generic with headers from Licence cURL', 'Remote', 'Remote VDOCipher'], - ), - ] - service_selected = inquirer.prompt(service_prompt) - - proxy_needed_prompt = [ - inquirer.List('Proxy', - message="Will you need a proxy?", - choices=['Yes', 'No'], - ), - ] - - proxy_needed = inquirer.prompt(proxy_needed_prompt) - if proxy_needed["Proxy"] == "Yes": - allowed_countries = [ - "AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI", - "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL", - "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB" - ] - proxy_available = [ - inquirer.List('Proxys available', - message="Please choose a country", - choices=allowed_countries - ), - ] - selected_proxy = inquirer.prompt(proxy_available) - return service_selected["Service"], selected_proxy["Proxys available"] +elif switches.youtube_remote: + # Perform action for --youtube-remote + if switches.web_dl: + url = input("YouTube URL: ") + file = Helpers.download.youtube_dlp(url=url, api_key=api_key, remote=True) + print(f'Saved at {file}') else: - selected_proxy = None - return service_selected["Service"], selected_proxy + Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies) -# Define variables for the service and proxy wanted +elif switches.generic_remote: + # Perform action for --generic-remote + if switches.web_dl: + mpd = input("MPD URL: ") + file = Helpers.download.web_dl_generic(mpd=mpd, api_key=api_key, remote=True) + print(f'Saved at {file[0]}') + else: + Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers) -service_selected, selected_proxy = service_prompt() - - -if service_selected == "Generic": - print(f"\n{retrieve_keys(proxy_used=selected_proxy)}") -elif service_selected == "Generic with headers from Licence cURL": - print(f"\n{retrieve_keys(proxy_used=selected_proxy, headers=License_cURL.headers)}") -elif service_selected == "Remote": - print(f"\n{retrieve_keys_remotely(proxy_used=selected_proxy)}") -elif service_selected == "Remote VDOCipher": - print(f"\n{retrieve_keys_remotely_vdocipher(proxy_used=selected_proxy)}") - +else: + # If no switch is provided, perform a default action + if switches.web_dl: + mpd = input("MPD URL: ") + file = Helpers.download.web_dl_generic(mpd=mpd, device=device) + print(f'Saved at {file[0]}') + else: + Sites.Generic.decrypt_generic(wvd=device, license_curl_headers=license_curl.headers)