udemy-downloader/mp4parse.py

497 lines
17 KiB
Python

""" MP4 Parser based on:
http://download.macromedia.com/f4v/video_file_format_spec_v10_1.pdf
@author: Alastair McCormack
@license: MIT License
"""
import bitstring
from datetime import datetime
from collections import namedtuple
import logging
import six
# Module-level logger, named after this module; its threshold is set to
# WARNING so debug/info records from the parser are dropped by default.
log = logging.getLogger(__name__)
# log.addHandler(logging.NullHandler())
log.setLevel(logging.WARNING)
class MixinDictRepr(object):
    """Mixin whose repr is the class name followed by all instance attributes."""

    def __repr__(self, *args, **kwargs):
        # Extra positional/keyword arguments are accepted but ignored.
        template = "{class_name} : {content!r} "
        return template.format(class_name=type(self).__name__,
                               content=vars(self))
class MixinMinimalRepr(object):
    """ A minimal representation when the payload could be large.

    Only the attribute names are shown, never their values.
    """

    def __repr__(self, *args, **kwargs):
        # Extra positional/keyword arguments are accepted but ignored.
        attribute_names = vars(self).keys()
        template = "{class_name} : {content!r} "
        return template.format(class_name=type(self).__name__,
                               content=attribute_names)
class FragmentRunTableBox(MixinDictRepr):
    """Plain attribute container; adds nothing beyond the inherited repr."""
class UnImplementedBox(MixinDictRepr):
    """Stand-in for a box type that has no dedicated parser.

    F4VParser._parse_unimplemented stores the box header on it and skips the
    payload.
    """
    type = "na"
class MovieFragmentBox(MixinDictRepr):
    """ aka moof

    F4VParser._parse_moof attaches each child box as an attribute named after
    the child's ``type``.
    """
    type = "moof"
class MovieBox(MixinDictRepr):
    """ aka moov

    F4VParser._parse_moov attaches child boxes as attributes named after each
    child's ``type``; all ``pssh`` children are collected in the ``pssh`` list.
    """
    type = "moov"
class BootStrapInfoBox(MixinDictRepr):
    """ aka abst

    Attribute container filled in by F4VParser._parse_abst.
    ``current_media_time`` is exposed as a property so the raw tick count read
    from the box is converted to a datetime on assignment.
    """
    type = "abst"

    @property
    def current_media_time(self):
        """Media time as a naive UTC datetime, or None if it was not representable."""
        return self._current_media_time

    @current_media_time.setter
    def current_media_time(self, epoch_timestamp):
        """ Takes a timestamp arg and saves it as datetime

        ``self.time_scale`` must already be set: the raw value is divided by
        it before conversion.

        :param epoch_timestamp: raw timestamp in ``time_scale`` units.
        """
        try:
            self._current_media_time = datetime.utcfromtimestamp(
                epoch_timestamp / float(self.time_scale))
        except (ValueError, OverflowError, OSError, ZeroDivisionError):
            # A zero time scale or an out-of-range 64-bit timestamp must not
            # abort parsing of the whole box; store None instead, the same
            # tolerance the afrt parser applies to odd fragment timestamps.
            self._current_media_time = None
class FragmentRandomAccessBox(MixinDictRepr):
    """ aka afra

    Attribute container filled in by F4VParser._parse_afra; the two nested
    namedtuples are the record types of its local and global entry lists.
    """
    type = "afra"

    # One local access entry: a time and the offset read alongside it.
    FragmentRandomAccessBoxEntry = namedtuple(
        "FragmentRandomAccessBoxEntry",
        "time offset")

    # One global access entry: a time plus segment/fragment ids and two offsets.
    FragmentRandomAccessBoxGlobalEntry = namedtuple(
        "FragmentRandomAccessBoxGlobalEntry",
        "time segment_number fragment_number afra_offset sample_offset")
class SegmentRunTable(MixinDictRepr):
    """ aka asrt

    Attribute container filled in by F4VParser._parse_asrt.
    """
    type = "asrt"

    # One table row: a first-segment number and a fragments-per-segment count.
    SegmentRunTableEntry = namedtuple(
        'SegmentRunTableEntry',
        "first_segment fragments_per_segment")
class FragmentRunTable(MixinDictRepr):
    """ aka afrt

    Attribute container filled in by F4VParser._parse_afrt; its ``fragments``
    list holds FragmentRunTableEntry records.
    """
    type = "afrt"

    class FragmentRunTableEntry(namedtuple('FragmentRunTableEntry',
                                           ["first_fragment",
                                            "first_fragment_timestamp",
                                            "fragment_duration",
                                            "discontinuity_indicator"])):
        """One row of the fragment run table."""

        # Named values for the discontinuity_indicator field (per the DI_ prefix).
        DI_END_OF_PRESENTATION = 0
        DI_NUMBERING = 1
        DI_TIMESTAMP = 2
        DI_TIMESTAMP_AND_NUMBER = 3

        def __eq__(self, other):
            """Field-by-field equality against any object exposing the same four fields.

            Always returns a real bool for comparable objects, and
            NotImplemented (so Python falls back to its default handling)
            when ``other`` lacks one of the fields.
            """
            try:
                return bool(
                    self.first_fragment == other.first_fragment and
                    self.first_fragment_timestamp == other.first_fragment_timestamp and
                    self.fragment_duration == other.fragment_duration and
                    self.discontinuity_indicator == other.discontinuity_indicator)
            except AttributeError:
                return NotImplemented

        def __ne__(self, other):
            """Inverse of __eq__; defined explicitly so tuple.__ne__ is not used instead."""
            outcome = self.__eq__(other)
            if outcome is NotImplemented:
                return outcome
            return not outcome

        # Defining __eq__ clears the inherited hash on Python 3; the tuple hash
        # covers exactly the fields __eq__ compares, so it stays consistent.
        __hash__ = tuple.__hash__

        def __repr__(self, *args, **kwargs):
            # The instance __dict__ of a namedtuple subclass is empty, so build
            # the field mapping from the tuple itself.
            return str(dict(self._asdict()))
class MediaDataBox(MixinMinimalRepr):
    """ aka mdat

    F4VParser._parse_mdat stores the raw box bytes in ``payload``; the minimal
    repr keeps that payload out of log output.
    """
    type = "mdat"
class MovieFragmentHeader(MixinDictRepr):
    """ aka mfhd

    F4VParser._parse_mfhd records only the box header on it.
    """
    type = "mfhd"
class ProtectionSystemSpecificHeader(MixinDictRepr):
    """ aka pssh

    F4VParser._parse_pssh fills in ``system_id`` and ``payload`` as hex strings.
    """
    type = "pssh"
# Header of a single box: payload size in bytes (header excluded), the box
# type code, and the number of bytes the header itself occupied.
BoxHeader = namedtuple("BoxHeader", "box_size box_type header_size")
class F4VParser(object):
    """Walks MP4 / F4V data box by box and turns each box into a Python object.

    All methods are class or static methods; the class holds no state.
    """

    @classmethod
    def parse(cls, filename=None, bytes_input=None, file_input=None, offset_bytes=0, headers_only=False):
        """
        Parse an MP4 file or bytes into boxes

        This is a generator: boxes (or headers) are produced lazily as the
        input is walked. The first of ``filename``, ``bytes_input`` and
        ``file_input`` that is supplied is used as the data source.

        :param filename: filename of mp4 file.
        :type filename: str.
        :param bytes_input: bytes of mp4 file.
        :type bytes_input: bytes / Python 2.x str.
        :param file_input: file object to read when neither of the above is given.
        :param offset_bytes: start parsing at offset.
        :type offset_bytes: int.
        :param headers_only: Ignore data and return just headers. Useful when data is cut short
        :type: headers_only: boolean
        :return: BMFF Boxes or Headers
        :raises bitstring.ReadError: the data ends in the middle of a box header.
        :raises ValueError: a box is shorter than its header or its payload
            extends past the end of the data.
        """
        box_lookup = {
            BootStrapInfoBox.type: cls._parse_abst,
            FragmentRandomAccessBox.type: cls._parse_afra,
            MediaDataBox.type: cls._parse_mdat,
            MovieFragmentBox.type: cls._parse_moof,
            MovieBox.type: cls._parse_moov,
            MovieFragmentHeader.type: cls._parse_mfhd,
            ProtectionSystemSpecificHeader.type: cls._parse_pssh,
        }

        if filename:
            bs = bitstring.ConstBitStream(filename=filename, offset=offset_bytes * 8)
        elif bytes_input is not None:
            # "is not None" so an empty payload (e.g. an empty container box)
            # is still treated as bytes rather than falling through to
            # file_input.
            bs = bitstring.ConstBitStream(bytes=bytes_input, offset=offset_bytes * 8)
        else:
            bs = bitstring.ConstBitStream(auto=file_input, offset=offset_bytes * 8)

        log.debug("Starting parse")
        log.debug("Size is %d bits", bs.len)

        while bs.pos < bs.len:
            log.debug("Byte pos before header: %d relative to (%d)", bs.bytepos, offset_bytes)
            log.debug("Reading header")
            try:
                header = cls._read_box_header(bs)
            except bitstring.ReadError:
                log.error("Premature end of data while reading box header")
                raise

            log.debug("Header type: %s", header.box_type)
            log.debug("Byte pos after header: %d relative to (%d)", bs.bytepos, offset_bytes)

            if headers_only:
                yield header
                # move pointer to next header if possible
                try:
                    bs.bytepos += header.box_size
                except ValueError:
                    log.warning("Premature end of data")
                    raise
            else:
                # Get parser method for header type
                parse_function = box_lookup.get(header.box_type, cls._parse_unimplemented)
                try:
                    yield parse_function(bs, header)
                except ValueError:
                    log.error("Premature end of data")
                    raise

    @classmethod
    def _is_mp4(cls, parser):
        """Return True if ``parser`` yields at least one box, otherwise False.

        :param parser: iterable of boxes/headers, normally a ``parse`` generator.
        :return: bool.
        """
        try:
            for _ in parser:
                return True
        except (ValueError, IndexError):
            # IndexError covers bitstring.ReadError (an IndexError subclass),
            # raised when the data is too short to hold even one box header.
            return False
        # Nothing was yielded at all (empty input).
        return False

    @classmethod
    def is_mp4_s(cls, bytes_input):
        """ Is bytes_input the contents of an MP4 file

        :param bytes_input: str/bytes to check.
        :type bytes_input: str/bytes.
        :return: bool.
        """
        parser = cls.parse(bytes_input=bytes_input, headers_only=True)
        return cls._is_mp4(parser)

    @classmethod
    def is_mp4(cls, file_input):
        """ Checks input if it's an MP4 file

        :param file_input: Filename or file object (anything with a ``read`` attribute)
        :type file_input: str, file
        :returns: bool.
        """
        if hasattr(file_input, "read"):
            parser = cls.parse(file_input=file_input, headers_only=True)
        else:
            parser = cls.parse(filename=file_input, headers_only=True)
        return cls._is_mp4(parser)

    @staticmethod
    def _read_string(bs):
        """ read UTF8 null terminated string

        :return: the decoded text without its terminator, or None when empty.
        """
        raw = bs.readto('0x00', bytealigned=True).bytes
        # The slice drops the trailing NUL that readto() includes.
        result = raw.decode("utf-8")[:-1]
        return result or None

    @classmethod
    def _read_count_and_string_table(cls, bs):
        """ Read a count then return the strings in a list """
        entry_count = bs.read("uint:8")
        return [cls._read_string(bs) for _ in range(entry_count)]

    @staticmethod
    def _read_box_header(bs):
        """Read one box header from ``bs`` and leave the stream at the payload.

        :param bs: bit stream positioned at the start of a box.
        :return: BoxHeader whose ``box_size`` is the payload size in bytes
            (header excluded).
        :raises ValueError: the declared box size is smaller than the header.
        """
        header_start_pos = bs.bytepos
        size, box_type = bs.readlist("uint:32, bytes:4")

        # box_type should be an ASCII string. Decode as UTF-8 in case
        try:
            box_type = box_type.decode('utf-8')
        except UnicodeDecodeError:
            # we'll leave as bytes instead
            pass

        if size == 0:
            # A size of 0 means the box runs to the end of the data. Without
            # this the payload size below would go negative and the caller's
            # position would move backwards, looping forever.
            size = bs.len // 8 - header_start_pos
        elif size == 1:
            # if size == 1, then this is an extended size type.
            # Therefore read the next 64 bits as size
            size = bs.read("uint:64")

        header_size = bs.bytepos - header_start_pos
        if size < header_size:
            raise ValueError(
                "Invalid box size %d: smaller than its %d byte header" % (size, header_size))

        return BoxHeader(box_size=size - header_size, box_type=box_type, header_size=header_size)

    @staticmethod
    def _parse_unimplemented(bs, header):
        """Skip the payload of a box with no dedicated parser and return a placeholder."""
        ui = UnImplementedBox()
        ui.header = header
        bs.bytepos += header.box_size
        return ui

    @classmethod
    def _parse_afra(cls, bs, header):
        """Parse an afra box into a FragmentRandomAccessBox.

        :param bs: bit stream positioned at the start of the box payload.
        :param header: BoxHeader already read for this box.
        """
        afra = FragmentRandomAccessBox()
        afra.header = header

        # read the entire box in case there's padding
        afra_bs = bs.read(header.box_size * 8)
        # skip Version and Flags
        afra_bs.pos += 8 + 24
        long_ids, long_offsets, global_entries, afra.time_scale, local_entry_count = \
            afra_bs.readlist("bool, bool, bool, pad:5, uint:32, uint:32")

        # The two flags select the width of id and offset fields.
        id_bs_type = "uint:32" if long_ids else "uint:16"
        offset_bs_type = "uint:64" if long_offsets else "uint:32"

        log.debug("local_access_entries entry count: %s", local_entry_count)
        afra.local_access_entries = []
        for _ in range(local_entry_count):
            time = cls._parse_time_field(afra_bs, afra.time_scale)
            offset = afra_bs.read(offset_bs_type)
            afra.local_access_entries.append(
                FragmentRandomAccessBox.FragmentRandomAccessBoxEntry(time=time, offset=offset))

        afra.global_access_entries = []
        if global_entries:
            global_entry_count = afra_bs.read("uint:32")
            log.debug("global_access_entries entry count: %s", global_entry_count)
            for _ in range(global_entry_count):
                time = cls._parse_time_field(afra_bs, afra.time_scale)
                segment_number = afra_bs.read(id_bs_type)
                fragment_number = afra_bs.read(id_bs_type)
                afra_offset = afra_bs.read(offset_bs_type)
                sample_offset = afra_bs.read(offset_bs_type)
                afra.global_access_entries.append(
                    FragmentRandomAccessBox.FragmentRandomAccessBoxGlobalEntry(
                        time=time,
                        segment_number=segment_number,
                        fragment_number=fragment_number,
                        afra_offset=afra_offset,
                        sample_offset=sample_offset))
        return afra

    @classmethod
    def _parse_moof(cls, bootstrap_bs, header):
        """Parse a moof box; each child box becomes an attribute named after its type."""
        moof = MovieFragmentBox()
        moof.header = header
        box_bs = bootstrap_bs.read(moof.header.box_size * 8)
        for child_box in cls.parse(bytes_input=box_bs.bytes):
            setattr(moof, child_box.type, child_box)
        return moof

    @classmethod
    def _parse_moov(cls, bootstrap_bs, header):
        """Parse a moov box.

        Child boxes become attributes named after their type, except ``pssh``
        children, which are all collected into the ``pssh`` list (possibly empty).
        """
        moov = MovieBox()
        moov.header = header
        psshs = []
        box_bs = bootstrap_bs.read(moov.header.box_size * 8)
        for child_box in cls.parse(bytes_input=box_bs.bytes):
            if child_box.type == "pssh":
                psshs.append(child_box)
            else:
                setattr(moov, child_box.type, child_box)
        moov.pssh = psshs
        return moov

    @classmethod
    def _parse_mfhd(cls, bootstrap_bs, header):
        """Record the header of an mfhd box and skip over its payload."""
        mfhd = MovieFragmentHeader()
        mfhd.header = header
        # The payload is read only to advance the stream; its content is unused.
        bootstrap_bs.read(mfhd.header.box_size * 8)
        return mfhd

    @staticmethod
    def _parse_pssh(bootstrap_bs, header):
        """Parse a pssh box into hex-string ``system_id`` and ``payload`` fields."""
        pssh = ProtectionSystemSpecificHeader()
        pssh.header = header
        box_bs = bootstrap_bs.read(pssh.header.box_size * 8)
        # Offsets below count hex characters (two per byte). The first 8 are
        # dropped -- presumably the version/flags field, TODO confirm.
        data = box_bs.hex[8:]
        pssh.system_id = data[:32]
        # The 8 hex characters after the system id are skipped as well --
        # presumably a data-size field, TODO confirm.
        pssh.payload = data[40:]
        return pssh

    @classmethod
    def _parse_abst(cls, bootstrap_bs, header):
        """Parse an abst box into a BootStrapInfoBox, including its nested
        segment run tables and fragment run tables."""
        abst = BootStrapInfoBox()
        abst.header = header
        box_bs = bootstrap_bs.read(abst.header.box_size * 8)

        # time_scale is assigned before current_media_time (targets are bound
        # left to right), which the current_media_time setter relies on.
        abst.version, abst.profile_raw, abst.live, abst.update, \
            abst.time_scale, abst.current_media_time, abst.smpte_timecode_offset = \
            box_bs.readlist("pad:8, pad:24, uint:32, uint:2, bool, bool, "
                            "pad:4, "
                            "uint:32, uint:64, uint:64")

        abst.movie_identifier = cls._read_string(box_bs)
        abst.server_entry_table = cls._read_count_and_string_table(box_bs)
        abst.quality_entry_table = cls._read_count_and_string_table(box_bs)
        abst.drm_data = cls._read_string(box_bs)
        abst.meta_data = cls._read_string(box_bs)

        segment_count = box_bs.read("uint:8")
        log.debug("segment_count: %d", segment_count)
        abst.segment_run_tables = [cls._parse_asrt(box_bs) for _ in range(segment_count)]

        fragment_count = box_bs.read("uint:8")
        log.debug("fragment_count: %d", fragment_count)
        abst.fragment_tables = [cls._parse_afrt(box_bs) for _ in range(fragment_count)]

        log.debug("Finished parsing abst")
        return abst

    @classmethod
    def _parse_asrt(cls, box_bs):
        """ Parse asrt / Segment Run Table Box

        Unlike the top-level parsers, this reads the box header itself from
        ``box_bs`` before parsing the payload.
        """
        asrt = SegmentRunTable()
        asrt.header = cls._read_box_header(box_bs)
        # read the entire box in case there's padding
        asrt_bs_box = box_bs.read(asrt.header.box_size * 8)

        # Skip the first byte, then read the 24-bit flags field.
        asrt_bs_box.pos += 8
        update_flag = asrt_bs_box.read("uint:24")
        asrt.update = update_flag == 1

        asrt.quality_segment_url_modifiers = cls._read_count_and_string_table(asrt_bs_box)

        asrt.segment_run_table_entries = []
        segment_count = asrt_bs_box.read("uint:32")
        for _ in range(segment_count):
            first_segment = asrt_bs_box.read("uint:32")
            fragments_per_segment = asrt_bs_box.read("uint:32")
            asrt.segment_run_table_entries.append(
                SegmentRunTable.SegmentRunTableEntry(first_segment=first_segment,
                                                     fragments_per_segment=fragments_per_segment))
        return asrt

    @classmethod
    def _parse_afrt(cls, box_bs):
        """ Parse afrt / Fragment Run Table Box

        Unlike the top-level parsers, this reads the box header itself from
        ``box_bs`` before parsing the payload.
        """
        afrt = FragmentRunTable()
        afrt.header = cls._read_box_header(box_bs)
        # read the entire box in case there's padding
        afrt_bs_box = box_bs.read(afrt.header.box_size * 8)

        # Skip the first byte, then read the 24-bit flags field.
        afrt_bs_box.pos += 8
        update_flag = afrt_bs_box.read("uint:24")
        afrt.update = update_flag == 1

        afrt.time_scale = afrt_bs_box.read("uint:32")
        afrt.quality_fragment_url_modifiers = cls._read_count_and_string_table(afrt_bs_box)

        fragment_count = afrt_bs_box.read("uint:32")
        afrt.fragments = []
        for _ in range(fragment_count):
            first_fragment = afrt_bs_box.read("uint:32")
            first_fragment_timestamp_raw = afrt_bs_box.read("uint:64")
            try:
                first_fragment_timestamp = datetime.utcfromtimestamp(
                    first_fragment_timestamp_raw / float(afrt.time_scale))
            except (ValueError, OverflowError, OSError, ZeroDivisionError):
                # Elemental sometimes create odd timestamps. Depending on the
                # value and platform the conversion fails with any of these,
                # and a zero time scale fails on the division itself.
                first_fragment_timestamp = None

            fragment_duration = afrt_bs_box.read("uint:32")
            # The discontinuity indicator byte is only present for
            # zero-duration entries.
            if fragment_duration == 0:
                discontinuity_indicator = afrt_bs_box.read("uint:8")
            else:
                discontinuity_indicator = None

            afrt.fragments.append(
                FragmentRunTable.FragmentRunTableEntry(
                    first_fragment=first_fragment,
                    first_fragment_timestamp=first_fragment_timestamp,
                    fragment_duration=fragment_duration,
                    discontinuity_indicator=discontinuity_indicator))
        return afrt

    @staticmethod
    def _parse_mdat(box_bs, header):
        """ Parse mdat / Media Data Box: the payload is kept as raw bytes """
        mdat = MediaDataBox()
        mdat.header = header
        mdat.payload = box_bs.read(mdat.header.box_size * 8).bytes
        return mdat

    @staticmethod
    def _parse_time_field(bs, scale):
        """Read a 64-bit timestamp from ``bs`` and convert it to a datetime.

        :param bs: bit stream positioned at the timestamp.
        :param scale: number of timestamp units per second.
        :return: naive UTC datetime, or None when the value cannot be
            represented (out of range, or ``scale`` is zero).
        """
        timestamp = bs.read("uint:64")
        try:
            return datetime.utcfromtimestamp(timestamp / float(scale))
        except (ValueError, OverflowError, OSError, ZeroDivisionError):
            # Same tolerance as the afrt timestamp handling: one odd value
            # should not abort parsing of the whole box.
            return None