import logging from datetime import datetime from pathlib import Path from typing import Optional from zlib import crc32 import click import requests import yaml from construct import ConstructError from google.protobuf.json_format import MessageToDict from unidecode import UnidecodeError, unidecode from pywidevine import __version__ from pywidevine.cdm import Cdm from pywidevine.device import Device, DeviceTypes from pywidevine.license_protocol_pb2 import FileHashes, LicenseType from pywidevine.pssh import PSSH @click.group(invoke_without_command=True) @click.option("-v", "--version", is_flag=True, default=False, help="Print version information.") @click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.") def main(version: bool, debug: bool) -> None: """pywidevine—Python Widevine CDM implementation.""" logging.basicConfig(level=logging.DEBUG if debug else logging.INFO) log = logging.getLogger() current_year = datetime.now().year copyright_years = f"2022-{current_year}" log.info("pywidevine version %s Copyright (c) %s rlaphoenix", __version__, copyright_years) log.info("https://github.com/rlaphoenix/pywidevine") if version: return @main.command(name="license") @click.argument("device_path", type=Path) @click.argument("pssh", type=PSSH) @click.argument("server", type=str) @click.option("-t", "--type", "license_type", type=click.Choice(LicenseType.keys(), case_sensitive=False), default="STREAMING", help="License Type to Request.") @click.option("-p", "--privacy", is_flag=True, default=False, help="Use Privacy Mode, off by default.") def license_(device_path: Path, pssh: PSSH, server: str, license_type: str, privacy: bool) -> None: """ Make a License Request for PSSH to SERVER using DEVICE. It will return a list of all keys within the returned license. This expects the Licence Server to be a simple opaque interface where the Challenge is sent as is (as bytes), and the License response is returned as is (as bytes). This is a common behavior for some License Servers and is our only option for a generic licensing function. You may modify this function to change how it sends the Challenge and how it parses the License response. However, for non-generic license calls, I recommend creating a new script that imports and uses the pywidevine module instead. This generic function is only useful as a quick generic license call. This is also a great way of showing you how to use pywidevine in your own projects. """ log = logging.getLogger("license") # load device device = Device.load(device_path) log.info("[+] Loaded Device (%s L%s)", device.system_id, device.security_level) log.debug(device) # load cdm cdm = Cdm.from_device(device) log.info("[+] Loaded CDM") log.debug(cdm) # open cdm session session_id = cdm.open() log.info("[+] Opened CDM Session: %s", session_id.hex()) if privacy: # get service cert for license server via cert challenge service_cert_res = requests.post( url=server, data=cdm.service_certificate_challenge ) if service_cert_res.status_code != 200: log.error( "[-] Failed to get Service Privacy Certificate: [%s] %s", service_cert_res.status_code, service_cert_res.text ) return service_cert = service_cert_res.content provider_id = cdm.set_service_certificate(session_id, service_cert) log.info("[+] Set Service Privacy Certificate: %s", provider_id) log.debug(service_cert) # get license challenge challenge = cdm.get_license_challenge(session_id, pssh, license_type, privacy_mode=True) log.info("[+] Created License Request Message (Challenge)") log.debug(challenge) # send license challenge license_res = requests.post( url=server, data=challenge ) if license_res.status_code != 200: log.error("[-] Failed to send challenge: [%s] %s", license_res.status_code, license_res.text) return licence = license_res.content log.info("[+] Got License Message") log.debug(licence) # parse license challenge cdm.parse_license(session_id, licence) log.info("[+] License Parsed Successfully") # print keys for key in cdm.get_keys(session_id): log.info("[%s] %s:%s", key.type, key.kid.hex, key.key.hex()) # close session, disposes of session data cdm.close(session_id) @main.command() @click.argument("device", type=Path) @click.option("-p", "--privacy", is_flag=True, default=False, help="Use Privacy Mode, off by default.") @click.pass_context def test(ctx: click.Context, device: Path, privacy: bool) -> None: """ Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example. https://bitmovin.com/demos/drm https://bitmovin-a.akamaihd.net/content/art-of-motion_drm/mpds/11331.mpd The device argument is a Path to a Widevine Device (.wvd) file which contains the device private key among other required information. """ # The PSSH is the same for all tracks both video and audio. # However, this might not be the case for all services/manifests. pssh = PSSH("AAAAW3Bzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAADsIARIQ62dqu8s0Xpa" "7z2FmMPGj2hoNd2lkZXZpbmVfdGVzdCIQZmtqM2xqYVNkZmFsa3IzaioCSEQyAA==") # This License Server requires no authorization at all, no cookies, no credentials # nothing. This is often not the case for real services. license_server = "https://cwip-shaka-proxy.appspot.com/no_auth" # Specify OFFLINE if it's a PSSH for a download/offline mode title, e.g., the # Download feature on Netflix Apps. Otherwise, use STREAMING or AUTOMATIC. license_type = "STREAMING" # this runs the `cdm license` CLI-command code with the data we set above # it will print information as it goes to the terminal ctx.invoke( license_, device_path=device, pssh=pssh, server=license_server, license_type=license_type, privacy=privacy ) @main.command() @click.option("-t", "--type", "type_", type=click.Choice([x.name for x in DeviceTypes], case_sensitive=False), required=True, help="Device Type") @click.option("-l", "--level", type=click.IntRange(1, 3), required=True, help="Device Security Level") @click.option("-k", "--key", type=Path, required=True, help="Device RSA Private Key in PEM or DER format") @click.option("-c", "--client_id", type=Path, required=True, help="Widevine ClientIdentification Blob file") @click.option("-v", "--vmp", type=Path, default=None, help="Widevine FileHashes Blob file") @click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") @click.pass_context def create_device( ctx: click.Context, type_: str, level: int, key: Path, client_id: Path, vmp: Optional[Path] = None, output: Optional[Path] = None ) -> None: """ Create a Widevine Device (.wvd) file from an RSA Private Key (PEM or DER) and Client ID Blob. Optionally also a VMP (Verified Media Path) Blob, which will be stored in the Client ID. """ if not key.is_file(): raise click.UsageError("key: Not a path to a file, or it doesn't exist.", ctx) if not client_id.is_file(): raise click.UsageError("client_id: Not a path to a file, or it doesn't exist.", ctx) if vmp and not vmp.is_file(): raise click.UsageError("vmp: Not a path to a file, or it doesn't exist.", ctx) log = logging.getLogger("create-device") device = Device( type_=DeviceTypes[type_.upper()], security_level=level, flags=None, private_key=key.read_bytes(), client_id=client_id.read_bytes() ) if vmp: new_vmp_data = vmp.read_bytes() if device.client_id.vmp_data and device.client_id.vmp_data != new_vmp_data: log.warning("Client ID already has Verified Media Path data") device.client_id.vmp_data = new_vmp_data client_info = {} for entry in device.client_id.client_info: client_info[entry.name] = entry.value wvd_bin = device.dumps() name = f"{client_info['company_name']} {client_info['model_name']}" if client_info.get("widevine_cdm_version"): name += f" {client_info['widevine_cdm_version']}" name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}" try: name = unidecode(name.strip().lower().replace(" ", "_")) except UnidecodeError as e: raise click.ClickException(f"Failed to sanitize name, {e}") if output and output.suffix: if output.suffix.lower() != ".wvd": log.warning(f"Saving WVD with the file extension '{output.suffix}' but '.wvd' is recommended.") out_path = output else: out_dir = output or Path.cwd() out_path = out_dir / f"{name}_{device.system_id}_l{device.security_level}.wvd" if out_path.exists(): log.error(f"A file already exists at the path '{out_path}', cannot overwrite.") return out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_bytes(wvd_bin) log.info("Created Widevine Device (.wvd) file, %s", out_path.name) log.info(" + Type: %s", device.type.name) log.info(" + System ID: %s", device.system_id) log.info(" + Security Level: %s", device.security_level) log.info(" + Flags: %s", device.flags) log.info(" + Private Key: %s (%s bit)", bool(device.private_key), device.private_key.size_in_bits()) log.info(" + Client ID: %s (%s bytes)", bool(device.client_id), len(device.client_id.SerializeToString())) if device.client_id.vmp_data: file_hashes_ = FileHashes() file_hashes_.ParseFromString(device.client_id.vmp_data) log.info(" + VMP: True (%s signatures)", len(file_hashes_.signatures)) else: log.info(" + VMP: False") log.info(" + Saved to: %s", out_path.absolute()) @main.command() @click.argument("wvd_path", type=Path) @click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory") @click.pass_context def export_device(ctx: click.Context, wvd_path: Path, out_dir: Optional[Path] = None) -> None: """ Export a Widevine Device (.wvd) file to an RSA Private Key (PEM and DER) and Client ID Blob. Optionally also a VMP (Verified Media Path) Blob, which will be stored in the Client ID. If an output directory is not specified, it will be stored in the current working directory. """ if not wvd_path.is_file(): raise click.UsageError("wvd_path: Not a path to a file, or it doesn't exist.", ctx) log = logging.getLogger("export-device") log.info("Exporting Widevine Device (.wvd) file, %s", wvd_path.stem) if not out_dir: out_dir = Path.cwd() out_path = out_dir / wvd_path.stem if out_path.exists(): if any(out_path.iterdir()): log.error("Output directory is not empty, cannot overwrite.") return else: log.warning("Output directory already exists, but is empty.") else: out_path.mkdir(parents=True) device = Device.load(wvd_path) log.info(f"L{device.security_level} {device.system_id} {device.type.name}") log.info(f"Saving to: {out_path}") device_meta = { "wvd": { "device_type": device.type.name, "security_level": device.security_level, **device.flags }, "client_info": {}, "capabilities": MessageToDict(device.client_id, preserving_proto_field_name=True)["client_capabilities"] } for client_info in device.client_id.client_info: device_meta["client_info"][client_info.name] = client_info.value device_meta_path = out_path / "metadata.yml" device_meta_path.write_text(yaml.dump(device_meta), encoding="utf8") log.info("Exported Device Metadata as metadata.yml") if device.private_key: private_key_path = out_path / "private_key.pem" private_key_path.write_text( data=device.private_key.export_key().decode(), encoding="utf8" ) private_key_path.with_suffix(".der").write_bytes( device.private_key.export_key(format="DER") ) log.info("Exported Private Key as private_key.der and private_key.pem") else: log.warning("No Private Key available") if device.client_id: client_id_path = out_path / "client_id.bin" client_id_path.write_bytes(device.client_id.SerializeToString()) log.info("Exported Client ID as client_id.bin") else: log.warning("No Client ID available") if device.client_id.vmp_data: vmp_path = out_path / "vmp.bin" vmp_path.write_bytes(device.client_id.vmp_data) log.info("Exported VMP (File Hashes) as vmp.bin") else: log.info("No VMP (File Hashes) available") @main.command() @click.argument("path", type=Path) @click.pass_context def migrate(ctx: click.Context, path: Path) -> None: """ Upgrade from earlier versions of the Widevine Device (.wvd) format. The path argument can be a direct path to a Widevine Device (.wvd) file, or a path to a folder of Widevine Devices files. The migrated devices are saved to its original location, overwriting the old version. """ if not path.exists(): raise click.UsageError(f"path: The path '{path}' does not exist.", ctx) log = logging.getLogger("migrate") if path.is_dir(): devices = list(path.glob("*.wvd")) else: devices = [path] migrated = 0 for device in devices: log.info("Migrating %s...", device.name) try: new_device = Device.migrate(device.read_bytes()) except (ConstructError, ValueError) as e: log.error(" - %s", e) continue log.debug(new_device) new_device.dump(device) log.info(" + Success") migrated += 1 log.info("Migrated %s/%s devices!", migrated, len(devices)) @main.command("serve", short_help="Serve your local CDM and Widevine Devices Remotely.") @click.argument("config_path", type=Path) @click.option("-h", "--host", type=str, default="127.0.0.1", help="Host to serve from.") @click.option("-p", "--port", type=int, default=8786, help="Port to serve from.") def serve_(config_path: Path, host: str, port: int) -> None: """ Serve your local CDM and Widevine Devices Remotely. \b [CONFIG] is a path to a serve config file. See `serve.example.yml` for an example config file. \b Host as 127.0.0.1 may block remote access even if port-forwarded. Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded. """ from pywidevine import serve # isort:skip import yaml # isort:skip config = yaml.safe_load(config_path.read_text(encoding="utf8")) serve.run(config, host, port)