# KeyDive/keydive/cdm.py
# 2024-10-28 21:38:58 +01:00
#
# 243 lines
# 9.4 KiB
# Python

import base64
import json
import logging
from pathlib import Path
from typing import Union
from zlib import crc32
from unidecode import unidecode
from pathvalidate import sanitize_filepath, sanitize_filename
from Cryptodome.PublicKey import RSA
from Cryptodome.PublicKey.RSA import RsaKey
from pywidevine.device import Device, DeviceTypes
from pywidevine.license_protocol_pb2 import (SignedMessage, LicenseRequest, ClientIdentification, SignedDrmCertificate,
DrmCertificate, EncryptedClientIdentification)
from keydive.constants import OEM_CRYPTO_API
from keydive.keybox import Keybox
class Cdm:
    """
    The Cdm class manages CDM-related operations, such as setting challenge data,
    extracting and storing private keys, and exporting device information.

    Client IDs and private keys are collected independently and paired at export
    time through the RSA modulus they share.
    """

    def __init__(self, keybox: bool = False):
        """
        Initializes the Cdm object, setting up a logger and containers for client IDs and private keys.

        Args:
            keybox (bool, optional): When True, a Keybox instance is created and fed by
                set_device_id() / set_keybox(). Defaults to False (no keybox handling).

        Attributes:
            client_id (dict[int, ClientIdentification]): Stores client identification info mapped by key modulus.
            private_key (dict[int, RsaKey]): Stores private keys mapped by key modulus.
            keybox (Keybox | None): Keybox instance, or None when keybox handling is disabled.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        # https://github.com/devine-dl/pywidevine
        self.client_id: dict[int, ClientIdentification] = {}
        self.private_key: dict[int, RsaKey] = {}
        self.keybox = Keybox() if keybox else None

    @staticmethod
    def __client_info(client_id: ClientIdentification) -> dict:
        """
        Converts client identification information to a dictionary.

        Args:
            client_id (ClientIdentification): The client identification.

        Returns:
            dict: Maps each entry's name to its value, for every entry of client_id.client_info.
        """
        return {e.name: e.value for e in client_id.client_info}

    @staticmethod
    def __encrypted_client_info(encrypted_client_id: EncryptedClientIdentification) -> dict:
        """
        Converts encrypted client identification information to a dictionary.

        Args:
            encrypted_client_id (EncryptedClientIdentification): The encrypted client identification.

        Returns:
            dict: The encrypted client fields under camelCase keys; bytes values are
                base64-encoded to text so the result can be passed to json.dumps.
        """
        content = {
            'providerId': encrypted_client_id.provider_id,
            'serviceCertificateSerialNumber': encrypted_client_id.service_certificate_serial_number,
            'encryptedClientId': encrypted_client_id.encrypted_client_id,
            'encryptedClientIdIv': encrypted_client_id.encrypted_client_id_iv,
            'encryptedPrivacyKey': encrypted_client_id.encrypted_privacy_key
        }
        return {
            k: base64.b64encode(v).decode('utf-8') if isinstance(v, bytes) else v
            for k, v in content.items()
        }

    def set_challenge(self, data: Union[Path, bytes]) -> None:
        """
        Sets the challenge data by extracting device information.

        The data is parsed as a SignedMessage wrapping a LicenseRequest. A plain client ID
        is forwarded to set_client_id(); an encrypted one is only logged, since it cannot
        be used here. Parsing failures are logged at debug level and otherwise ignored.

        Args:
            data (Union[Path, bytes]): The challenge data as a file path or bytes.

        Raises:
            FileNotFoundError: If the provided file path does not exist.
        """
        if isinstance(data, Path):
            if not data.is_file():
                raise FileNotFoundError(data)
            data = data.read_bytes()

        try:
            signed_message = SignedMessage()
            signed_message.ParseFromString(data)

            license_request = LicenseRequest()
            license_request.ParseFromString(signed_message.msg)

            # https://integration.widevine.com/diagnostics
            encrypted_client_id: EncryptedClientIdentification = license_request.encrypted_client_id
            # A non-empty serialization means the encrypted client ID field is populated.
            if encrypted_client_id.SerializeToString():
                self.logger.info('Receive encrypted client id: \n\n%s\n', json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2))
                self.logger.warning('The client ID of the challenge is encrypted')
            else:
                client_id: ClientIdentification = license_request.client_id
                self.set_client_id(data=client_id)
        except Exception as e:
            # Best effort: intercepted buffers are not guaranteed to be valid challenges.
            self.logger.debug('Failed to set challenge data: %s', e)

    def set_private_key(self, data: Union[Path, bytes], name: Union[str, None] = None) -> None:
        """
        Sets the private key from the provided data.

        The key is stored under its RSA modulus. Import failures are logged at debug
        level and otherwise ignored.

        Args:
            data (Union[Path, bytes]): The private key data, either as a file path or bytes.
            name (str, optional): Function name for verification against known functions.

        Raises:
            FileNotFoundError: If the provided file path does not exist or is not a file.
        """
        if isinstance(data, Path):
            if not data.is_file():
                raise FileNotFoundError(data)
            data = data.read_bytes()

        try:
            key = RSA.import_key(data)
            if key.n not in self.private_key:
                # Only log a key (and where it came from) the first time it is seen.
                self.logger.info('Receive private key: \n\n%s\n', key.exportKey('PEM').decode('utf-8'))

                if name and name not in OEM_CRYPTO_API:
                    self.logger.warning('The function "%s" does not belong to the referenced functions. Communicate it to the developer to improve the tool.', name)
            self.private_key[key.n] = key
        except Exception as e:
            # Best effort: the buffer may simply not contain an RSA key.
            self.logger.debug('Failed to set private key: %s', e)

    def set_client_id(self, data: Union[ClientIdentification, bytes]) -> None:
        """
        Sets the client ID from the provided data.

        The client ID is stored under the modulus of the RSA public key found in its DRM
        certificate, which is what allows export() to pair it with a private key.
        Parsing failures are logged at debug level and otherwise ignored.

        Args:
            data (Union[ClientIdentification, bytes]): The client ID data.
        """
        try:
            if isinstance(data, ClientIdentification):
                client_id = data
            else:
                client_id = ClientIdentification()
                client_id.ParseFromString(data)

            # client_id.token -> SignedDrmCertificate -> DrmCertificate -> public key
            signed_drm_certificate = SignedDrmCertificate()
            drm_certificate = DrmCertificate()

            signed_drm_certificate.ParseFromString(client_id.token)
            drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)

            public_key = drm_certificate.public_key
            # import_key is the same call set_private_key() uses (importKey is its legacy alias).
            key = RSA.import_key(public_key)

            if key.n not in self.client_id:
                self.logger.info('Receive client id: \n\n%s\n', json.dumps(self.__client_info(client_id), indent=2))
            self.client_id[key.n] = client_id
        except Exception as e:
            # Best effort: the buffer may not be a client ID at all.
            self.logger.debug('Failed to set client ID: %s', e)

    def set_device_id(self, data: bytes) -> None:
        """
        Sets the device ID in the keybox if it is enabled.

        Args:
            data (bytes): The device ID data to be stored in the keybox.
        """
        if self.keybox:
            self.keybox.set_device_id(data=data)

    def set_keybox(self, data: bytes) -> None:
        """
        Sets the keybox data if the keybox is enabled.

        Args:
            data (bytes): The keybox data to be set.
        """
        if self.keybox:
            self.keybox.set_keybox(data=data)

    def export(self, parent: Path, wvd: bool = False) -> bool:
        """
        Exports the client ID and private key to disk.

        Only moduli present in both client_id and private_key are exported. Each pair is
        written to
        parent / company_name / model_name / system_id / <first 10 digits of the modulus>
        as client_id.bin and private_key.pem, plus a .wvd file when requested.

        Args:
            parent (Path): The parent directory to export the files to.
            wvd (bool): Whether to export WVD files.

        Returns:
            bool: True if any keys were exported, otherwise False.

        Raises:
            KeyError: If a client ID has no 'company_name' or 'model_name' entry.
        """
        keys = self.client_id.keys() & self.private_key.keys()

        for k in keys:
            client_info = self.__client_info(self.client_id[k])

            # https://github.com/devine-dl/pywidevine/blob/master/pywidevine/main.py#L211
            device = Device(
                client_id=self.client_id[k].SerializeToString(),
                private_key=self.private_key[k].exportKey('PEM'),
                type_=DeviceTypes.ANDROID,
                security_level=3,
                flags=None
            )

            # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146958022
            # Use a dedicated variable for the per-device directory: `parent` must keep
            # pointing at the export root, otherwise every device after the first would
            # be nested inside the previous device's directory.
            device_dir = sanitize_filepath(parent / client_info['company_name'] / client_info['model_name'] / str(device.system_id) / str(k)[:10])
            device_dir.mkdir(parents=True, exist_ok=True)

            path_id_bin = device_dir / 'client_id.bin'
            path_id_bin.write_bytes(data=device.client_id.SerializeToString())
            self.logger.info('Exported client ID: %s', path_id_bin)

            path_key_bin = device_dir / 'private_key.pem'
            path_key_bin.write_bytes(data=device.private_key.exportKey('PEM'))
            self.logger.info('Exported private key: %s', path_key_bin)

            if wvd:
                wvd_bin = device.dumps()

                # "<company> <model> [<cdm version>] <crc32 of the wvd>", lower-cased,
                # spaces replaced by underscores and transliterated by unidecode.
                name = f"{client_info['company_name']} {client_info['model_name']}"
                if client_info.get('widevine_cdm_version'):
                    name += f" {client_info['widevine_cdm_version']}"
                name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
                name = unidecode(name.strip().lower().replace(' ', '_'))

                path_wvd = device_dir / sanitize_filename(f'{name}_{device.system_id}_l{device.security_level}.wvd')
                path_wvd.write_bytes(data=wvd_bin)
                self.logger.info('Exported WVD: %s', path_wvd)

            # The keybox is written one level up, in the system_id directory.
            if self.keybox and not self.keybox.export(parent=device_dir.parent):
                self.logger.warning('The keybox has not been intercepted or decrypted')

        return len(keys) > 0
# Public API of this module: a star-import exposes only the Cdm class.
__all__ = ('Cdm',)