mirror of
https://github.com/devine-dl/devine.git
synced 2025-04-30 18:09:43 +00:00
The new system now downloads and decrypts segments individually instead of downloading all segments, merging them, and then decrypting. Overall the download system now acts more like a normal player. This fixes #23 as the new HLS download system detects changes in keys and init segments as segments are downloaded. DASH still only supports one period, and one period only, but hopefully I can change that in the future. Downloading code is now also moved from the Track classes to the manifest classes. Download progress is now also actually helpful for segmented downloads (all HLS, and most DASH streams). It uses TQDM to show a progress bar based on how many segments it needs to download, and how fast it downloads them. There's only one downside currently: downloading of segmented videos no longer has the benefit of aria2c's -j parameter, where it can download n URLs concurrently. Aria2c is still used, but only -x and -s are going to make a difference. In the future I will make HLS and DASH download in a multi-threaded way, sort of a manual version of -j.
392 lines
17 KiB
Python
392 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import subprocess
|
|
from collections import defaultdict
|
|
from enum import Enum
|
|
from io import BytesIO
|
|
from typing import Any, Iterable, Optional
|
|
|
|
import pycaption
|
|
from construct import Container
|
|
from pycaption import Caption, CaptionList, CaptionNode, WebVTTReader
|
|
from pycaption.geometry import Layout
|
|
from pymp4.parser import MP4
|
|
from subtitle_filter import Subtitles
|
|
|
|
from devine.core.tracks.track import Track
|
|
from devine.core.utilities import get_binary_path
|
|
|
|
|
|
class Subtitle(Track):
    class Codec(str, Enum):
        """Supported Subtitle codecs/containers, valued by their canonical short name."""
        SubRip = "SRT"  # https://wikipedia.org/wiki/SubRip
        SubStationAlpha = "SSA"  # https://wikipedia.org/wiki/SubStation_Alpha
        SubStationAlphav4 = "ASS"  # https://wikipedia.org/wiki/SubStation_Alpha#Advanced_SubStation_Alpha
        TimedTextMarkupLang = "TTML"  # https://wikipedia.org/wiki/Timed_Text_Markup_Language
        WebVTT = "VTT"  # https://wikipedia.org/wiki/WebVTT
        # MPEG-DASH box-encapsulated subtitle formats
        fTTML = "STPP"  # https://www.w3.org/TR/2018/REC-ttml-imsc1.0.1-20180424
        fVTT = "WVTT"  # https://www.w3.org/TR/webvtt1

        @property
        def extension(self) -> str:
            """File extension for this codec (the enum value, lower-cased)."""
            return self.value.lower()

        @staticmethod
        def from_mime(mime: str) -> Subtitle.Codec:
            """
            Map a MIME subtype (any ".profile" suffix is discarded) to a Codec.

            Raises:
                ValueError: If the MIME does not match a supported codec.
            """
            mime = mime.lower().strip().split(".")[0]
            codec = {
                "srt": Subtitle.Codec.SubRip,
                "ssa": Subtitle.Codec.SubStationAlpha,
                "ass": Subtitle.Codec.SubStationAlphav4,
                "ttml": Subtitle.Codec.TimedTextMarkupLang,
                "vtt": Subtitle.Codec.WebVTT,
                "stpp": Subtitle.Codec.fTTML,
                "wvtt": Subtitle.Codec.fVTT
            }.get(mime)
            if codec is None:
                raise ValueError(f"The MIME '{mime}' is not a supported Subtitle Codec")
            return codec

        @staticmethod
        def from_codecs(codecs: str) -> Subtitle.Codec:
            """
            Return the first Codec matched from a comma-separated codecs string.

            Raises:
                ValueError: If no entry matches a supported codec.
            """
            for entry in codecs.lower().split(","):
                candidate = entry.strip().split(".")[0]
                try:
                    return Subtitle.Codec.from_mime(candidate)
                except ValueError:
                    continue
            raise ValueError(f"No MIME types matched any supported Subtitle Codecs in '{codecs}'")

        @staticmethod
        def from_netflix_profile(profile: str) -> Subtitle.Codec:
            """
            Map a Netflix Content Profile name (by prefix) to a Codec.

            Raises:
                ValueError: If the profile does not match a supported codec.
            """
            profile = profile.lower().strip()
            prefix_map = (
                ("webvtt", Subtitle.Codec.WebVTT),
                ("dfxp", Subtitle.Codec.TimedTextMarkupLang)
            )
            for prefix, codec in prefix_map:
                if profile.startswith(prefix):
                    return codec
            raise ValueError(f"The Content Profile '{profile}' is not a supported Subtitle Codec")
|
|
|
|
def __init__(self, *args: Any, codec: Subtitle.Codec, cc: bool = False, sdh: bool = False, forced: bool = False,
|
|
**kwargs: Any):
|
|
"""
|
|
Information on Subtitle Types:
|
|
https://bit.ly/2Oe4fLC (3PlayMedia Blog on SUB vs CC vs SDH).
|
|
However, I wouldn't pay much attention to the claims about SDH needing to
|
|
be in the original source language. It's logically not true.
|
|
|
|
CC == Closed Captions. Source: Basically every site.
|
|
SDH = Subtitles for the Deaf or Hard-of-Hearing. Source: Basically every site.
|
|
HOH = Exact same as SDH. Is a term used in the UK. Source: https://bit.ly/2PGJatz (ICO UK)
|
|
|
|
More in-depth information, examples, and stuff to look for can be found in the Parameter
|
|
explanation list below.
|
|
|
|
Parameters:
|
|
cc: Closed Caption.
|
|
- Intended as if you couldn't hear the audio at all.
|
|
- Can have Sound as well as Dialogue, but doesn't have to.
|
|
- Original source would be from an EIA-CC encoded stream. Typically all
|
|
upper-case characters.
|
|
Indicators of it being CC without knowing original source:
|
|
- Extracted with CCExtractor, or
|
|
- >>> (or similar) being used at the start of some or all lines, or
|
|
- All text is uppercase or at least the majority, or
|
|
- Subtitles are Scrolling-text style (one line appears, oldest line
|
|
then disappears).
|
|
Just because you downloaded it as a SRT or VTT or such, doesn't mean it
|
|
isn't from an EIA-CC stream. And I wouldn't take the streaming services
|
|
(CC) as gospel either as they tend to get it wrong too.
|
|
sdh: Deaf or Hard-of-Hearing. Also known as HOH in the UK (EU?).
|
|
- Intended as if you couldn't hear the audio at all.
|
|
- MUST have Sound as well as Dialogue to be considered SDH.
|
|
- It has no "syntax" or "format" but is not transmitted using archaic
|
|
forms like EIA-CC streams, would be intended for transmission via
|
|
SubRip (SRT), WebVTT (VTT), TTML, etc.
|
|
If you can see important audio/sound transcriptions and not just dialogue
|
|
and it doesn't have the indicators of CC, then it's most likely SDH.
|
|
If it doesn't have important audio/sounds transcriptions it might just be
|
|
regular subtitling (you wouldn't mark as CC or SDH). This would be the
|
|
case for most translation subtitles. Like Anime for example.
|
|
forced: Typically used if there's important information at some point in time
|
|
like watching Dubbed content and an important Sign or Letter is shown
|
|
or someone talking in a different language.
|
|
Forced tracks are recommended by the Matroska Spec to be played if
|
|
the player's current playback audio language matches a subtitle
|
|
marked as "forced".
|
|
However, that doesn't mean every player works like this but there is
|
|
no other way to reliably work with Forced subtitles where multiple
|
|
forced subtitles may be in the output file. Just know what to expect
|
|
with "forced" subtitles.
|
|
"""
|
|
super().__init__(*args, **kwargs)
|
|
self.codec = codec
|
|
self.cc = bool(cc)
|
|
self.sdh = bool(sdh)
|
|
if self.cc and self.sdh:
|
|
raise ValueError("A text track cannot be both CC and SDH.")
|
|
self.forced = bool(forced)
|
|
if (self.cc or self.sdh) and self.forced:
|
|
raise ValueError("A text track cannot be CC/SDH as well as Forced.")
|
|
|
|
def get_track_name(self) -> Optional[str]:
|
|
"""Return the base Track Name."""
|
|
track_name = super().get_track_name() or ""
|
|
flag = self.cc and "CC" or self.sdh and "SDH" or self.forced and "Forced"
|
|
if flag:
|
|
if track_name:
|
|
flag = f" ({flag})"
|
|
track_name += flag
|
|
return track_name or None
|
|
|
|
@staticmethod
|
|
def parse(data: bytes, codec: Subtitle.Codec) -> pycaption.CaptionSet:
|
|
# TODO: Use an "enum" for subtitle codecs
|
|
if not isinstance(data, bytes):
|
|
raise ValueError(f"Subtitle data must be parsed as bytes data, not {type(data).__name__}")
|
|
try:
|
|
if codec == Subtitle.Codec.fTTML:
|
|
captions: dict[str, pycaption.CaptionList] = defaultdict(pycaption.CaptionList)
|
|
for segment in (
|
|
Subtitle.parse(box.data, Subtitle.Codec.TimedTextMarkupLang)
|
|
for box in MP4.parse_stream(BytesIO(data))
|
|
if box.type == b"mdat"
|
|
):
|
|
for lang in segment.get_languages():
|
|
captions[lang].extend(segment.get_captions(lang))
|
|
captions: pycaption.CaptionSet = pycaption.CaptionSet(captions)
|
|
return captions
|
|
if codec == Subtitle.Codec.TimedTextMarkupLang:
|
|
text = data.decode("utf8").replace("tt:", "")
|
|
return pycaption.DFXPReader().read(text)
|
|
if codec == Subtitle.Codec.fVTT:
|
|
caption_lists: dict[str, pycaption.CaptionList] = defaultdict(pycaption.CaptionList)
|
|
caption_list, language = Subtitle.merge_segmented_wvtt(data)
|
|
caption_lists[language] = caption_list
|
|
caption_set: pycaption.CaptionSet = pycaption.CaptionSet(caption_lists)
|
|
return caption_set
|
|
if codec == Subtitle.Codec.WebVTT:
|
|
# Segmented VTT when merged may have the WEBVTT headers part of the next caption
|
|
# if they are not separated far enough from the previous caption, hence the \n\n
|
|
text = data.decode("utf8"). \
|
|
replace("WEBVTT", "\n\nWEBVTT"). \
|
|
replace("\r", ""). \
|
|
replace("\n\n\n", "\n \n\n"). \
|
|
replace("\n\n<", "\n<")
|
|
captions: pycaption.CaptionSet = pycaption.WebVTTReader().read(text)
|
|
return captions
|
|
except pycaption.exceptions.CaptionReadSyntaxError:
|
|
raise SyntaxError(f"A syntax error has occurred when reading the \"{codec}\" subtitle")
|
|
except pycaption.exceptions.CaptionReadNoCaptions:
|
|
return pycaption.CaptionSet({"en": []})
|
|
|
|
raise ValueError(f"Unknown Subtitle Format \"{codec}\"...")
|
|
|
|
    @staticmethod
    def merge_same_cues(caption_set: pycaption.CaptionSet):
        """
        Merge captions with the same timecodes and text as one in-place.

        Consecutive captions sharing identical (start, end) timecodes are
        grouped: exact duplicates (same text) are dropped, while differing
        texts are merged into a single caption via pycaption.base.merge().
        The caption_set is modified in-place; nothing is returned.
        """
        for lang in caption_set.get_languages():
            captions = caption_set.get_captions(lang)
            last_caption = None
            concurrent_captions = pycaption.CaptionList()
            merged_captions = pycaption.CaptionList()
            for caption in captions:
                if last_caption:
                    if (caption.start, caption.end) == (last_caption.start, last_caption.end):
                        # same timecodes as the previous caption: keep it only
                        # if the text differs (identical text == duplicate)
                        if caption.get_text() != last_caption.get_text():
                            concurrent_captions.append(caption)
                        last_caption = caption
                        continue
                    else:
                        # timecodes changed: flush the current group as one caption
                        merged_captions.append(pycaption.base.merge(concurrent_captions))
                # start a new group with this caption
                # (a plain list, not a CaptionList — merge() accepts it as-is)
                concurrent_captions = [caption]
                last_caption = caption

            # flush the final group
            if concurrent_captions:
                merged_captions.append(pycaption.base.merge(concurrent_captions))
            if merged_captions:
                caption_set.set_captions(lang, merged_captions)
|
|
|
|
@staticmethod
|
|
def merge_segmented_wvtt(data: bytes, period_start: float = 0.) -> tuple[CaptionList, Optional[str]]:
|
|
"""
|
|
Convert Segmented DASH WebVTT cues into a pycaption Caption List.
|
|
Also returns an ISO 639-2 alpha-3 language code if available.
|
|
|
|
Code ported originally by xhlove to Python from shaka-player.
|
|
Has since been improved upon by rlaphoenix using pymp4 and
|
|
pycaption functions.
|
|
"""
|
|
captions = CaptionList()
|
|
|
|
# init:
|
|
saw_wvtt_box = False
|
|
timescale = None
|
|
language = None
|
|
|
|
# media:
|
|
# > tfhd
|
|
default_duration = None
|
|
# > tfdt
|
|
saw_tfdt_box = False
|
|
base_time = 0
|
|
# > trun
|
|
saw_trun_box = False
|
|
samples = []
|
|
|
|
def flatten_boxes(box: Container) -> Iterable[Container]:
|
|
for child in box:
|
|
if hasattr(child, "children"):
|
|
yield from flatten_boxes(child.children)
|
|
del child["children"]
|
|
if hasattr(child, "entries"):
|
|
yield from flatten_boxes(child.entries)
|
|
del child["entries"]
|
|
# some boxes (mainly within 'entries') uses format not type
|
|
child["type"] = child.get("type") or child.get("format")
|
|
yield child
|
|
|
|
for box in flatten_boxes(MP4.parse_stream(BytesIO(data))):
|
|
# init
|
|
if box.type == b"mdhd":
|
|
timescale = box.timescale
|
|
language = box.language
|
|
|
|
if box.type == b"wvtt":
|
|
saw_wvtt_box = True
|
|
|
|
# media
|
|
if box.type == b"styp":
|
|
# essentially the start of each segment
|
|
# media var resets
|
|
# > tfhd
|
|
default_duration = None
|
|
# > tfdt
|
|
saw_tfdt_box = False
|
|
base_time = 0
|
|
# > trun
|
|
saw_trun_box = False
|
|
samples = []
|
|
|
|
if box.type == b"tfhd":
|
|
if box.flags.default_sample_duration_present:
|
|
default_duration = box.default_sample_duration
|
|
|
|
if box.type == b"tfdt":
|
|
saw_tfdt_box = True
|
|
base_time = box.baseMediaDecodeTime
|
|
|
|
if box.type == b"trun":
|
|
saw_trun_box = True
|
|
samples = box.sample_info
|
|
|
|
if box.type == b"mdat":
|
|
if not timescale:
|
|
raise ValueError("Timescale was not found in the Segmented WebVTT.")
|
|
if not saw_wvtt_box:
|
|
raise ValueError("The WVTT box was not found in the Segmented WebVTT.")
|
|
if not saw_tfdt_box:
|
|
raise ValueError("The TFDT box was not found in the Segmented WebVTT.")
|
|
if not saw_trun_box:
|
|
raise ValueError("The TRUN box was not found in the Segmented WebVTT.")
|
|
|
|
vttc_boxes = MP4.parse_stream(BytesIO(box.data))
|
|
current_time = base_time + period_start
|
|
|
|
for sample, vttc_box in zip(samples, vttc_boxes):
|
|
duration = sample.sample_duration or default_duration
|
|
if sample.sample_composition_time_offsets:
|
|
current_time += sample.sample_composition_time_offsets
|
|
|
|
start_time = current_time
|
|
end_time = current_time + (duration or 0)
|
|
current_time = end_time
|
|
|
|
if vttc_box.type == b"vtte":
|
|
# vtte is a vttc that's empty, skip
|
|
continue
|
|
|
|
layout: Optional[Layout] = None
|
|
nodes: list[CaptionNode] = []
|
|
|
|
for cue_box in MP4.parse_stream(BytesIO(vttc_box.data)):
|
|
if cue_box.type == b"vsid":
|
|
# this is a V(?) Source ID box, we don't care
|
|
continue
|
|
cue_data = cue_box.data.decode("utf8")
|
|
if cue_box.type == b"sttg":
|
|
layout = Layout(webvtt_positioning=cue_data)
|
|
elif cue_box.type == b"payl":
|
|
nodes.extend([
|
|
node
|
|
for line in cue_data.split("\n")
|
|
for node in [
|
|
CaptionNode.create_text(WebVTTReader()._decode(line)),
|
|
CaptionNode.create_break()
|
|
]
|
|
])
|
|
nodes.pop()
|
|
|
|
if nodes:
|
|
caption = Caption(
|
|
start=start_time * timescale, # as microseconds
|
|
end=end_time * timescale,
|
|
nodes=nodes,
|
|
layout_info=layout
|
|
)
|
|
p_caption = captions[-1] if captions else None
|
|
if p_caption and caption.start == p_caption.end and str(caption.nodes) == str(p_caption.nodes):
|
|
# it's a duplicate, but lets take its end time
|
|
p_caption.end = caption.end
|
|
continue
|
|
captions.append(caption)
|
|
|
|
return captions, language
|
|
|
|
def strip_hearing_impaired(self) -> None:
|
|
"""
|
|
Strip captions for hearing impaired (SDH).
|
|
It uses SubtitleEdit if available, otherwise filter-subs.
|
|
"""
|
|
if not self.path or not self.path.exists():
|
|
raise ValueError("You must download the subtitle track first.")
|
|
|
|
executable = get_binary_path("SubtitleEdit")
|
|
if executable:
|
|
subprocess.run([
|
|
executable,
|
|
"/Convert", self.path, "srt",
|
|
"/overwrite",
|
|
"/RemoveTextForHI"
|
|
], check=True)
|
|
# Remove UTF-8 Byte Order Marks
|
|
self.path.write_text(
|
|
self.path.read_text(encoding="utf-8-sig"),
|
|
encoding="utf8"
|
|
)
|
|
else:
|
|
sub = Subtitles(self.path)
|
|
sub.filter(
|
|
rm_fonts=True,
|
|
rm_ast=True,
|
|
rm_music=True,
|
|
rm_effects=True,
|
|
rm_names=True,
|
|
rm_author=True
|
|
)
|
|
sub.save()
|
|
|
|
def __str__(self) -> str:
|
|
return " | ".join(filter(bool, [
|
|
"SUB",
|
|
f"[{self.codec.value}]",
|
|
str(self.language),
|
|
self.get_track_name()
|
|
]))
|
|
|
|
|
|
# Python's star-import list is the dunder `__all__` holding name strings;
# the previous `__ALL__ = (Subtitle,)` was silently ignored by the interpreter.
__all__ = ("Subtitle",)
|