From 0e96d18af6cd1013d12701f16955b249f3b5bc0f Mon Sep 17 00:00:00 2001 From: rlaphoenix Date: Thu, 15 Feb 2024 11:12:17 +0000 Subject: [PATCH] Rework the Requests and Curl-Impersonate Downloaders - Downloads are now multithreaded directly in the downloader. - Requests and Curl-Impersonate use one singular Session for all downloads, keeping connections alive and cached so it doesn't have to close and reopen connections for every single download. - Progress updates are now yielded back to the caller instead of drilling down a progress callable. --- devine/core/downloaders/curl_impersonate.py | 319 +++++++++++++++----- devine/core/downloaders/requests.py | 315 ++++++++++++++----- 2 files changed, 480 insertions(+), 154 deletions(-) diff --git a/devine/core/downloaders/curl_impersonate.py b/devine/core/downloaders/curl_impersonate.py index cbc6aef..49a0811 100644 --- a/devine/core/downloaders/curl_impersonate.py +++ b/devine/core/downloaders/curl_impersonate.py @@ -1,7 +1,9 @@ +import math import time -from functools import partial +from concurrent import futures +from concurrent.futures.thread import ThreadPoolExecutor from pathlib import Path -from typing import Any, MutableMapping, Optional, Union +from typing import Any, Generator, MutableMapping, Optional, Union from curl_cffi.requests import Session from requests.cookies import RequestsCookieJar @@ -12,38 +14,203 @@ from devine.core.constants import DOWNLOAD_CANCELLED MAX_ATTEMPTS = 5 RETRY_WAIT = 2 +CHUNK_SIZE = 1024 +PROGRESS_WINDOW = 5 BROWSER = config.curl_impersonate.get("browser", "chrome110") -def curl_impersonate( - uri: Union[str, list[str]], - out: Path, - headers: Optional[dict] = None, - cookies: Optional[Union[MutableMapping[str, str], RequestsCookieJar]] = None, - proxy: Optional[str] = None, - progress: Optional[partial] = None, - *_: Any, - **__: Any -) -> int: +def download( + url: str, + save_path: Path, + session: Optional[Session] = None, + **kwargs: Any +) -> Generator[dict[str, Any], None, None]: """ Download files using Curl Impersonate. https://github.com/lwthiker/curl-impersonate - If multiple URLs are provided they will be downloaded in the provided order - to the output directory. They will not be merged together. - """ - if isinstance(uri, list) and len(uri) == 1: - uri = uri[0] + Yields the following download status updates while chunks are downloading: - if isinstance(uri, list): - if out.is_file(): - raise ValueError("Expecting out to be a Directory path not a File as multiple URLs were provided") - uri = [ - (url, out / f"{i:08}.mp4") - for i, url in enumerate(uri) - ] - else: - uri = [(uri, out.parent / out.name)] + - {total: 123} (there are 123 chunks to download) + - {total: None} (there are an unknown number of chunks to download) + - {advance: 1} (one chunk was downloaded) + - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) + - {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size) + + The data is in the same format accepted by rich's progress.update() function. The + `downloaded` key is custom and is not natively accepted by all rich progress bars. + + Parameters: + url: Web URL of a file to download. + save_path: The path to save the file to. If the save path's directory does not + exist then it will be made automatically. + session: The Requests or Curl-Impersonate Session to make HTTP requests with. + Useful to set Header, Cookie, and Proxy data. 
Connections are saved and + re-used with the session so long as the server keeps the connection alive. + kwargs: Any extra keyword arguments to pass to the session.get() call. Use this + for one-time request changes like a header, cookie, or proxy. For example, + to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`. + """ + if not session: + session = Session(impersonate=BROWSER) + + save_dir = save_path.parent + control_file = save_path.with_name(f"{save_path.name}.!dev") + + save_dir.mkdir(parents=True, exist_ok=True) + + if control_file.exists(): + # consider the file corrupt if the control file exists + save_path.unlink(missing_ok=True) + control_file.unlink() + elif save_path.exists(): + # if it exists, and no control file, then it should be safe + yield dict( + file_downloaded=save_path, + written=save_path.stat().st_size + ) + + # TODO: Design a control file format so we know how much of the file is missing + control_file.write_bytes(b"") + + attempts = 1 + try: + while True: + written = 0 + download_sizes = [] + last_speed_refresh = time.time() + + try: + stream = session.get(url, stream=True, **kwargs) + stream.raise_for_status() + + try: + content_length = int(stream.headers.get("Content-Length", "0")) + except ValueError: + content_length = 0 + + if content_length > 0: + yield dict(total=math.ceil(content_length / CHUNK_SIZE)) + else: + # we have no data to calculate total chunks + yield dict(total=None) # indeterminate mode + + with open(save_path, "wb") as f: + for chunk in stream.iter_content(chunk_size=CHUNK_SIZE): + download_size = len(chunk) + f.write(chunk) + written += download_size + + yield dict(advance=1) + + now = time.time() + time_since = now - last_speed_refresh + + download_sizes.append(download_size) + if time_since > PROGRESS_WINDOW or download_size < CHUNK_SIZE: + data_size = sum(download_sizes) + download_speed = math.ceil(data_size / (time_since or 1)) + yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") + last_speed_refresh = now + download_sizes.clear() + + yield dict( + file_downloaded=save_path, + written=written + ) + break + except Exception as e: + save_path.unlink(missing_ok=True) + if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS: + raise e + time.sleep(RETRY_WAIT) + attempts += 1 + finally: + control_file.unlink() + + +def curl_impersonate( + urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]], + output_dir: Path, + filename: str, + headers: Optional[MutableMapping[str, Union[str, bytes]]] = None, + cookies: Optional[Union[MutableMapping[str, str], RequestsCookieJar]] = None, + proxy: Optional[str] = None, + max_workers: Optional[int] = None +) -> Generator[dict[str, Any], None, None]: + """ + Download files using Curl Impersonate. + https://github.com/lwthiker/curl-impersonate + + Yields the following download status updates while chunks are downloading: + + - {total: 123} (there are 123 chunks to download) + - {total: None} (there are an unknown number of chunks to download) + - {advance: 1} (one chunk was downloaded) + - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) + - {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size) + + The data is in the same format accepted by rich's progress.update() function. + However, The `downloaded`, `file_downloaded` and `written` keys are custom and not + natively accepted by rich progress bars. + + Parameters: + urls: Web URL(s) to file(s) to download. 
You can use a dictionary with the key + "url" for the URI, and other keys for extra arguments to use per-URL. + output_dir: The folder to save the file into. If the save path's directory does + not exist then it will be made automatically. + filename: The filename or filename template to use for each file. The variables + you can use are `i` for the URL index and `ext` for the URL extension. + headers: A mapping of HTTP Header Key/Values to use for all downloads. + cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads. + proxy: An optional proxy URI to route connections through for all downloads. + max_workers: The maximum amount of threads to use for downloads. Defaults to + min(32,(cpu_count+4)). + """ + if not urls: + raise ValueError("urls must be provided and not empty") + elif not isinstance(urls, (str, dict, list)): + raise TypeError(f"Expected urls to be {str} or {dict} or a list of one of them, not {type(urls)}") + + if not output_dir: + raise ValueError("output_dir must be provided") + elif not isinstance(output_dir, Path): + raise TypeError(f"Expected output_dir to be {Path}, not {type(output_dir)}") + + if not filename: + raise ValueError("filename must be provided") + elif not isinstance(filename, str): + raise TypeError(f"Expected filename to be {str}, not {type(filename)}") + + if not isinstance(headers, (MutableMapping, type(None))): + raise TypeError(f"Expected headers to be {MutableMapping}, not {type(headers)}") + + if not isinstance(cookies, (MutableMapping, RequestsCookieJar, type(None))): + raise TypeError(f"Expected cookies to be {MutableMapping} or {RequestsCookieJar}, not {type(cookies)}") + + if not isinstance(proxy, (str, type(None))): + raise TypeError(f"Expected proxy to be {str}, not {type(proxy)}") + + if not isinstance(max_workers, (int, type(None))): + raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}") + + if not isinstance(urls, list): + urls = [urls] + + urls = [ + dict( + save_path=save_path, + **url + ) if isinstance(url, dict) else dict( + url=url, + save_path=save_path + ) + for i, url in enumerate(urls) + for save_path in [output_dir / filename.format( + i=i, + ext=Path((url["url"]) if isinstance(url, dict) else url).suffix + )] + ] session = Session(impersonate=BROWSER) if headers: @@ -57,65 +224,65 @@ def curl_impersonate( session.cookies.update(cookies) if proxy: session.proxies.update({ - "http": proxy, - "https": proxy + "http": proxy.replace("https://", "http://"), + "https": proxy.replace("https://", "http://") }) - if progress: - progress(total=len(uri)) + yield dict(total=len(urls)) download_sizes = [] last_speed_refresh = time.time() - for url, out_path in uri: - out_path.parent.mkdir(parents=True, exist_ok=True) + with ThreadPoolExecutor(max_workers=max_workers) as pool: + for i, future in enumerate(futures.as_completed(( + pool.submit( + download, + session=session, + **url + ) + for url in urls + ))): + file_path, download_size = None, None + try: + for status_update in future.result(): + if status_update.get("file_downloaded") and status_update.get("written"): + file_path = status_update["file_downloaded"] + download_size = status_update["written"] + elif len(urls) == 1: + # these are per-chunk updates, only useful if it's one big file + yield status_update + except KeyboardInterrupt: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + yield dict(downloaded="[yellow]CANCELLING") + pool.shutdown(wait=True, cancel_futures=True) + yield dict(downloaded="[yellow]CANCELLED") + # 
tell dl that it was cancelled + # the pool is already shut down, so exiting loop is fine + raise + except Exception: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + yield dict(downloaded="[red]FAILING") + pool.shutdown(wait=True, cancel_futures=True) + yield dict(downloaded="[red]FAILED") + # tell dl that it failed + # the pool is already shut down, so exiting loop is fine + raise + else: + yield dict(file_downloaded=file_path) + yield dict(advance=1) - control_file = out_path.with_name(f"{out_path.name}.!dev") - if control_file.exists(): - # consider the file corrupt if the control file exists - # TODO: Design a control file format so we know how much of the file is missing - out_path.unlink(missing_ok=True) - control_file.unlink() - elif out_path.exists(): - continue - control_file.write_bytes(b"") + now = time.time() + time_since = now - last_speed_refresh - attempts = 1 - try: - while True: - try: - stream = session.get(url, stream=True) - stream.raise_for_status() - with open(out_path, "wb") as f: - written = 0 - for chunk in stream.iter_content(chunk_size=1024): - download_size = len(chunk) - f.write(chunk) - written += download_size - if progress: - progress(advance=1) + if download_size: # no size == skipped dl + download_sizes.append(download_size) - now = time.time() - time_since = now - last_speed_refresh - - download_sizes.append(download_size) - if time_since > 5 or download_size < 1024: - data_size = sum(download_sizes) - download_speed = data_size / (time_since or 1) - progress(downloaded=f"{filesize.decimal(download_speed)}/s") - last_speed_refresh = now - download_sizes.clear() - break - except Exception as e: - out_path.unlink(missing_ok=True) - if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS: - raise e - time.sleep(RETRY_WAIT) - attempts += 1 - finally: - control_file.unlink() - - return 0 + if download_sizes and (time_since > PROGRESS_WINDOW or i == len(urls)): + data_size = sum(download_sizes) + download_speed = math.ceil(data_size / (time_since or 1)) + yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") + last_speed_refresh = now + download_sizes.clear() __all__ = ("curl_impersonate",) diff --git a/devine/core/downloaders/requests.py b/devine/core/downloaders/requests.py index 5b40b7a..c91a6e3 100644 --- a/devine/core/downloaders/requests.py +++ b/devine/core/downloaders/requests.py @@ -1,8 +1,9 @@ import math import time -from functools import partial +from concurrent import futures +from concurrent.futures.thread import ThreadPoolExecutor from pathlib import Path -from typing import Any, MutableMapping, Optional, Union +from typing import Any, Generator, MutableMapping, Optional, Union from requests import Session from requests.cookies import RequestsCookieJar @@ -12,37 +13,201 @@ from devine.core.constants import DOWNLOAD_CANCELLED MAX_ATTEMPTS = 5 RETRY_WAIT = 2 +CHUNK_SIZE = 1024 +PROGRESS_WINDOW = 5 + + +def download( + url: str, + save_path: Path, + session: Optional[Session] = None, + **kwargs: Any +) -> Generator[dict[str, Any], None, None]: + """ + Download a file using Python Requests. 
+ https://requests.readthedocs.io + + Yields the following download status updates while chunks are downloading: + + - {total: 123} (there are 123 chunks to download) + - {total: None} (there are an unknown number of chunks to download) + - {advance: 1} (one chunk was downloaded) + - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) + - {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size) + + The data is in the same format accepted by rich's progress.update() function. The + `downloaded` key is custom and is not natively accepted by all rich progress bars. + + Parameters: + url: Web URL of a file to download. + save_path: The path to save the file to. If the save path's directory does not + exist then it will be made automatically. + session: The Requests Session to make HTTP requests with. Useful to set Header, + Cookie, and Proxy data. Connections are saved and re-used with the session + so long as the server keeps the connection alive. + kwargs: Any extra keyword arguments to pass to the session.get() call. Use this + for one-time request changes like a header, cookie, or proxy. For example, + to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`. + """ + session = session or Session() + + save_dir = save_path.parent + control_file = save_path.with_name(f"{save_path.name}.!dev") + + save_dir.mkdir(parents=True, exist_ok=True) + + if control_file.exists(): + # consider the file corrupt if the control file exists + save_path.unlink(missing_ok=True) + control_file.unlink() + elif save_path.exists(): + # if it exists, and no control file, then it should be safe + yield dict( + file_downloaded=save_path, + written=save_path.stat().st_size + ) + + # TODO: Design a control file format so we know how much of the file is missing + control_file.write_bytes(b"") + + attempts = 1 + try: + while True: + written = 0 + download_sizes = [] + last_speed_refresh = time.time() + + try: + stream = session.get(url, stream=True, **kwargs) + stream.raise_for_status() + + try: + content_length = int(stream.headers.get("Content-Length", "0")) + except ValueError: + content_length = 0 + + if content_length > 0: + yield dict(total=math.ceil(content_length / CHUNK_SIZE)) + else: + # we have no data to calculate total chunks + yield dict(total=None) # indeterminate mode + + with open(save_path, "wb") as f: + for chunk in stream.iter_content(chunk_size=CHUNK_SIZE): + download_size = len(chunk) + f.write(chunk) + written += download_size + + yield dict(advance=1) + + now = time.time() + time_since = now - last_speed_refresh + + download_sizes.append(download_size) + if time_since > PROGRESS_WINDOW or download_size < CHUNK_SIZE: + data_size = sum(download_sizes) + download_speed = math.ceil(data_size / (time_since or 1)) + yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") + last_speed_refresh = now + download_sizes.clear() + + yield dict( + file_downloaded=save_path, + written=written + ) + break + except Exception as e: + save_path.unlink(missing_ok=True) + if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS: + raise e + time.sleep(RETRY_WAIT) + attempts += 1 + finally: + control_file.unlink() def requests( - uri: Union[str, list[str]], - out: Path, - headers: Optional[dict] = None, + urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]], + output_dir: Path, + filename: str, + headers: Optional[MutableMapping[str, Union[str, bytes]]] = None, cookies: Optional[Union[MutableMapping[str, str], RequestsCookieJar]] 
= None,
     proxy: Optional[str] = None,
-    progress: Optional[partial] = None,
-    *_: Any,
-    **__: Any
-) -> int:
+    max_workers: Optional[int] = None
+) -> Generator[dict[str, Any], None, None]:
     """
-    Download files using Python Requests.
+    Download a file using Python Requests.
     https://requests.readthedocs.io
 
-    If multiple URLs are provided they will be downloaded in the provided order
-    to the output directory. They will not be merged together.
-    """
-    if isinstance(uri, list) and len(uri) == 1:
-        uri = uri[0]
+    Yields the following download status updates while chunks are downloading:
 
-    if isinstance(uri, list):
-        if out.is_file():
-            raise ValueError("Expecting out to be a Directory path not a File as multiple URLs were provided")
-        uri = [
-            (url, out / f"{i:08}.mp4")
-            for i, url in enumerate(uri)
-        ]
-    else:
-        uri = [(uri, out.parent / out.name)]
+    - {total: 123} (there are 123 chunks to download)
+    - {total: None} (there are an unknown number of chunks to download)
+    - {advance: 1} (one chunk was downloaded)
+    - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s)
+    - {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size)
+
+    The data is in the same format accepted by rich's progress.update() function.
+    However, the `downloaded`, `file_downloaded` and `written` keys are custom and not
+    natively accepted by rich progress bars.
+
+    Parameters:
+        urls: Web URL(s) to file(s) to download. You can use a dictionary with the key
+            "url" for the URI, and other keys for extra arguments to use per-URL.
+        output_dir: The folder to save the file into. If the save path's directory does
+            not exist then it will be made automatically.
+        filename: The filename or filename template to use for each file. The variables
+            you can use are `i` for the URL index and `ext` for the URL extension.
+        headers: A mapping of HTTP Header Key/Values to use for all downloads.
+        cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads.
+        proxy: An optional proxy URI to route connections through for all downloads.
+        max_workers: The maximum number of threads to use for downloads. Defaults to
+            min(32, cpu_count + 4).
+ """ + if not urls: + raise ValueError("urls must be provided and not empty") + elif not isinstance(urls, (str, dict, list)): + raise TypeError(f"Expected urls to be {str} or {dict} or a list of one of them, not {type(urls)}") + + if not output_dir: + raise ValueError("output_dir must be provided") + elif not isinstance(output_dir, Path): + raise TypeError(f"Expected output_dir to be {Path}, not {type(output_dir)}") + + if not filename: + raise ValueError("filename must be provided") + elif not isinstance(filename, str): + raise TypeError(f"Expected filename to be {str}, not {type(filename)}") + + if not isinstance(headers, (MutableMapping, type(None))): + raise TypeError(f"Expected headers to be {MutableMapping}, not {type(headers)}") + + if not isinstance(cookies, (MutableMapping, RequestsCookieJar, type(None))): + raise TypeError(f"Expected cookies to be {MutableMapping} or {RequestsCookieJar}, not {type(cookies)}") + + if not isinstance(proxy, (str, type(None))): + raise TypeError(f"Expected proxy to be {str}, not {type(proxy)}") + + if not isinstance(max_workers, (int, type(None))): + raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}") + + if not isinstance(urls, list): + urls = [urls] + + urls = [ + dict( + save_path=save_path, + **url + ) if isinstance(url, dict) else dict( + url=url, + save_path=save_path + ) + for i, url in enumerate(urls) + for save_path in [output_dir / filename.format( + i=i, + ext=Path((url["url"]) if isinstance(url, dict) else url).suffix + )] + ] session = Session() if headers: @@ -57,67 +222,61 @@ def requests( if proxy: session.proxies.update({"all": proxy}) - if progress: - progress(total=len(uri)) + yield dict(total=len(urls)) download_sizes = [] last_speed_refresh = time.time() - for url, out_path in uri: - out_path.parent.mkdir(parents=True, exist_ok=True) + with ThreadPoolExecutor(max_workers=max_workers) as pool: + for i, future in enumerate(futures.as_completed(( + pool.submit( + download, + session=session, + **url + ) + for url in urls + ))): + file_path, download_size = None, None + try: + for status_update in future.result(): + if status_update.get("file_downloaded") and status_update.get("written"): + file_path = status_update["file_downloaded"] + download_size = status_update["written"] + elif len(urls) == 1: + # these are per-chunk updates, only useful if it's one big file + yield status_update + except KeyboardInterrupt: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + yield dict(downloaded="[yellow]CANCELLING") + pool.shutdown(wait=True, cancel_futures=True) + yield dict(downloaded="[yellow]CANCELLED") + # tell dl that it was cancelled + # the pool is already shut down, so exiting loop is fine + raise + except Exception: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + yield dict(downloaded="[red]FAILING") + pool.shutdown(wait=True, cancel_futures=True) + yield dict(downloaded="[red]FAILED") + # tell dl that it failed + # the pool is already shut down, so exiting loop is fine + raise + else: + yield dict(file_downloaded=file_path, written=download_size) + yield dict(advance=1) - control_file = out_path.with_name(f"{out_path.name}.!dev") - if control_file.exists(): - # consider the file corrupt if the control file exists - # TODO: Design a control file format so we know how much of the file is missing - out_path.unlink(missing_ok=True) - control_file.unlink() - elif out_path.exists(): - continue - control_file.write_bytes(b"") + now = time.time() + time_since = now - last_speed_refresh - 
attempts = 1 - try: - while True: - try: - stream = session.get(url, stream=True) - stream.raise_for_status() + if download_size: # no size == skipped dl + download_sizes.append(download_size) - if len(uri) == 1 and progress: - content_length = int(stream.headers.get("Content-Length", "0")) - if content_length > 0: - progress(total=math.ceil(content_length / 1024)) - - with open(out_path, "wb") as f: - written = 0 - for chunk in stream.iter_content(chunk_size=1024): - download_size = len(chunk) - f.write(chunk) - written += download_size - if progress: - progress(advance=1) - - now = time.time() - time_since = now - last_speed_refresh - - download_sizes.append(download_size) - if time_since > 5 or download_size < 1024: - data_size = sum(download_sizes) - download_speed = data_size / (time_since or 1) - progress(downloaded=f"{filesize.decimal(download_speed)}/s") - last_speed_refresh = now - download_sizes.clear() - break - except Exception as e: - out_path.unlink(missing_ok=True) - if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS: - raise e - time.sleep(RETRY_WAIT) - attempts += 1 - finally: - control_file.unlink() - - return 0 + if download_sizes and (time_since > PROGRESS_WINDOW or i == len(urls)): + data_size = sum(download_sizes) + download_speed = math.ceil(data_size / (time_since or 1)) + yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") + last_speed_refresh = now + download_sizes.clear() __all__ = ("requests",)
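
Below is a minimal sketch (not part of the patch) of how a caller might consume the reworked generator-based downloader and feed its yielded dicts to a rich progress bar. The import alias, example URL, and column layout are illustrative assumptions only; the yielded keys (total, advance, downloaded, file_downloaded, written) are the ones documented in the docstrings above.

    # Sketch only: the URL, column layout, and "downloaded" text field are assumptions.
    from pathlib import Path

    from rich.progress import BarColumn, Progress, TextColumn, TimeRemainingColumn

    from devine.core.downloaders.requests import requests as requests_downloader

    progress = Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TimeRemainingColumn(),
        TextColumn("{task.fields[downloaded]}"),  # renders the custom speed string
    )

    with progress:
        task_id = progress.add_task("Downloading", total=None, downloaded="")
        for update in requests_downloader(
            urls="https://example.com/video.mp4",  # hypothetical URL
            output_dir=Path("downloads"),
            filename="video{ext}",
        ):
            # "file_downloaded" and "written" are custom keys, so handle them separately
            file_downloaded = update.pop("file_downloaded", None)
            written = update.pop("written", None)
            if file_downloaded:
                progress.console.log(f"Saved {file_downloaded} ({written} bytes)")
            if update:
                # "total", "advance", and extra fields like "downloaded" can be passed
                # straight to rich's Progress.update()
                progress.update(task_id, **update)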
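
Along the same lines, a sketch of the multi-URL form described in the docstrings, using hypothetical segment URLs: list entries may be plain URLs or dicts whose extra keys (for example a per-URL Range header) are forwarded to session.get() for that URL, and filename is formatted with i and ext for each entry. With more than one URL, only per-file updates are yielded rather than per-chunk ones.

    # Sketch only: the segment URLs, headers, and proxy are hypothetical placeholders.
    from pathlib import Path

    from devine.core.downloaders.curl_impersonate import curl_impersonate

    segments = [
        # extra keys in a dict entry are passed through to session.get() for that URL
        {"url": "https://example.com/seg_0.mp4", "headers": {"Range": "bytes=0-1023"}},
        "https://example.com/seg_1.mp4",
        "https://example.com/seg_2.mp4",
    ]

    for update in curl_impersonate(
        urls=segments,
        output_dir=Path("downloads/segments"),
        filename="{i:08}{ext}",                  # e.g. 00000000.mp4, 00000001.mp4, ...
        headers={"Referer": "https://example.com"},  # applied to every download
        proxy="http://127.0.0.1:8080",           # hypothetical proxy
        max_workers=4,
    ):
        if "file_downloaded" in update:
            print("finished:", update["file_downloaded"])
        elif "downloaded" in update:
            print("speed:", update["downloaded"])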