import asyncio
import contextlib
import os
import re
import shutil
import subprocess
import sys
import httpx
import pproxy
import requests
import yaml
from vinetrimmer import config
from vinetrimmer.utils.collections import as_list
def load_yaml(path):
if not os.path.isfile(path):
return {}
with open(path) as fd:
return yaml.safe_load(fd)
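
# Hedged usage sketch (not called anywhere): load_yaml() returns {} when the file is
# missing, so its result can be merged straight over defaults. "config.yaml" and the
# default keys below are only illustrative, not values this project necessarily uses.
def _example_merge_config(path="config.yaml"):
    defaults = {"proxy": None, "quality": 1080}
    return {**defaults, **(load_yaml(path) or {})}
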
_ip_info = None
def get_ip_info(session=None, fresh=False):
"""Use extreme-ip-lookup.com to get IP location information."""
global _ip_info
if fresh or not _ip_info:
# alternatives: http://www.geoplugin.net/json.gp, http://ip-api.com/json/, https://extreme-ip-lookup.com/json
_ip_info = (session or httpx).get("http://ip-api.com/json/").json()
return _ip_info
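
# Hedged usage sketch (not called anywhere): the result is cached in the module-level
# _ip_info, so repeated calls are cheap unless fresh=True. Keys such as "countryCode"
# and "query" follow ip-api.com's JSON schema, which this module already relies on.
def _example_print_ip_info():
    info = get_ip_info()  # cached after the first call
    print(info.get("countryCode"), info.get("query"))
    return get_ip_info(fresh=True)  # bypass the cache and re-query
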
@contextlib.asynccontextmanager
async def start_pproxy(host, port, username, password):
rerouted_proxy = "http://localhost:8081"
server = pproxy.Server(rerouted_proxy)
remote = pproxy.Connection(f"http+ssl://{host}:{port}#{username}:{password}")
handler = await server.start_server(dict(rserver=[remote]))
try:
yield rerouted_proxy
finally:
handler.close()
await handler.wait_closed()
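
# Hedged usage sketch (not called anywhere): start_pproxy() re-exposes an authenticated
# HTTPS proxy as a plain local HTTP proxy on localhost:8081, which downloaders without
# TLS-proxy or credential support can use. The host, port, and credentials below are
# placeholders only.
async def _example_reroute_proxy():
    async with start_pproxy("proxy.example.com", "443", "user", "pass") as local:
        return local  # "http://localhost:8081", suitable to pass to the downloader
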
def download_range(url, count, start=0, proxy=None):
"""Download n bytes without using the Range header due to support issues."""
# TODO: Can this be done with Aria2c?
executable = shutil.which("curl")
if not executable:
raise EnvironmentError("Track needs curl to download a chunk of data but wasn't found...")
arguments = [
executable,
"-s", # use -s instead of --no-progress-meter due to version requirements
"-L", # follow redirects, e.g. http->https
"--proxy-insecure", # disable SSL verification of proxy
"--output", "-", # output to stdout
"--url", url
]
if proxy:
arguments.extend(["--proxy", proxy])
curl = subprocess.Popen(
arguments,
stdout=subprocess.PIPE,
stderr=open(os.devnull, "wb"),
shell=False
)
buffer = b''
location = -1
while len(buffer) < count:
stdout = curl.stdout
data = b''
if stdout:
data = stdout.read(1)
if len(data) > 0:
location += len(data)
if location >= start:
buffer += data
else:
if curl.poll() is not None:
break
curl.kill() # stop downloading
return buffer
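
# Hedged usage sketch (not called anywhere): download_range() streams from byte 0 and
# discards data until `start`, so it suits small probes near the beginning of a file,
# e.g. sniffing a container header. The URL below is a placeholder.
def _example_probe_container():
    header = download_range("https://example.com/init.mp4", count=1024)
    return header[4:8] == b"ftyp"  # crude MP4 check: bytes 4-8 of an MP4 are the "ftyp" box type
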
async def aria2c(uri, out, headers=None, proxy=None):
"""
Downloads file(s) using Aria2(c).
Parameters:
uri: URL to download. If uri is a list of urls, they will be downloaded and
concatenated into one file.
out: The output file path to save to.
headers: Headers to apply on aria2c.
proxy: Proxy to apply on aria2c.
"""
executable = shutil.which("aria2c") or shutil.which("aria2")
if not executable:
raise EnvironmentError("Aria2c executable not found...")
arguments = [
executable,
"-c", # Continue downloading a partially downloaded file
"--remote-time", # Retrieve timestamp of the remote file from the and apply if available
"-o", os.path.basename(out), # The file name of the downloaded file, relative to -d
"-x", "16", # The maximum number of connections to one server for each download
"-j", "16", # The maximum number of parallel downloads for every static (HTTP/FTP) URL
"-s", "16", # Download a file using N connections.
"--allow-overwrite=true",
"--auto-file-renaming=false",
"--retry-wait", "5", # Set the seconds to wait between retries.
"--max-tries", "15",
"--max-file-not-found", "15",
"--summary-interval", "0",
"--file-allocation", "none" if sys.platform == "win32" else "falloc",
"--console-log-level", "warn",
"--download-result", "hide"
]
for option, value in config.config.aria2c.items():
arguments.append(f"--{option.replace('_', '-')}={value}")
for header, value in (headers or {}).items():
if header.lower() == "accept-encoding":
# we cannot set an allowed encoding, or it will return compressed
# and the code is not set up to uncompress the data
continue
arguments.extend(["--header", f"{header}: {value}"])
segmented = isinstance(uri, list)
segments_dir = f"{out}_segments"
if segmented:
uri = "\n".join([
f"{url}\n"
f"\tdir={segments_dir}\n"
f"\tout={i:08}.mp4"
for i, url in enumerate(uri)
])
if proxy:
arguments.append("--all-proxy")
if proxy.lower().startswith("https://"):
auth, hostname = proxy[8:].split("@")
async with start_pproxy(*hostname.split(":"), *auth.split(":")) as pproxy_:
arguments.extend([pproxy_, "-d"])
if segmented:
arguments.extend([segments_dir, "-i-"])
proc = await asyncio.create_subprocess_exec(*arguments, stdin=subprocess.PIPE)
await proc.communicate(as_list(uri)[0].encode("utf-8"))
else:
arguments.extend([os.path.dirname(out), uri])
proc = await asyncio.create_subprocess_exec(*arguments)
await proc.communicate()
else:
arguments.append(proxy)
try:
if segmented:
subprocess.run(
arguments + ["-d", segments_dir, "-i-"],
input=as_list(uri)[0],
encoding="utf-8",
check=True
)
else:
subprocess.run(
arguments + ["-d", os.path.dirname(out), uri],
check=True
)
except subprocess.CalledProcessError:
raise ValueError("Aria2c failed too many times, aborting")
if segmented:
# merge the segments together
with open(out, "wb") as ofd:
for file in sorted(os.listdir(segments_dir)):
file = os.path.join(segments_dir, file)
with open(file, "rb") as ifd:
data = ifd.read()
# Apple TV+ needs this done to fix audio decryption
data = re.sub(b"(tfhd\x00\x02\x00\x1a\x00\x00\x00\x01\x00\x00\x00)\x02", b"\\g<1>\x01", data)
ofd.write(data)
os.unlink(file)
os.rmdir(segments_dir)
print()
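
# Hedged usage sketch (not called anywhere): aria2c() is a coroutine, so callers drive it
# with asyncio. Passing a list of URLs downloads each piece into "<out>_segments" and then
# merges them into `out`. The URLs, output path, headers, and proxy below are placeholders.
def _example_download_segments():
    asyncio.run(aria2c(
        ["https://example.com/seg0.mp4", "https://example.com/seg1.mp4"],
        "video.mp4",
        headers={"User-Agent": "Mozilla/5.0"},
        proxy="http://127.0.0.1:8080"
    ))
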
async def saldl(uri, out, headers=None, proxy=None):
    if headers:
        # drop the Accept-Encoding header so the server doesn't return compressed data,
        # since this code does not decompress responses
        headers = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
executable = shutil.which("saldl") or shutil.which("saldl-win64") or shutil.which("saldl-win32")
if not executable:
raise EnvironmentError("Saldl executable not found...")
arguments = [
executable,
# "--no-status",
"--skip-TLS-verification",
"--resume",
"--merge-in-order",
"-c8",
"--auto-size", "1",
"-D", os.path.dirname(out),
"-o", os.path.basename(out),
]
if headers:
arguments.extend([
"--custom-headers",
"\r\n".join([f"{k}: {v}" for k, v in headers.items()])
])
if proxy:
arguments.extend(["--proxy", proxy])
if isinstance(uri, list):
raise ValueError("Saldl code does not yet support multiple uri (e.g. segmented) downloads.")
arguments.append(uri)
try:
subprocess.run(arguments, check=True)
except subprocess.CalledProcessError:
raise ValueError("Saldl failed too many times, aborting")
print()
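
# Hedged usage sketch (not called anywhere): saldl() mirrors aria2c()'s signature but only
# accepts a single URL; a list of URLs raises ValueError. The URL, output path, headers,
# and proxy below are placeholders.
def _example_saldl_single_file():
    asyncio.run(saldl(
        "https://example.com/file.bin",
        "file.bin",
        headers={"User-Agent": "Mozilla/5.0"},
        proxy="http://127.0.0.1:8080"
    ))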