import asyncio
import contextlib
import logging
import os
from pathlib import Path
import re
import shutil
import subprocess
import sys
from sys import platform

import httpx
import pproxy
import requests
import yaml

from vinetrimmer import config
from vinetrimmer.utils.collections import as_list


def load_yaml(path):
    if not os.path.isfile(path):
        return {}
    with open(path) as fd:
        return yaml.safe_load(fd)


_ip_info = None


def get_ip_info(session=None, fresh=False):
    """Use ip-api.com to get IP location information."""
    global _ip_info

    if fresh or not _ip_info:
        # alternatives: http://www.geoplugin.net/json.gp, http://ip-api.com/json/, https://extreme-ip-lookup.com/json
        # _ip_info = (session or httpx).get("https://ip-api.com/json/").json()
        _ip_info = (session or httpx).get("http://ip-api.com/json/").json()

    return _ip_info


@contextlib.asynccontextmanager
async def start_pproxy(host, port, username, password):
    """Reroute an authenticated https proxy through a local plain-http pproxy instance."""
    rerouted_proxy = "http://localhost:8081"
    server = pproxy.Server(rerouted_proxy)
    remote = pproxy.Connection(f"http+ssl://{host}:{port}#{username}:{password}")
    handler = await server.start_server(dict(rserver=[remote]))
    try:
        yield rerouted_proxy
    finally:
        handler.close()
        await handler.wait_closed()


def download_range(url, count, start=0, proxy=None):
    """Download n bytes without using the Range header due to support issues."""
    # TODO: Can this be done with Aria2c?
    executable = shutil.which("curl")
    if not executable:
        raise EnvironmentError("Track needs curl to download a chunk of data but wasn't found...")

    arguments = [
        executable,
        "-s",  # use -s instead of --no-progress-meter due to version requirements
        "-L",  # follow redirects, e.g. http->https
        "--proxy-insecure",  # disable SSL verification of proxy
        "--output", "-",  # output to stdout
        "--url", url
    ]
    if proxy:
        arguments.extend(["--proxy", proxy])

    curl = subprocess.Popen(
        arguments,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        shell=False
    )

    # read one byte at a time, skipping bytes before `start` and stopping once
    # `count` bytes have been collected or curl exits
    buffer = b""
    location = -1
    while len(buffer) < count:
        stdout = curl.stdout
        data = b""
        if stdout:
            data = stdout.read(1)
        if len(data) > 0:
            location += len(data)
            if location >= start:
                buffer += data
        else:
            if curl.poll() is not None:
                break
    curl.kill()  # stop downloading

    return buffer


async def aria2c(uri, out, headers=None, proxy=None):
    """
    Downloads file(s) using Aria2(c).

    Parameters:
        uri: URL to download. If uri is a list of URLs, they will be downloaded and
            concatenated into one file.
        out: The output file path to save to.
        headers: Headers to apply on aria2c.
        proxy: Proxy to apply on aria2c.
    """
    executable = shutil.which("aria2c") or shutil.which("aria2")
    if not executable:
        raise EnvironmentError("Aria2c executable not found...")

    arguments = [
        executable,
        "-c",  # Continue downloading a partially downloaded file
        "--remote-time",  # Retrieve the timestamp of the remote file and apply it if available
        "-o", os.path.basename(out),  # The file name of the downloaded file, relative to -d
        "-x", "16",  # The maximum number of connections to one server for each download
        "-j", "16",  # The maximum number of parallel downloads for every static (HTTP/FTP) URL
        "-s", "16",  # Download a file using N connections
        "--allow-overwrite=true",
        "--auto-file-renaming=false",
        "--retry-wait", "5",  # Set the seconds to wait between retries
"--max-tries", "15", "--max-file-not-found", "15", "--summary-interval", "0", "--file-allocation", "none" if sys.platform == "win32" else "falloc", "--console-log-level", "warn", "--download-result", "hide" ] for option, value in config.config.aria2c.items(): arguments.append(f"--{option.replace('_', '-')}={value}") for header, value in (headers or {}).items(): if header.lower() == "accept-encoding": # we cannot set an allowed encoding, or it will return compressed # and the code is not set up to uncompress the data continue arguments.extend(["--header", f"{header}: {value}"]) segmented = isinstance(uri, list) segments_dir = f"{out}_segments" if segmented: uri = "\n".join([ f"{url}\n" f"\tdir={segments_dir}\n" f"\tout={i:08}.mp4" for i, url in enumerate(uri) ]) if proxy: arguments.append("--all-proxy") if proxy.lower().startswith("https://"): auth, hostname = proxy[8:].split("@") async with start_pproxy(*hostname.split(":"), *auth.split(":")) as pproxy_: arguments.extend([pproxy_, "-d"]) if segmented: arguments.extend([segments_dir, "-i-"]) proc = await asyncio.create_subprocess_exec(*arguments, stdin=subprocess.PIPE) await proc.communicate(as_list(uri)[0].encode("utf-8")) else: arguments.extend([os.path.dirname(out), uri]) proc = await asyncio.create_subprocess_exec(*arguments) await proc.communicate() else: arguments.append(proxy) try: if segmented: subprocess.run( arguments + ["-d", segments_dir, "-i-"], input=as_list(uri)[0], encoding="utf-8", check=True ) else: subprocess.run( arguments + ["-d", os.path.dirname(out), uri], check=True ) except subprocess.CalledProcessError: raise ValueError("Aria2c failed too many times, aborting") if segmented: # merge the segments together with open(out, "wb") as ofd: for file in sorted(os.listdir(segments_dir)): file = os.path.join(segments_dir, file) with open(file, "rb") as ifd: data = ifd.read() # Apple TV+ needs this done to fix audio decryption data = re.sub(b"(tfhd\x00\x02\x00\x1a\x00\x00\x00\x01\x00\x00\x00)\x02", b"\\g<1>\x01", data) ofd.write(data) os.unlink(file) os.rmdir(segments_dir) print() async def saldl(uri, out, headers=None, proxy=None): if headers: headers.update({k: v for k, v in headers.items() if k.lower() != "accept-encoding"}) executable = shutil.which("saldl") or shutil.which("saldl-win64") or shutil.which("saldl-win32") if not executable: raise EnvironmentError("Saldl executable not found...") arguments = [ executable, # "--no-status", "--skip-TLS-verification", "--resume", "--merge-in-order", "-c8", "--auto-size", "1", "-D", os.path.dirname(out), "-o", os.path.basename(out), ] if headers: arguments.extend([ "--custom-headers", "\r\n".join([f"{k}: {v}" for k, v in headers.items()]) ]) if proxy: arguments.extend(["--proxy", proxy]) if isinstance(uri, list): raise ValueError("Saldl code does not yet support multiple uri (e.g. 
    arguments.append(uri)

    try:
        subprocess.run(arguments, check=True)
    except subprocess.CalledProcessError:
        raise ValueError("Saldl failed too many times, aborting")

    print()


async def m3u8dl(uri, out, track, headers=None, proxy=None):
    executable = shutil.which("N_m3u8DL-RE") or shutil.which("m3u8DL")
    if not executable and os.path.isfile("/usr/bin/N_m3u8DL-RE"):
        executable = "/usr/bin/N_m3u8DL-RE"
    if not executable:
        raise EnvironmentError("N_m3u8DL-RE executable not found...")

    ffmpeg_binary = shutil.which("ffmpeg") or "/usr/bin/ffmpeg"

    arguments = [
        executable, uri,
        "--save-dir", os.path.dirname(out),
        "--tmp-dir", os.path.dirname(out),
        "--save-name", os.path.basename(out).replace(".mp4", ""),
        "--write-meta-json", "False",
        "--log-level", "ERROR",
        "--thread-count", "96",
        "--download-retry-count", "8",
        "--ffmpeg-binary-path", ffmpeg_binary,
        "--binary-merge"
    ]

    if headers and track.source == "HS":
        # only the cookie header is forwarded for now
        arguments.extend(["--header", f"Cookie: {headers['cookie'].replace(' ', '')}"])

    if proxy:
        arguments.extend(["--custom-proxy", proxy])

    if "linux" not in platform:
        arguments.extend(["--http-request-timeout", "8"])

    if track.__class__.__name__ == "VideoTrack":
        if track.height:
            arguments.extend(["-sv", f"res='{track.height}*':codec='{track.codec}':for=best"])
        else:
            arguments.extend(["-sv", "best"])
        arguments.extend([
            "-da", "all",
            "-ds", "all",
        ])
    elif track.__class__.__name__ == "AudioTrack":
        if track.language:
            arguments.extend(["-sa", f"lang='{track.language}':for=best"])
        else:
            arguments.extend(["-sa", "best"])
        arguments.extend([
            "-dv", "all",
            "-ds", "all",
        ])
        if track.source != "HS":
            arguments.extend(["-M", "format=mp4"])
    else:
        raise ValueError(f"{track.__class__.__name__} not supported yet!")

    try:
        subprocess.run(arguments, check=True)
    except subprocess.CalledProcessError:
        raise ValueError("N_m3u8DL-RE failed too many times, aborting")
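

# A minimal usage sketch for the helpers above, assuming hypothetical segment URLs and an
# output path; real callers live in vinetrimmer's track/download code, so this is
# illustrative only and guarded so importing the module stays side-effect free.
if __name__ == "__main__":
    example_segments = [  # hypothetical segment URLs
        "https://example.com/video/segment-0001.mp4",
        "https://example.com/video/segment-0002.mp4",
    ]
    output_path = os.path.join(os.getcwd(), "example_video.mp4")  # hypothetical output file
    # aria2c() is async, so drive it with asyncio.run(); a list uri takes the segmented
    # path and the pieces are merged into output_path afterwards
    asyncio.run(aria2c(example_segments, output_path, headers={"User-Agent": "Mozilla/5.0"}))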