import base64 import html import logging import os import shutil import subprocess import sys import traceback from http.cookiejar import MozillaCookieJar import click import requests from appdirs import AppDirs from langcodes import Language from pymediainfo import MediaInfo from vinetrimmer import services from vinetrimmer.config import Config, config, credentials, directories, filenames from vinetrimmer.objects import AudioTrack, Credential, TextTrack, Title, Titles, VideoTrack from vinetrimmer.objects.vaults import InsertResult, Vault, Vaults from vinetrimmer.utils import Cdm, is_close_match from vinetrimmer.utils.click import (AliasedGroup, ContextData, acodec_param, language_param, quality_param, range_param, vcodec_param, wanted_param) from vinetrimmer.utils.collections import as_list, merge_dict from vinetrimmer.utils.io import load_yaml from vinetrimmer.utils.widevine.device import LocalDevice, RemoteDevice from vinetrimmer.vendor.pymp4.parser import Box from pyplayready.cdm import Cdm as CdmPr from pyplayready.device import Device from pyplayready.pssh import PSSH def get_cdm(service, profile=None, cdm_name=None): """ Get CDM Device (either remote or local) for a specified service. Raises a ValueError if there's a problem getting a CDM. """ if not cdm_name: cdm_name = config.cdm.get(service) or config.cdm.get("default") if not cdm_name: raise ValueError("A CDM to use wasn't listed in the vinetrimmer.yml config") if isinstance(cdm_name, dict): if not profile: raise ValueError("CDM config is mapped for profiles, but no profile was chosen") cdm_name = cdm_name.get(profile) or config.cdm.get("default") if not cdm_name: raise ValueError(f"A CDM to use was not mapped for the profile {profile}") if "sl2000" or "sl3000" in cdm_name.lower(): device_pr = Device.load(os.path.join(directories.devices, f"{cdm_name}.prd")) return device_pr try: return LocalDevice.load(os.path.join(directories.devices, f"{cdm_name}.wvd")) except FileNotFoundError: dirs = [ os.path.join(directories.devices, cdm_name), os.path.join(AppDirs("pywidevine", False).user_data_dir, "devices", cdm_name), os.path.join(AppDirs("pywidevine", False).site_data_dir, "devices", cdm_name), ] for d in dirs: try: return LocalDevice.from_dir(d) except FileNotFoundError: pass cdm_api = next(iter(x for x in config.cdm_api if x["name"] == cdm_name), None) if cdm_api: return RemoteDevice(**cdm_api) raise ValueError(f"Device {cdm_name!r} not found") def get_service_config(service): """Get both service config and service secrets as one merged dictionary.""" service_config = load_yaml(filenames.service_config.format(service=service.lower())) user_config = (load_yaml(filenames.user_service_config.format(service=service.lower())) or load_yaml(filenames.user_service_config.format(service=service))) if user_config: merge_dict(service_config, user_config) return service_config def get_profile(service): """ Get the default profile for a service from the config. """ profile = config.profiles.get(service) if profile is False: return None # auth-less service if `false` in config if not profile: profile = config.profiles.get("default") if not profile: raise ValueError(f"No profile has been defined for '{service}' in the config.") return profile def get_cookie_jar(service, profile): """Get the profile's cookies if available.""" cookie_file = os.path.join(directories.cookies, service.lower(), f"{profile}.txt") if not os.path.isfile(cookie_file): cookie_file = os.path.join(directories.cookies, service, f"{profile}.txt") if os.path.isfile(cookie_file): cookie_jar = MozillaCookieJar(cookie_file) with open(cookie_file, "r+", encoding="utf-8") as fd: unescaped = html.unescape(fd.read()) fd.seek(0) fd.truncate() fd.write(unescaped) cookie_jar.load(ignore_discard=True, ignore_expires=True) return cookie_jar return None def get_credentials(service, profile="default"): """Get the profile's credentials if available.""" cred = credentials.get(service, {}) if isinstance(cred, dict): cred = cred.get(profile) elif profile != "default": return None if cred: if isinstance(cred, list): return Credential(*cred) else: return Credential.loads(cred) @click.group(name="dl", short_help="Download from a service.", cls=AliasedGroup, context_settings=dict( help_option_names=["-?", "-h", "--help"], max_content_width=116, # max PEP8 line-width, -4 to adjust for initial indent default_map=config.arguments )) @click.option("--debug", is_flag=True, hidden=True) # Handled by vinetrimmer.py @click.option("-p", "--profile", type=str, default=None, help="Profile to use when multiple profiles are defined for a service.") @click.option("-q", "--quality", callback=quality_param, default=None, help="Download Resolution, defaults to best available.") @click.option("-v", "--vcodec", callback=vcodec_param, default="H264", help="Video Codec, defaults to H264.") @click.option("-a", "--acodec", callback=acodec_param, default=None, help="Audio Codec") @click.option("-vb", "--vbitrate", "vbitrate", type=int, default=None, help="Video Bitrate, defaults to Max.") @click.option("-ab", "--abitrate", "abitrate", type=int, default=None, help="Audio Bitrate, defaults to Max.") @click.option("-aa", "--atmos", is_flag=True, default=False, help="Prefer Atmos Audio") @click.option("-r", "--range", "range_", callback=range_param, default="SDR", help="Video Color Range, defaults to SDR.") @click.option("-w", "--wanted", callback=wanted_param, default=None, help="Wanted episodes, e.g. `S01-S05,S07`, `S01E01-S02E03`, `S02-S02E03`, e.t.c, defaults to all.") @click.option("-al", "--alang", callback=language_param, default="orig", help="Language wanted for audio.") @click.option("-sl", "--slang", callback=language_param, default="all", help="Language wanted for subtitles.") @click.option("--proxy", type=str, default=None, help="Proxy URI to use. If a 2-letter country is provided, it will try get a proxy from the config.") @click.option("-A", "--audio-only", is_flag=True, default=False, help="Only download audio tracks.") @click.option("-S", "--subs-only", is_flag=True, default=False, help="Only download subtitle tracks.") @click.option("-C", "--chapters-only", is_flag=True, default=False, help="Only download chapters.") @click.option("-ns", "--no-subs", is_flag=True, default=False, help="Do not download subtitle tracks.") @click.option("-na", "--no-audio", is_flag=True, default=False, help="Do not download audio tracks.") @click.option("-nv", "--no-video", is_flag=True, default=False, help="Do not download video tracks.") @click.option("-nc", "--no-chapters", is_flag=True, default=False, help="Do not download chapters tracks.") @click.option("-ad", "--audio-description", is_flag=True, default=False, help="Download audio description tracks.") @click.option("--list", "list_", is_flag=True, default=False, help="Skip downloading and list available tracks and what tracks would have been downloaded.") @click.option("--selected", is_flag=True, default=False, help="List selected tracks and what tracks are downloaded.") @click.option("--cdm", type=str, default=None, help="Override the CDM that will be used for decryption.") @click.option("--keys", is_flag=True, default=False, help="Skip downloading, retrieve the decryption keys (via CDM or Key Vaults) and print them.") @click.option("--cache", is_flag=True, default=False, help="Disable the use of the CDM and only retrieve decryption keys from Key Vaults. " "If a needed key is unable to be retrieved from any Key Vaults, the title is skipped.") @click.option("--no-cache", is_flag=True, default=False, help="Disable the use of Key Vaults and only retrieve decryption keys from the CDM.") @click.option("--no-proxy", is_flag=True, default=False, help="Force disable all proxy use.") @click.option("-nm", "--no-mux", is_flag=True, default=False, help="Do not mux the downloaded and decrypted tracks.") @click.option("--mux", is_flag=True, default=False, help="Force muxing when using --audio-only/--subs-only/--chapters-only.") @click.pass_context def dl(ctx, profile, cdm, *_, **__): log = logging.getLogger("dl") service = ctx.params.get("service_name") or services.get_service_key(ctx.invoked_subcommand) if not service: log.exit(" - Unable to find service") profile = profile or get_profile(service) service_config = get_service_config(service) vaults = [] for vault in config.key_vaults: try: vaults.append(Config.load_vault(vault)) except Exception as e: log.error(f" - Failed to load vault {vault['name']!r}: {e}") vaults = Vaults(vaults, service=service) local_vaults = sum(v.type == Vault.Types.LOCAL for v in vaults) remote_vaults = sum(v.type == Vault.Types.REMOTE for v in vaults) log.info(f" + {local_vaults} Local Vault{'' if local_vaults == 1 else 's'}") log.info(f" + {remote_vaults} Remote Vault{'' if remote_vaults == 1 else 's'}") try: device = get_cdm(service, profile, cdm) except ValueError as e: raise log.exit(f" - {e}") device_name = device.group_certificate.get_name() if "sl3000" or "sl2000" in device.get_name() else device.system_id log.info(f" + Loaded {device.__class__.__name__}: {device_name} (L{device.security_level})") cdm = CdmPr.from_device(device) if "sl3000" or "sl2000" in device.get_name() else Cdm(device) if profile: cookies = get_cookie_jar(service, profile) credentials = get_credentials(service, profile) if not cookies and not credentials and service_config.get("needs_auth", True): raise log.exit(f" - Profile {profile!r} has no cookies or credentials") else: cookies = None credentials = None ctx.obj = ContextData( config=service_config, vaults=vaults, cdm=cdm, profile=profile, cookies=cookies, credentials=credentials, ) @dl.result_callback() @click.pass_context def result(ctx, service, quality, range_, wanted, alang, slang, audio_only, subs_only, chapters_only, audio_description, list_, keys, cache, no_cache, no_subs, no_audio, no_video, no_chapters, atmos, vbitrate: int, abitrate: int, no_mux, mux, selected, *_, **__): def ccextractor(): log.info("Extracting EIA-608 captions from stream with CCExtractor") track_id = f"ccextractor-{track.id}" # TODO: Is it possible to determine the language of EIA-608 captions? cc_lang = track.language try: cc = track.ccextractor( track_id=track_id, out_path=filenames.subtitles.format(id=track_id, language_code=cc_lang), language=cc_lang, original=False, ) except EnvironmentError: log.warning(" - CCExtractor not found, cannot extract captions") else: if cc: title.tracks.add(cc) log.info(" + Extracted") else: log.info(" + No captions found") log = service.log service_name = service.__class__.__name__ log.info("Retrieving Titles") try: titles = Titles(as_list(service.get_titles())) except requests.HTTPError as e: log.debug(traceback.format_exc()) raise log.exit(f" - HTTP Error {e.response.status_code}: {e.response.reason}") if not titles: raise log.exit(" - No titles returned!") titles.order() titles.print() for title in titles.with_wanted(wanted): if title.type == Title.Types.TV: log.info("Getting tracks for {title} S{season:02}E{episode:02}{name} [{id}]".format( title=title.name, season=title.season or 0, episode=title.episode or 0, name=f" - {title.episode_name}" if title.episode_name else "", id=title.id, )) else: log.info("Getting tracks for {title}{year} [{id}]".format( title=title.name, year=f" ({title.year})" if title.year else "", id=title.id, )) try: title.tracks.add(service.get_tracks(title), warn_only=True) title.tracks.add(service.get_chapters(title)) #if not next((x for x in title.tracks.videos if x.language.language in (v_lang or lang)), None) and title.tracks.videos: # lang = [title.tracks.videos[0].language["language"]] # log.info("Defaulting language to {lang} for tracks".format(lang=lang[0].upper())) except requests.HTTPError as e: log.debug(traceback.format_exc()) raise log.exit(f" - HTTP Error {e.response.status_code}: {e.response.reason}") title.tracks.sort_videos() title.tracks.sort_audios(by_language=alang) title.tracks.sort_subtitles(by_language=slang) title.tracks.sort_chapters() for track in title.tracks: if track.language == Language.get("none"): track.language = title.original_lang track.is_original_lang = is_close_match(track.language, [title.original_lang]) if not list(title.tracks): log.error(" - No tracks returned!") continue if not selected: log.info("> All Tracks:") title.tracks.print() try: #title.tracks.select_videos(by_quality=quality, by_range=range_, one_only=True) title.tracks.select_videos(by_quality=quality, by_vbitrate=vbitrate, by_range=range_, one_only=True) title.tracks.select_audios(by_language=alang, by_bitrate=abitrate, with_descriptive=audio_description) # title.tracks.select_audios(by_language=alang, with_descriptive=audio_description) title.tracks.select_subtitles(by_language=slang, with_forced=True) except ValueError as e: log.error(f" - {e}") continue if no_video: title.tracks.videos.clear() if no_audio: title.tracks.audios.clear() if no_subs: title.tracks.subtitles.clear() if no_chapters: title.tracks.chapters.clear() if audio_only or subs_only or chapters_only: title.tracks.videos.clear() if audio_only: if not subs_only: title.tracks.subtitles.clear() if not chapters_only: title.tracks.chapters.clear() elif subs_only: if not audio_only: title.tracks.audios.clear() if not chapters_only: title.tracks.chapters.clear() elif chapters_only: if not audio_only: title.tracks.audios.clear() if not subs_only: title.tracks.subtitles.clear() if not mux: no_mux = True log.info("> Selected Tracks:") title.tracks.print() if list_: continue # only wanted to see what tracks were available and chosen skip_title = False for track in title.tracks: if not keys: log.info(f"Downloading: {track}") if (service_name == "AppleTVPlus" or service_name == "iTunes") and "VID" in str(track): track.encrypted = True if track.encrypted: if not track.get_pssh(service.session): raise log.exit(" - Failed to get PSSH") log.info(f" + PSSH: {track.pssh}") if not track.get_kid(service.session): raise log.exit(" - Failed to get KID") log.info(f" + KID: {track.kid}") if not keys: if track.needs_proxy: proxy = next(iter(service.session.proxies.values()), None) else: proxy = None track.download(directories.temp, headers=service.session.headers, proxy=proxy) log.info(" + Downloaded") if isinstance(track, VideoTrack) and track.needs_ccextractor_first and not no_subs: ccextractor() if track.encrypted: log.info("Decrypting...") if track.key: log.info(f" + KEY: {track.key} (Static)") elif not no_cache: track.key, vault_used = ctx.obj.vaults.get(track.kid, title.id) if track.key: log.info(f" + KEY: {track.key} (From {vault_used.name} {vault_used.type.name} Key Vault)") for vault in ctx.obj.vaults.vaults: if vault == vault_used: continue result = ctx.obj.vaults.insert_key( vault, service_name.lower(), track.kid, track.key, title.id, commit=True ) if result == InsertResult.SUCCESS: log.info(f" + Cached to {vault} vault") elif result == InsertResult.ALREADY_EXISTS: log.info(f" + Already exists in {vault} vault") if not track.key: if cache: skip_title = True break try: if "common_privacy_cert" in dir(ctx.obj.cdm): session_id = ctx.obj.cdm.open(track.pssh) log.info(f"CDM Session ID - {session_id.hex()}") ctx.obj.cdm.set_service_certificate( session_id, service.certificate( challenge=ctx.obj.cdm.service_certificate_challenge, title=title, track=track, session_id=session_id ) or ctx.obj.cdm.common_privacy_cert ) ctx.obj.cdm.parse_license( session_id, service.license( challenge=ctx.obj.cdm.get_license_challenge(session_id), title=title, track=track, session_id=session_id ) ) else: session_id = ctx.obj.cdm.open() log.info(f"CDM Session ID - {session_id.hex()}") wrm_header = PSSH(track.pssh).get_wrm_headers(downgrade_to_v4=True)[0] # Downgrade to v4 is necessary for ATVP challenge = ctx.obj.cdm.get_license_challenge(session_id, wrm_header) #log.info(f"{wrm_header} --> {challenge}") try: ctx.obj.cdm.parse_license( session_id, base64.b64decode( service.license( challenge=challenge, title=title, track=track, ).encode("ascii") ).decode("ascii") ) except: ctx.obj.cdm.parse_license( session_id, service.license( challenge=challenge, title=title, track=track, ) ) except requests.HTTPError as e: log.debug(traceback.format_exc()) raise log.exit(f" - HTTP Error {e.response.status_code}: {e.response.reason}") content_keys = [ (x.kid, x.key) for x in ctx.obj.cdm.get_keys(session_id, content_only=True) ] if "common_privacy_cert" in dir(ctx.obj.cdm) else [ (str(x.key_id).replace("-", ""), x.key.hex()) for x in ctx.obj.cdm.get_keys(session_id) ] try: ctx.obj.cdm.close(session_id) except Exception as e: raise log.exit(f" - Failed to close CDM session: {e}") if not content_keys: raise log.exit(" - No content keys were returned by the CDM!") log.info(f" + Obtained content keys from the CDM") for kid, key in content_keys: if kid == "b770d5b4bb6b594daf985845aae9aa5f": # Amazon HDCP test key continue log.info(f" + {kid}:{key}") # cache keys into all key vaults for vault in ctx.obj.vaults.vaults: log.info(f"Caching to {vault} vault") cached = 0 already_exists = 0 for kid, key in content_keys: result = ctx.obj.vaults.insert_key(vault, service_name.lower(), kid, key, title.id) if result == InsertResult.FAILURE: log.warning(f" - Failed, table {service_name.lower()} doesn't exist in the vault.") elif result == InsertResult.SUCCESS: cached += 1 elif result == InsertResult.ALREADY_EXISTS: already_exists += 1 ctx.obj.vaults.commit(vault) log.info(f" + Cached {cached}/{len(content_keys)} keys") if already_exists: log.info(f" + {already_exists}/{len(content_keys)} keys already existed in vault") if cached + already_exists < len(content_keys): log.warning(f" Failed to cache {len(content_keys) - cached - already_exists} keys") # use matching content key for the tracks key id track.key = next((key for kid, key in content_keys if kid == track.kid), None) if track.key: log.info(f" + KEY: {track.key} (From CDM)") else: raise log.exit(f" - No content key with the key ID \"{track.kid}\" was returned") if keys: continue # TODO: Move decryption code to Track if not config.decrypter: raise log.exit(" - No decrypter specified") if config.decrypter == "packager": platform = {"win32": "win", "darwin": "osx"}.get(sys.platform, sys.platform) names = ["shaka-packager", "packager", f"packager-{platform}"] executable = next((x for x in (shutil.which(x) for x in names) if x), None) if not executable: raise log.exit(" - Unable to find packager binary") dec = os.path.splitext(track.locate())[0] + ".dec.mp4" try: os.makedirs(directories.temp, exist_ok=True) subprocess.run([ executable, "input={},stream={},output={}".format( track.locate(), track.__class__.__name__.lower().replace("track", ""), dec ), "--enable_raw_key_decryption", "--keys", ",".join([ f"label=0:key_id={track.kid.lower()}:key={track.key.lower()}", # Apple TV+ needs this as shaka pulls the incorrect KID, idk why f"label=1:key_id=00000000000000000000000000000000:key={track.key.lower()}", ]), "--temp_dir", directories.temp ], check=True) except subprocess.CalledProcessError: raise log.exit(" - Failed!") elif config.decrypter == "mp4decrypt": executable = shutil.which("mp4decrypt") if not executable: raise log.exit(" - Unable to find mp4decrypt binary") dec = os.path.splitext(track.locate())[0] + ".dec.mp4" try: subprocess.run([ executable, "--show-progress", "--key", f"{track.kid.lower()}:{track.key.lower()}", track.locate(), dec, ]) except subprocess.CalledProcessError: raise log.exit(" - Failed!") else: log.exit(f" - Unsupported decrypter: {config.decrypter}") track.swap(dec) log.info(" + Decrypted") if keys: continue if track.needs_repack or (config.decrypter == "mp4decrypt" and isinstance(track, (VideoTrack, AudioTrack))): log.info("Repackaging stream with FFmpeg (to fix malformed streams)") track.repackage() log.info(" + Repackaged") if isinstance(track, VideoTrack) and track.needs_ccextractor and not no_subs: ccextractor() if skip_title: for track in title.tracks: track.delete() continue if keys: continue if not list(title.tracks) and not title.tracks.chapters: continue # mux all final tracks to a single mkv file if no_mux: if title.tracks.chapters: final_file_path = directories.downloads if title.type == Title.Types.TV: final_file_path = os.path.join(final_file_path, title.parse_filename(folder=True)) os.makedirs(final_file_path, exist_ok=True) chapters_loc = filenames.chapters.format(filename=title.filename) title.tracks.export_chapters(chapters_loc) shutil.move(chapters_loc, os.path.join(final_file_path, os.path.basename(chapters_loc))) for track in title.tracks: media_info = MediaInfo.parse(track.locate()) final_file_path = directories.downloads if title.type == Title.Types.TV: final_file_path = os.path.join( final_file_path, title.parse_filename(folder=True) ) os.makedirs(final_file_path, exist_ok=True) filename = title.parse_filename(media_info=media_info) if isinstance(track, (AudioTrack, TextTrack)): filename += f".{track.language}" extension = track.codec if isinstance(track, TextTrack) else os.path.splitext(track.locate())[1][1:] if isinstance(track, AudioTrack) and extension == "mp4": extension = "m4a" track.move(os.path.join(final_file_path, f"{filename}.{track.id}.{extension}")) else: log.info("Muxing tracks into an MKV container") muxed_location, returncode = title.tracks.mux(title.filename) if returncode == 1: log.warning(" - mkvmerge had at least one warning, will continue anyway...") elif returncode >= 2: raise log.exit(" - Failed to mux tracks into MKV file") log.info(" + Muxed") for track in title.tracks: track.delete() if title.tracks.chapters: try: os.unlink(filenames.chapters.format(filename=title.filename)) except FileNotFoundError: pass media_info = MediaInfo.parse(muxed_location) final_file_path = directories.downloads if title.type == Title.Types.TV: final_file_path = os.path.join( final_file_path, title.parse_filename(media_info=media_info, folder=True) ) os.makedirs(final_file_path, exist_ok=True) # rename muxed mkv file with new data from mediainfo data of it if audio_only: extension = "mka" elif subs_only: extension = "mks" else: extension = "mkv" shutil.move( muxed_location, os.path.join(final_file_path, f"{title.parse_filename(media_info=media_info)}.{extension}") ) log.info("Processed all titles!") def load_services(): for service in services.__dict__.values(): if callable(getattr(service, "cli", None)): dl.add_command(service.cli) load_services()