diff --git a/devine/core/manifests/hls.py b/devine/core/manifests/hls.py index 803a8ed..6afe1ad 100644 --- a/devine/core/manifests/hls.py +++ b/devine/core/manifests/hls.py @@ -3,6 +3,8 @@ from __future__ import annotations import html import logging import re +import shutil +import subprocess import sys import time from concurrent import futures @@ -29,7 +31,7 @@ from devine.core.downloaders import downloader from devine.core.downloaders import requests as requests_downloader from devine.core.drm import DRM_T, ClearKey, Widevine from devine.core.tracks import Audio, Subtitle, Tracks, Video -from devine.core.utilities import is_close_match, try_ensure_utf8 +from devine.core.utilities import get_binary_path, is_close_match, try_ensure_utf8 class HLS: @@ -250,84 +252,115 @@ class HLS: range_offset.put(0) drm_lock = Lock() - with ThreadPoolExecutor(max_workers=16) as pool: - for i, download in enumerate(futures.as_completed(( - pool.submit( - HLS.download_segment, - segment=segment, - out_path=(save_dir / str(n).zfill(len(str(len(master.segments))))).with_suffix(".mp4"), - track=track, - init_data=init_data, - segment_key=segment_key, - range_offset=range_offset, - drm_lock=drm_lock, - progress=progress, - license_widevine=license_widevine, - session=session, - proxy=proxy - ) - for n, segment in enumerate(master.segments) - ))): - try: - download_size = download.result() - except KeyboardInterrupt: - DOWNLOAD_CANCELLED.set() # skip pending track downloads - progress(downloaded="[yellow]CANCELLING") - pool.shutdown(wait=True, cancel_futures=True) - progress(downloaded="[yellow]CANCELLED") - # tell dl that it was cancelled - # the pool is already shut down, so exiting loop is fine - raise - except Exception as e: - DOWNLOAD_CANCELLED.set() # skip pending track downloads - progress(downloaded="[red]FAILING") - pool.shutdown(wait=True, cancel_futures=True) - progress(downloaded="[red]FAILED") - # tell dl that it failed - # the pool is already shut down, so exiting loop 
is fine - raise e - else: - # it successfully downloaded, and it was not cancelled - progress(advance=1) + discontinuities: list[list[segment]] = [] + discontinuity_index = -1 + for i, segment in enumerate(master.segments): + if i == 0 or segment.discontinuity: + discontinuity_index += 1 + discontinuities.append([]) + discontinuities[discontinuity_index].append(segment) - if download_size == -1: # skipped for --skip-dl - progress(downloaded="[yellow]SKIPPING") - continue + for d_i, discontinuity in enumerate(discontinuities): + # each discontinuity is a separate 'file'/encode and must be processed separately + discontinuity_save_dir = save_dir / str(d_i).zfill(len(str(len(discontinuities)))) + discontinuity_save_path = discontinuity_save_dir.with_suffix(Path(discontinuity[0].uri).suffix) - now = time.time() - time_since = now - last_speed_refresh + with ThreadPoolExecutor(max_workers=16) as pool: + for i, download in enumerate(futures.as_completed(( + pool.submit( + HLS.download_segment, + segment=segment, + out_path=( + discontinuity_save_dir / + str(s_i).zfill(len(str(len(discontinuity)))) + ).with_suffix(Path(segment.uri).suffix), + track=track, + init_data=init_data, + segment_key=segment_key, + range_offset=range_offset, + drm_lock=drm_lock, + progress=progress, + license_widevine=license_widevine, + session=session, + proxy=proxy + ) + for s_i, segment in enumerate(discontinuity) + ))): + try: + download_size = download.result() + except KeyboardInterrupt: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + progress(downloaded="[yellow]CANCELLING") + pool.shutdown(wait=True, cancel_futures=True) + progress(downloaded="[yellow]CANCELLED") + # tell dl that it was cancelled + # the pool is already shut down, so exiting loop is fine + raise + except Exception as e: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + progress(downloaded="[red]FAILING") + pool.shutdown(wait=True, cancel_futures=True) + progress(downloaded="[red]FAILED") + # tell dl 
that it failed + # the pool is already shut down, so exiting loop is fine + raise e + else: + # it successfully downloaded, and it was not cancelled + progress(advance=1) - if download_size: # no size == skipped dl - download_sizes.append(download_size) + if download_size == -1: # skipped for --skip-dl + progress(downloaded="[yellow]SKIPPING") + continue - if download_sizes and (time_since > download_speed_window or i == len(master.segments)): - data_size = sum(download_sizes) - download_speed = data_size / (time_since or 1) - progress(downloaded=f"HLS {filesize.decimal(download_speed)}/s") - last_speed_refresh = now - download_sizes.clear() + now = time.time() + time_since = now - last_speed_refresh + + if download_size: # no size == skipped dl + download_sizes.append(download_size) + + if download_sizes and (time_since > download_speed_window or i == len(master.segments)): + data_size = sum(download_sizes) + download_speed = data_size / (time_since or 1) + progress(downloaded=f"HLS {filesize.decimal(download_speed)}/s") + last_speed_refresh = now + download_sizes.clear() + + with open(discontinuity_save_path, "wb") as f: + for segment_file in sorted(discontinuity_save_dir.iterdir()): + segment_data = segment_file.read_bytes() + if isinstance(track, Subtitle): + segment_data = try_ensure_utf8(segment_data) + if track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML): + # decode text direction entities or SubtitleEdit's /ReverseRtlStartEnd won't work + segment_data = segment_data.decode("utf8"). \ + replace("‎", html.unescape("‎")). \ + replace("‏", html.unescape("‏")). 
@staticmethod
def merge_segments(segments: list[Path], save_path: Path) -> int:
    """
    Concatenate segment files into one file using FFmpeg's concat demuxer.

    A temporary concat list file is written next to the first segment, FFmpeg
    stream-copies (no re-encode) every listed segment into `save_path`, and the
    list file is removed afterwards, even if FFmpeg fails.

    Parameters:
        segments: Segment files to join, in the order they should be merged.
        save_path: Destination path of the merged file.

    Returns the file size of the merged file, in bytes.

    Raises:
        EnvironmentError: If the FFmpeg executable could not be found.
        ValueError: If no segments were provided.
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
    """
    ffmpeg = get_binary_path("ffmpeg")
    if not ffmpeg:
        raise EnvironmentError("FFmpeg executable was not found but is required to merge HLS segments.")

    if not segments:
        # previously surfaced as an opaque IndexError on segments[0]
        raise ValueError("At least one segment is required to merge HLS segments.")

    def concat_entry(segment: Path) -> str:
        # The concat demuxer resolves relative paths against the list file's
        # directory rather than the working directory, so always list
        # absolute paths. Inside single quotes FFmpeg takes every character
        # literally, so a quote has to close the string, be escaped, and
        # then re-open the string: ' -> '\''
        escaped = str(Path(segment).absolute()).replace("'", "'\\''")
        return f"file '{escaped}'"

    demuxer_file = Path(segments[0]).parent / "ffmpeg_concat_demuxer.txt"
    try:
        # FFmpeg reads the list as UTF-8; do not rely on the platform default encoding
        demuxer_file.write_text(
            "\n".join(concat_entry(segment) for segment in segments),
            encoding="utf8"
        )
        subprocess.check_call([
            ffmpeg, "-hide_banner",
            "-loglevel", "panic",
            "-f", "concat",
            "-safe", "0",  # allow absolute paths in the list file
            "-i", str(demuxer_file),
            "-map", "0",
            "-c", "copy",
            str(save_path)
        ])
    finally:
        # never leave the list file behind; a retry that lists the directory
        # would otherwise treat it as a segment
        demuxer_file.unlink(missing_ok=True)

    return save_path.stat().st_size