KeyDive/keydive/core.py
2024-10-20 16:07:23 +02:00

243 lines
9.3 KiB
Python

import json
import logging
import re
import subprocess
from pathlib import Path
import frida
import xmltodict
from frida.core import Device, Session, Script
from keydive.cdm import Cdm
from keydive.constants import OEM_CRYPTO_API, NATIVE_C_API, CDM_FUNCTION_API
from keydive.vendor import Vendor
class Core:
    """
    Core class for handling DRM operations and device interactions.

    The class selects a Frida device, reads device information through the
    ``adb`` command line tool, renders the ``keydive.js`` hook script and
    forwards the messages emitted by that script to the ``Cdm`` instance.
    """

    def __init__(self, cdm: Cdm, device: str = None, functions: Path = None, skip: bool = False):
        """
        Initializes a Core instance.

        Args:
            cdm (Cdm): Instance of Cdm for managing DRM related operations.
            device (str, optional): ID of the Android device to connect to via ADB. Defaults to None (uses USB device).
            functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None.
            skip (bool, optional): Flag to determine whether to skip predefined functions (e.g., OEM_CRYPTO_API).

        Raises:
            FileNotFoundError: If ``functions`` is given but is not an existing file.
            ValueError: If the functions file cannot be parsed.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.running = True
        self.cdm = cdm

        # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679
        self.skip = skip

        # Select device based on provided ID or default to the first USB device.
        self.device: Device = frida.get_device(id=device, timeout=5) if device else frida.get_usb_device(timeout=5)
        self.logger.info('Device: %s (%s)', self.device.name, self.device.id)

        # Obtain device properties. The values are only logged, so a missing
        # key (e.g. adb returned nothing) must not abort the initialisation
        # with an unexplained KeyError.
        properties = self.device_properties()
        if not properties:
            self.logger.warning('Unable to read device properties through adb')
        self.logger.info('SDK API: %s', properties.get('ro.build.version.sdk'))
        self.logger.info('ABI CPU: %s', properties.get('ro.product.cpu.abi'))

        # Load the hook script
        self.functions = functions
        self.script = self.__prepare_hook_script()
        self.logger.info('Script loaded successfully')

    def __prepare_hook_script(self) -> str:
        """
        Prepares the hook script content by injecting the library-specific scripts.

        Reads ``keydive.js`` located next to this module and substitutes the
        ``${...}`` placeholders with JSON encoded values.

        Returns:
            str: The prepared script content.
        """
        content = Path(__file__).with_name('keydive.js').read_text(encoding='utf-8')
        symbols = self.__prepare_symbols(self.functions)

        # Replace placeholders in script template
        replacements = {
            '${OEM_CRYPTO_API}': json.dumps(list(OEM_CRYPTO_API)),
            '${NATIVE_C_API}': json.dumps(list(NATIVE_C_API)),
            '${SYMBOLS}': json.dumps(symbols),
            '${SKIP}': str(self.skip)
        }
        for placeholder, value in replacements.items():
            content = content.replace(placeholder, value)
        return content

    @staticmethod
    def __as_list(value) -> list:
        """
        Normalizes a value produced by xmltodict into a list.

        xmltodict yields a list only when an element has several children with
        the same tag; a single child is returned as a plain dict and a missing
        one as None.

        Args:
            value: A list, a single item, or None.

        Returns:
            list: ``value`` itself when it is a list, ``[]`` for None, otherwise ``[value]``.
        """
        if value is None:
            return []
        return value if isinstance(value, list) else [value]

    def __prepare_symbols(self, path: Path) -> list:
        """
        Parses the provided XML functions file to select relevant functions.

        Args:
            path (Path): Path to Ghidra XML functions file.

        Returns:
            list: List of selected functions as dictionaries with the keys
                ``type``, ``name`` and ``address`` (entry point relative to the image base).

        Raises:
            FileNotFoundError: If the functions file is not found.
            ValueError: If functions extraction fails.
        """
        if not path:
            return []
        if not path.is_file():
            raise FileNotFoundError('Functions file not found')

        try:
            program = xmltodict.parse(path.read_bytes())['PROGRAM']
            addr_base = int(program['@IMAGE_BASE'], 16)
            # A file with a single FUNCTION element is parsed as a dict, not a list.
            functions = self.__as_list(program['FUNCTIONS']['FUNCTION'])

            # Find a target function from a predefined list
            target = None if self.skip else next((f['@NAME'] for f in functions if f['@NAME'] in OEM_CRYPTO_API), None)

            # Extract relevant functions; the first occurrence of a name wins.
            selected = {}
            for func in functions:
                name = func['@NAME']
                if name in selected:
                    continue

                # Count variables, not the attributes of a lone REGISTER_VAR dict.
                args = len(self.__as_list(func.get('REGISTER_VAR')))

                # Add function if it matches specific criteria
                if (
                        name == target
                        or (not self.skip and any(keyword in name for keyword in CDM_FUNCTION_API))
                        or (not target and re.match(r'^[a-z]+$', name) and args >= 6)
                ):
                    selected[name] = {
                        'type': 'function',
                        'name': name,
                        'address': hex(int(func['@ENTRY_POINT'], 16) - addr_base)
                    }
            return list(selected.values())
        except Exception as e:
            raise ValueError('Failed to extract functions from Ghidra') from e

    def __adb_shell(self, *args: str) -> str:
        """
        Runs ``adb -s <device id> shell <args>`` and returns its standard output.

        Args:
            *args (str): Command and arguments executed in the device shell.

        Returns:
            str: Decoded standard output; undecodable bytes are replaced
                instead of raising, and the exit status is not checked.
        """
        command = ['adb', '-s', str(self.device.id), 'shell', *args]
        sp = subprocess.run(command, capture_output=True)
        return sp.stdout.decode('utf-8', errors='replace')

    @staticmethod
    def __parse_properties(output: str) -> dict:
        """
        Parses the ``[key]: [value]`` lines printed by ``getprop``.

        Args:
            output (str): Text printed by ``getprop``.

        Returns:
            dict: Property names mapped to their values. Values that parse as
                integers are stored as int, ``true``/``false`` (any case) as
                bool, everything else as str. Lines of another shape are ignored.
        """
        properties = {}
        for line in output.splitlines():
            # The value part is greedy and anchored to the end of the line so
            # that values containing "]" (e.g. "a[0]") are not truncated.
            match = re.match(r'\[(.*?)\]: \[(.*)\]\s*$', line)
            if not match:
                continue
            key, value = match.groups()

            # Attempt to cast numeric and boolean values to appropriate types
            try:
                value = int(value)
            except ValueError:
                if value.lower() in ('true', 'false'):
                    value = value.lower() == 'true'
            properties[key] = value
        return properties

    @staticmethod
    def __parse_processes(lines: list) -> dict:
        """
        Parses the lines printed by ``ps`` into a name to PID mapping.

        Args:
            lines (list): Output lines of ``ps``; the first one is the header.

        Returns:
            dict: Process names mapped to PIDs. Names starting with ``[`` are
                kept verbatim, other names are reduced to their final path
                component. Lines without an integer second column are skipped.
        """
        processes = {}
        # Iterate through lines starting from the second line (skipping header)
        for line in lines[1:]:
            columns = line.split()  # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME
            try:
                pid = int(columns[1])
            except (IndexError, ValueError):
                # Blank or malformed line: nothing usable to record.
                continue
            name = ' '.join(columns[8:]).strip()
            name = name if name.startswith('[') else Path(name).name
            processes[name] = pid
        return processes

    def device_properties(self) -> dict:
        """
        Retrieves system properties from the connected device using ADB shell commands.

        Returns:
            dict: A dictionary of device properties (empty when ``getprop`` printed nothing usable).
        """
        # https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands
        return self.__parse_properties(self.__adb_shell('getprop'))

    def enumerate_processes(self) -> dict:
        """
        Lists processes running on the device, returning a mapping of process names to PIDs.

        Returns:
            dict: A dictionary mapping process names to PIDs.
        """
        # https://github.com/frida/frida/issues/1225#issuecomment-604181822
        lines = self.__adb_shell('ps', '-A').splitlines()
        if len(lines) < 10:
            # Too few lines for "ps -A": retry with plain "ps".
            lines = self.__adb_shell('ps').splitlines()
        return self.__parse_processes(lines)

    def __process_message(self, message: dict, data: bytes) -> None:
        """
        Handles messages received from the Frida script.

        Integer payloads are treated as logging levels with ``data`` as the
        log text; a level of ``logging.CRITICAL`` clears ``self.running``.
        Other payloads hand ``data`` over to the Cdm instance.

        Args:
            message (dict): The message payload.
            data (bytes): The raw data associated with the message.
        """
        logger = logging.getLogger('Script')

        # Errors raised inside the script carry no payload; report them
        # instead of dropping them silently.
        if message.get('type') == 'error':
            logger.error(message.get('stack') or message.get('description') or message)
            return

        level = message.get('payload')
        if isinstance(level, int):
            # Process logging messages from Frida script
            text = data.decode('utf-8', errors='replace') if data else ''
            logger.log(level=level, msg=text)
            if level in (logging.FATAL, logging.CRITICAL):
                self.running = False
        elif isinstance(level, dict) and 'private_key' in level:
            self.cdm.set_private_key(data=data, name=level['private_key'])
        elif level == 'challenge':
            self.cdm.set_challenge(data=data)
        elif level == 'client_id':
            self.cdm.set_client_id(data=data)

    def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool:
        """
        Hooks into the specified process.

        Args:
            pid (int): The process ID to hook.
            vendor (Vendor): Instance of Vendor class representing the vendor information.
            timeout (int, optional): Value passed to Frida as ``persist_timeout`` when attaching. Defaults to 0.

        Returns:
            bool: The result of the script's ``hooklibrary`` export when the
                vendor library is found; False when attaching fails or the
                library is not found.

        Raises:
            EnvironmentError: If the Frida server is not running.
        """
        try:
            session: Session = self.device.attach(pid, persist_timeout=timeout)
        except frida.ServerNotRunningError as e:
            raise EnvironmentError('Frida server is not running') from e
        except Exception as e:
            self.logger.error(e)
            return False

        def on_destroyed() -> None:
            # Release the session once its script is gone.
            session.detach()

        script: Script = session.create_script(self.script)
        script.on('message', self.__process_message)
        script.on('destroyed', on_destroyed)
        script.load()

        library = script.exports_sync.getlibrary(vendor.name)
        if library:
            self.logger.info('Library: %s (%s)', library['name'], library['path'])

            # Check if Ghidra XML functions loaded
            if vendor.oem > 17 and not self.functions:
                self.logger.warning('For OEM API > 17, specifying "functions" is required, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md')
            elif vendor.oem < 18 and self.functions:
                self.logger.warning('The "functions" attribute is deprecated for OEM API < 18')

            return script.exports_sync.hooklibrary(vendor.name)

        script.unload()
        self.logger.warning('Library not found: %s', vendor.name)
        return False
# Public names exported by a star-import of this module.
__all__ = ('Core',)