CDRM-Project/scripts/extension_decrypt.py
2024-09-08 20:13:39 -04:00

389 lines
12 KiB
Python

# import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
from pywidevine import RemoteCdm
import requests
from . import key_cache
import base64
import tls_client
from xml.etree import ElementTree as ET
# Defining decrypt function
def decrypt_content(in_pssh: str = None, license_url: str = None, headers: dict = None, json_data: dict = None, wvd: str = None, scheme: str = None, proxy: str = None,):
# prepare pssh
pssh = PSSH(in_pssh)
if wvd != 'Remote':
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
else:
cdm = RemoteCdm(
device_type='ANDROID',
system_id=int(requests.post(url='https://cdrm-project.com/devine').content),
security_level=3,
host='https://cdrm-project.com/devine',
secret='CDRM-Project',
device_name='CDM'
)
# open CDM session
session_id = cdm.open()
# Generate the challenge
challenge = cdm.get_license_challenge(session_id, pssh)
if scheme == 'CommonWV':
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
data=challenge,
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.content)
except:
# Exception, try to find by regex via json
try:
cdm.parse_license(session_id, license.json()['license'])
except:
try:
cdm.parse_license(session_id, license.json()['licenseData'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'Amazon':
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
data={
'widevine2Challenge': f'{base64.b64encode(challenge).decode()}',
'includeHdcpTestKeyInLicense': 'true',
},
proxies={
'http': proxy,
}
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['widevine2License']['license'])
except Exception as error:
return [f'{error}\n\n{license.content}']
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'YouTube':
json_data['licenseRequest'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
}
)
# Extract license from json dict
license = license.json()["license"].replace("-", "+").replace("_", "/")
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license)
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'RTE':
json_data['getWidevineLicense']['widevineChallenge'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
}
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['getWidevineLicenseResponse']['license'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'Canal+':
try:
json_data['ServiceRequest']['InData']['ChallengeInfo'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data
)
# Parse the license
try:
cdm.parse_license(session_id, license.json()['ServiceResponse']['OutData']['LicenseInfo'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
except:
try:
session = tls_client.Session()
response = session.post(
url=license_url,
headers=headers,
data=f'{base64.b64encode(challenge).decode()}'
).content.decode()
# Define the namespace
namespace = {'ns': 'http://www.canal-plus.com/DRM/V1'}
# Parse the XML string
root = ET.fromstring(response)
# Find the license element using the namespace
license_element = root.find('.//ns:license', namespace)
# Extract the text content of the license element
license_content = license_element.text
# Parse the license
try:
cdm.parse_license(session_id, license_content)
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
except Exception as error:
return [f'{error}\n\n{license.content}']
if scheme == 'NosTV':
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json={
'challenge': base64.b64encode(challenge).decode(),
},
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['license'][0])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'AstroGo':
# Insert the challenge
json_data['licenseChallenge'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.content)
except:
try:
cdm.parse_license(session_id, license.json()['licenseData'])
except:
try:
cdm.parse_license(session_id, license.json()['licenseData'][0])
except:
try:
cdm.parse_license(session_id, str(license.json()['licenseData'][0]))
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys
if scheme == 'Vodafone':
print(json_data)
# Find the challenge field
for request in json_data['requests']:
if request['action'] == 'license':
request['params']['challenge'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=headers,
json=json_data,
proxies={
'http': proxy,
},
)
# Parse the license if it comes back in plain bytes
try:
cdm.parse_license(session_id, license.json()['license'])
except Exception as error:
return f'{error}\n\n{license.content}'
# Assign variable for caching keys
cached_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# Cache the keys
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
# close session, disposes of session data
cdm.close(session_id)
# Return the keys
return cached_keys