389 lines
12 KiB
Python
389 lines
12 KiB
Python
# import dependencies
|
|
from pywidevine import PSSH
|
|
from pywidevine import Cdm
|
|
from pywidevine import Device
|
|
from pywidevine import RemoteCdm
|
|
import requests
|
|
from . import key_cache
|
|
import base64
|
|
import tls_client
|
|
from xml.etree import ElementTree as ET
|
|
|
|
# Defining decrypt function
|
|
def decrypt_content(in_pssh: str = None, license_url: str = None, headers: dict = None, json_data: dict = None, wvd: str = None, scheme: str = None, proxy: str = None,):
|
|
|
|
# prepare pssh
|
|
pssh = PSSH(in_pssh)
|
|
|
|
if wvd != 'Remote':
|
|
|
|
# load device
|
|
device = Device.load(wvd)
|
|
|
|
# load CDM from device
|
|
cdm = Cdm.from_device(device)
|
|
|
|
else:
|
|
cdm = RemoteCdm(
|
|
device_type='ANDROID',
|
|
system_id=int(requests.post(url='https://cdrm-project.com/devine').content),
|
|
security_level=3,
|
|
host='https://cdrm-project.com/devine',
|
|
secret='CDRM-Project',
|
|
device_name='CDM'
|
|
)
|
|
|
|
# open CDM session
|
|
session_id = cdm.open()
|
|
|
|
# Generate the challenge
|
|
challenge = cdm.get_license_challenge(session_id, pssh)
|
|
|
|
if scheme == 'CommonWV':
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
data=challenge,
|
|
proxies={
|
|
'http': proxy,
|
|
},
|
|
)
|
|
|
|
# Parse the license if it comes back in plain bytes
|
|
try:
|
|
cdm.parse_license(session_id, license.content)
|
|
except:
|
|
# Exception, try to find by regex via json
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['license'])
|
|
except:
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['licenseData'])
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
if scheme == 'Amazon':
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
data={
|
|
'widevine2Challenge': f'{base64.b64encode(challenge).decode()}',
|
|
'includeHdcpTestKeyInLicense': 'true',
|
|
},
|
|
proxies={
|
|
'http': proxy,
|
|
}
|
|
)
|
|
|
|
# Parse the license if it comes back in plain bytes
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['widevine2License']['license'])
|
|
except Exception as error:
|
|
return [f'{error}\n\n{license.content}']
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
if scheme == 'YouTube':
|
|
json_data['licenseRequest'] = base64.b64encode(challenge).decode()
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
json=json_data,
|
|
proxies={
|
|
'http': proxy,
|
|
}
|
|
)
|
|
# Extract license from json dict
|
|
license = license.json()["license"].replace("-", "+").replace("_", "/")
|
|
|
|
# Parse the license if it comes back in plain bytes
|
|
try:
|
|
cdm.parse_license(session_id, license)
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
if scheme == 'RTE':
|
|
json_data['getWidevineLicense']['widevineChallenge'] = base64.b64encode(challenge).decode()
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
json=json_data,
|
|
proxies={
|
|
'http': proxy,
|
|
}
|
|
)
|
|
|
|
# Parse the license if it comes back in plain bytes
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['getWidevineLicenseResponse']['license'])
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
if scheme == 'Canal+':
|
|
|
|
try:
|
|
json_data['ServiceRequest']['InData']['ChallengeInfo'] = base64.b64encode(challenge).decode()
|
|
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
json=json_data
|
|
)
|
|
|
|
# Parse the license
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['ServiceResponse']['OutData']['LicenseInfo'])
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
except:
|
|
|
|
try:
|
|
session = tls_client.Session()
|
|
|
|
response = session.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
data=f'{base64.b64encode(challenge).decode()}'
|
|
).content.decode()
|
|
|
|
# Define the namespace
|
|
namespace = {'ns': 'http://www.canal-plus.com/DRM/V1'}
|
|
|
|
# Parse the XML string
|
|
root = ET.fromstring(response)
|
|
|
|
# Find the license element using the namespace
|
|
license_element = root.find('.//ns:license', namespace)
|
|
|
|
# Extract the text content of the license element
|
|
license_content = license_element.text
|
|
|
|
# Parse the license
|
|
try:
|
|
cdm.parse_license(session_id, license_content)
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
except Exception as error:
|
|
return [f'{error}\n\n{license.content}']
|
|
|
|
if scheme == 'NosTV':
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
json={
|
|
'challenge': base64.b64encode(challenge).decode(),
|
|
},
|
|
proxies={
|
|
'http': proxy,
|
|
},
|
|
)
|
|
|
|
# Parse the license if it comes back in plain bytes
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['license'][0])
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
if scheme == 'AstroGo':
|
|
|
|
# Insert the challenge
|
|
json_data['licenseChallenge'] = base64.b64encode(challenge).decode()
|
|
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
json=json_data,
|
|
proxies={
|
|
'http': proxy,
|
|
},
|
|
)
|
|
|
|
# Parse the license if it comes back in plain bytes
|
|
try:
|
|
cdm.parse_license(session_id, license.content)
|
|
except:
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['licenseData'])
|
|
except:
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['licenseData'][0])
|
|
except:
|
|
try:
|
|
cdm.parse_license(session_id, str(license.json()['licenseData'][0]))
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys
|
|
|
|
if scheme == 'Vodafone':
|
|
print(json_data)
|
|
|
|
# Find the challenge field
|
|
for request in json_data['requests']:
|
|
if request['action'] == 'license':
|
|
request['params']['challenge'] = base64.b64encode(challenge).decode()
|
|
|
|
# send license challenge
|
|
license = requests.post(
|
|
url=license_url,
|
|
headers=headers,
|
|
json=json_data,
|
|
proxies={
|
|
'http': proxy,
|
|
},
|
|
)
|
|
|
|
# Parse the license if it comes back in plain bytes
|
|
try:
|
|
cdm.parse_license(session_id, license.json()['license'])
|
|
except Exception as error:
|
|
return f'{error}\n\n{license.content}'
|
|
|
|
# Assign variable for caching keys
|
|
cached_keys = ""
|
|
|
|
for key in cdm.get_keys(session_id):
|
|
if key.type != "SIGNING":
|
|
cached_keys += f"{key.kid.hex}:{key.key.hex()}\n"
|
|
|
|
# Cache the keys
|
|
key_cache.cache_keys(pssh=in_pssh, keys=cached_keys)
|
|
|
|
# close session, disposes of session data
|
|
cdm.close(session_id)
|
|
|
|
# Return the keys
|
|
return cached_keys |