Update 3.0.1 - alpha
- Added MPD / HLS PSSH Parser - Added Caching via KID
This commit is contained in:
parent
3eac4d7706
commit
89bcd55f6f
@ -5,4 +5,4 @@ import CDRM_Modules
|
||||
CDRM_Modules.Startup_Checks.run_startup_checks(startup=True)
|
||||
|
||||
# Run main menu
|
||||
CDRM_Modules.Main_Menu.main_menu()
|
||||
test = CDRM_Modules.Main_Menu.main_menu()
|
30
CDRM_Modules/Database.py
Normal file
30
CDRM_Modules/Database.py
Normal file
@ -0,0 +1,30 @@
|
||||
import os
|
||||
import sqlite3
|
||||
|
||||
|
||||
def upsert_key(PSSH, MPD, KID, KEY, License_URL, Headers, Cookies, Data):
|
||||
# Connect to the SQLite database
|
||||
conn = sqlite3.connect(f'{os.getcwd()}/Keys.db')
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create the SQL command for upserting
|
||||
sql = '''
|
||||
INSERT INTO keys (PSSH, MPD, KID, KEY, License_URL, Headers, Cookies, Data)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(KID)
|
||||
DO UPDATE SET
|
||||
PSSH = excluded.PSSH,
|
||||
MPD = excluded.MPD,
|
||||
KEY = excluded.KEY,
|
||||
License_URL = excluded.License_URL,
|
||||
Headers = excluded.Headers,
|
||||
Cookies = excluded.Cookies,
|
||||
Data = excluded.Data;
|
||||
'''
|
||||
|
||||
# Execute the SQL command
|
||||
cursor.execute(sql, (PSSH, MPD, KID, KEY, License_URL, Headers, Cookies, Data))
|
||||
|
||||
# Commit the transaction and close the connection
|
||||
conn.commit()
|
||||
conn.close()
|
@ -1,4 +1,5 @@
|
||||
import httpx
|
||||
from License_cURL import headers
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.pssh import PSSH
|
||||
@ -6,7 +7,11 @@ from pywidevine.remotecdm import RemoteCdm
|
||||
import yaml
|
||||
import os
|
||||
from colorama import Fore
|
||||
import re
|
||||
from . import CDM_Selector
|
||||
from . import Get_PSSH
|
||||
from . import Database
|
||||
from InquirerPy import inquirer
|
||||
|
||||
def load_license_curl():
|
||||
class LicenseCURL:
|
||||
@ -31,6 +36,19 @@ def load_license_curl():
|
||||
cookies = None
|
||||
return LicenseCURL()
|
||||
|
||||
def check_if_url(string):
|
||||
# Regular expression for validating a URL
|
||||
url_pattern = re.compile(
|
||||
r'^(?:http|ftp)s?://' # http:// or https://
|
||||
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
|
||||
r'localhost|' # localhost...
|
||||
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4
|
||||
r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6
|
||||
r'(?::\d+)?' # optional port
|
||||
r'(?:/?|[/?]\S+)$', # optional path
|
||||
re.IGNORECASE)
|
||||
return re.match(url_pattern, string) is not None
|
||||
|
||||
def decrypt_local_cdm():
|
||||
cdm_choice = CDM_Selector.select_local_cdm()
|
||||
try:
|
||||
@ -49,13 +67,36 @@ def decrypt_local_cdm():
|
||||
print(f'{Fore.RED}An error occurred.\n\n{error}')
|
||||
exit(1)
|
||||
try:
|
||||
pssh = PSSH(input("\nPSSH: "))
|
||||
pssh_or_mpd_input = input("MPD Link or PSSH: ")
|
||||
if check_if_url(pssh_or_mpd_input):
|
||||
pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
|
||||
if len(pssh_list) > 1:
|
||||
choice = inquirer.select(
|
||||
message="Select PSSH:",
|
||||
choices=pssh_list,
|
||||
).execute()
|
||||
db_pssh = choice
|
||||
db_mpd = pssh_or_mpd_input
|
||||
pssh = PSSH(choice)
|
||||
else:
|
||||
if pssh_list:
|
||||
db_pssh = pssh_list[0]
|
||||
db_mpd = pssh_or_mpd_input
|
||||
pssh = PSSH(pssh_list[0])
|
||||
else:
|
||||
db_pssh = pssh_or_mpd_input
|
||||
db_mpd = None
|
||||
pssh = PSSH(pssh_or_mpd_input)
|
||||
except Exception as error:
|
||||
print(f'{Fore.RED}An error occurred.\n\n{error}')
|
||||
exit(1)
|
||||
user_license_url = input("License URL: ")
|
||||
db_url = user_license_url
|
||||
print()
|
||||
license_curl = load_license_curl()
|
||||
db_headers = license_curl.headers
|
||||
db_cookies = license_curl.cookies
|
||||
service_cert_set_success = False
|
||||
try:
|
||||
service_certificate = httpx.post(
|
||||
url=user_license_url,
|
||||
@ -64,9 +105,11 @@ def decrypt_local_cdm():
|
||||
data=cdm.service_certificate_challenge
|
||||
).content
|
||||
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate)
|
||||
service_cert_set_success = True
|
||||
except Exception as error:
|
||||
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
|
||||
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh)
|
||||
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=service_cert_set_success)
|
||||
db_data = challenge
|
||||
try:
|
||||
license_response = httpx.post(
|
||||
url=user_license_url,
|
||||
@ -84,6 +127,7 @@ def decrypt_local_cdm():
|
||||
exit(1)
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type == 'CONTENT':
|
||||
Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
|
||||
print(f"{key.kid.hex}:{key.key.hex()}")
|
||||
|
||||
return
|
||||
@ -110,13 +154,36 @@ def decrypt_remote_cdm():
|
||||
print(f'{Fore.RED}An error occurred.\n\n{error}')
|
||||
exit(1)
|
||||
try:
|
||||
pssh = PSSH(input("\nPSSH: "))
|
||||
pssh_or_mpd_input = input("MPD Link or PSSH: ")
|
||||
if check_if_url(pssh_or_mpd_input):
|
||||
pssh_list = Get_PSSH.get_pssh(pssh_or_mpd_input)
|
||||
if len(pssh_list) > 1:
|
||||
choice = inquirer.select(
|
||||
message="Select PSSH:",
|
||||
choices=pssh_list,
|
||||
).execute()
|
||||
db_pssh = choice
|
||||
db_mpd = pssh_or_mpd_input
|
||||
pssh = PSSH(choice)
|
||||
else:
|
||||
if pssh_list:
|
||||
db_pssh = pssh_list[0]
|
||||
db_mpd = pssh_or_mpd_input
|
||||
pssh = PSSH(pssh_list[0])
|
||||
else:
|
||||
db_pssh = pssh_or_mpd_input
|
||||
db_mpd = None
|
||||
pssh = PSSH(pssh_or_mpd_input)
|
||||
except Exception as error:
|
||||
print(f'{Fore.RED}An error occurred.\n\n{error}')
|
||||
exit(1)
|
||||
user_license_url = input("License URL: ")
|
||||
db_url = user_license_url
|
||||
print()
|
||||
license_curl = load_license_curl()
|
||||
db_headers = license_curl.headers
|
||||
db_cookies = license_curl.cookies
|
||||
set_service_certificate_success = False
|
||||
try:
|
||||
service_certificate = httpx.post(
|
||||
url=user_license_url,
|
||||
@ -125,9 +192,11 @@ def decrypt_remote_cdm():
|
||||
data=cdm.service_certificate_challenge
|
||||
).content
|
||||
cdm.set_service_certificate(session_id=session_id, certificate=service_certificate)
|
||||
service_cert_set_success = True
|
||||
except Exception as error:
|
||||
print(f'{Fore.RED}An error occurred.\n\n{error}\n\nTrying without service certificate')
|
||||
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh)
|
||||
challenge = cdm.get_license_challenge(session_id=session_id, pssh=pssh, privacy_mode=service_cert_set_success)
|
||||
db_data = challenge
|
||||
try:
|
||||
license_response = httpx.post(
|
||||
url=user_license_url,
|
||||
@ -145,6 +214,7 @@ def decrypt_remote_cdm():
|
||||
exit(1)
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type == 'CONTENT':
|
||||
Database.upsert_key(PSSH=f'{db_pssh}', MPD=f'{db_mpd}', KID=f'{key.kid.hex}', KEY=f'{key.key.hex()}', License_URL=f'{db_url}', Headers=f'{db_headers}', Cookies=f'{db_cookies}', Data=f'{db_data}')
|
||||
print(f"{key.kid.hex}:{key.key.hex()}")
|
||||
|
||||
return
|
58
CDRM_Modules/Get_PSSH.py
Normal file
58
CDRM_Modules/Get_PSSH.py
Normal file
@ -0,0 +1,58 @@
|
||||
from devine.core.manifests import dash, hls
|
||||
from devine.core.tracks import Tracks
|
||||
import requests
|
||||
from . import Decrypt
|
||||
|
||||
# Define a function to get PSSH from MPD
|
||||
def get_pssh(mpd_or_m3u8_url):
|
||||
try:
|
||||
tracks = Tracks()
|
||||
client = requests.Session()
|
||||
License_cURL = Decrypt.load_license_curl()
|
||||
if License_cURL.headers:
|
||||
client.headers.update(License_cURL.headers)
|
||||
if License_cURL.cookies:
|
||||
client.cookies.update(License_cURL.cookies)
|
||||
mpd = dash.DASH.from_url(url=mpd_or_m3u8_url).to_tracks(language="en")
|
||||
tracks.add(mpd)
|
||||
PSSH = []
|
||||
for track in tracks:
|
||||
representation = track.data['dash']['representation']
|
||||
adaptation_set = track.data['dash']['adaptation_set']
|
||||
track_drm = dash.DASH.get_drm(
|
||||
representation.findall("ContentProtection") +
|
||||
adaptation_set.findall("ContentProtection")
|
||||
)
|
||||
for drm in track_drm:
|
||||
PSSH.append(drm.pssh)
|
||||
pssh_list = []
|
||||
for pssh in PSSH:
|
||||
if str(pssh) not in pssh_list:
|
||||
pssh_list.append(str(pssh))
|
||||
return pssh_list
|
||||
except:
|
||||
try:
|
||||
tracks = Tracks()
|
||||
client = requests.Session()
|
||||
License_cURL = Decrypt.load_license_curl()
|
||||
client.headers.update(License_cURL.headers)
|
||||
client.cookies.update(License_cURL.cookies)
|
||||
hls = hls.from_url(url=mpd_or_m3u8_url, session=client).to_tracks()
|
||||
tracks.add(hls)
|
||||
PSSH = []
|
||||
for track in tracks:
|
||||
representation = track.data['dash']['representation']
|
||||
adaptation_set = track.data['dash']['adaptation_set']
|
||||
track_drm = dash.DASH.get_drm(
|
||||
representation.findall("ContentProtection") +
|
||||
adaptation_set.findall("ContentProtection")
|
||||
)
|
||||
for drm in track_drm:
|
||||
PSSH.append(drm.pssh)
|
||||
pssh_list = []
|
||||
for pssh in PSSH:
|
||||
if str(pssh) not in pssh_list:
|
||||
pssh_list.append(str(pssh))
|
||||
return pssh_list
|
||||
except:
|
||||
return
|
@ -59,9 +59,9 @@ def check_if_keys_db_exists():
|
||||
keys_db_cursor = keys_db_connection.cursor()
|
||||
keys_db_cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS keys (
|
||||
MPD TEXT,
|
||||
PSSH TEXT,
|
||||
KID TEXT,
|
||||
MPD TEXT,
|
||||
KID TEXT PRIMARY KEY,
|
||||
KEY TEXT,
|
||||
License_URL TEXT,
|
||||
Headers TEXT,
|
||||
|
@ -2,4 +2,6 @@ from . import Startup_Checks
|
||||
from . import CDRM_Keys_API
|
||||
from . import Decrypt
|
||||
from . import CDM_Selector
|
||||
from . import Main_Menu
|
||||
from . import Main_Menu
|
||||
from . import Get_PSSH
|
||||
from . import Database
|
Loading…
Reference in New Issue
Block a user