""" MP4 Parser based on: http://download.macromedia.com/f4v/video_file_format_spec_v10_1.pdf @author: Alastair McCormack @license: MIT License The MIT License (MIT) Copyright (c) 2015 use-sparingly Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import bitstring from datetime import datetime from collections import namedtuple import logging import six log = logging.getLogger(__name__) #log.addHandler(logging.NullHandler()) log.setLevel(logging.WARN) class MixinDictRepr(object): def __repr__(self, *args, **kwargs): return "{class_name} : {content!r} ".format(class_name=self.__class__.__name__, content=self.__dict__) class MixinMinimalRepr(object): """ A minimal representaion when the payload could be large """ def __repr__(self, *args, **kwargs): return "{class_name} : {content!r} ".format(class_name=self.__class__.__name__, content=self.__dict__.keys()) class FragmentRunTableBox(MixinDictRepr): pass class UnImplementedBox(MixinDictRepr): type = "na" pass class MovieFragmentBox(MixinDictRepr): type = "moof" class MovieBox(MixinDictRepr): type = "moov" class BootStrapInfoBox(MixinDictRepr): type = "abst" @property def current_media_time(self): return self._current_media_time @current_media_time.setter def current_media_time(self, epoch_timestamp): """ Takes a timestamp arg and saves it as datetime """ self._current_media_time = datetime.utcfromtimestamp(epoch_timestamp/float(self.time_scale)) class FragmentRandomAccessBox(MixinDictRepr): """ aka afra """ type = "afra" FragmentRandomAccessBoxEntry = namedtuple("FragmentRandomAccessBoxEntry", ["time", "offset"]) FragmentRandomAccessBoxGlobalEntry = namedtuple("FragmentRandomAccessBoxGlobalEntry", ["time", "segment_number", "fragment_number", "afra_offset", "sample_offset"]) pass class SegmentRunTable(MixinDictRepr): type = "asrt" SegmentRunTableEntry = namedtuple('SegmentRunTableEntry', ["first_segment", "fragments_per_segment"]) pass class FragmentRunTable(MixinDictRepr): type = "afrt" class FragmentRunTableEntry( namedtuple('FragmentRunTableEntry', ["first_fragment", "first_fragment_timestamp", "fragment_duration", "discontinuity_indicator"]) ): DI_END_OF_PRESENTATION = 0 DI_NUMBERING = 1 DI_TIMESTAMP = 2 DI_TIMESTAMP_AND_NUMBER = 3 def __eq__(self, other): if self.first_fragment == other.first_fragment and \ self.first_fragment_timestamp == other.first_fragment_timestamp and \ self.fragment_duration == other.fragment_duration and \ self.discontinuity_indicator == other.discontinuity_indicator: return True def __repr__(self, *args, **kwargs): return str(self.__dict__) class MediaDataBox(MixinMinimalRepr): """ aka mdat """ type = "mdat" class MovieFragmentHeader(MixinDictRepr): type = "mfhd" class ProtectionSystemSpecificHeader(MixinDictRepr): type = "pssh" BoxHeader = namedtuple( "BoxHeader", ["box_size", "box_type", "header_size"] ) class F4VParser(object): @classmethod def parse(cls, filename=None, bytes_input=None, file_input=None, offset_bytes=0, headers_only=False): """ Parse an MP4 file or bytes into boxes :param filename: filename of mp4 file. :type filename: str. :param bytes_input: bytes of mp4 file. :type bytes_input: bytes / Python 2.x str. :param offset_bytes: start parsing at offset. :type offset_bytes: int. :param headers_only: Ignore data and return just headers. Useful when data is cut short :type: headers_only: boolean :return: BMFF Boxes or Headers """ box_lookup = { BootStrapInfoBox.type: cls._parse_abst, FragmentRandomAccessBox.type: cls._parse_afra, MediaDataBox.type: cls._parse_mdat, MovieFragmentBox.type: cls._parse_moof, MovieBox.type: cls._parse_moov, MovieFragmentHeader.type: cls._parse_mfhd, ProtectionSystemSpecificHeader.type: cls._parse_pssh } if filename: bs = bitstring.ConstBitStream(filename=filename, offset=offset_bytes * 8) elif bytes_input: bs = bitstring.ConstBitStream(bytes=bytes_input, offset=offset_bytes * 8) else: bs = bitstring.ConstBitStream(auto=file_input, offset=offset_bytes * 8) log.debug("Starting parse") log.debug("Size is %d bits", bs.len) while bs.pos < bs.len: log.debug("Byte pos before header: %d relative to (%d)", bs.bytepos, offset_bytes) log.debug("Reading header") try: header = cls._read_box_header(bs) except bitstring.ReadError as e: log.error("Premature end of data while reading box header") raise log.debug("Header type: %s", header.box_type) log.debug("Byte pos after header: %d relative to (%d)", bs.bytepos, offset_bytes) if headers_only: yield header # move pointer to next header if possible try: bs.bytepos += header.box_size except ValueError: log.warning("Premature end of data") raise else: # Get parser method for header type parse_function = box_lookup.get(header.box_type, cls._parse_unimplemented) try: yield parse_function(bs, header) except ValueError as e: log.error("Premature end of data") raise @classmethod def _is_mp4(cls, parser): try: for box in parser: return True except ValueError: return False @classmethod def is_mp4_s(cls, bytes_input): """ Is bytes_input the contents of an MP4 file :param bytes_input: str/bytes to check. :type bytes_input: str/bytes. :return: """ parser = cls.parse(bytes_input=bytes_input, headers_only=True) return cls._is_mp4(parser) @classmethod def is_mp4(cls, file_input): """ Checks input if it's an MP4 file :param input: Filename or file object :type input: str, file :param state: Current state to be in. :type state: bool. :returns: bool. :raises: AttributeError, KeyError """ if hasattr(file_input, "read"): parser = cls.parse(file_input=file_input, headers_only=True) else: parser = cls.parse(filename=file_input, headers_only=True) return cls._is_mp4(parser) @staticmethod def _read_string(bs): """ read UTF8 null terminated string """ result = bs.readto('0x00', bytealigned=True).bytes.decode("utf-8")[:-1] return result if result else None @classmethod def _read_count_and_string_table(cls, bs): """ Read a count then return the strings in a list """ result = [] entry_count = bs.read("uint:8") for _ in six.range(0, entry_count): result.append( cls._read_string(bs) ) return result @staticmethod def _read_box_header(bs): header_start_pos = bs.bytepos size, box_type = bs.readlist("uint:32, bytes:4") # box_type should be an ASCII string. Decode as UTF-8 in case try: box_type = box_type.decode('utf-8') except UnicodeDecodeError: # we'll leave as bytes instead pass # if size == 1, then this is an extended size type. # Therefore read the next 64 bits as size if size == 1: size = bs.read("uint:64") header_end_pos = bs.bytepos header_size = header_end_pos - header_start_pos return BoxHeader(box_size=size-header_size, box_type=box_type, header_size=header_size) @staticmethod def _parse_unimplemented(bs, header): ui = UnImplementedBox() ui.header = header bs.bytepos += header.box_size return ui @classmethod def _parse_afra(cls, bs, header): afra = FragmentRandomAccessBox() afra.header = header # read the entire box in case there's padding afra_bs = bs.read(header.box_size * 8) # skip Version and Flags afra_bs.pos += 8 + 24 long_ids, long_offsets, global_entries, afra.time_scale, local_entry_count = \ afra_bs.readlist("bool, bool, bool, pad:5, uint:32, uint:32") if long_ids: id_bs_type = "uint:32" else: id_bs_type = "uint:16" if long_offsets: offset_bs_type = "uint:64" else: offset_bs_type = "uint:32" log.debug("local_access_entries entry count: %s", local_entry_count) afra.local_access_entries = [] for _ in six.range(0, local_entry_count): time = cls._parse_time_field(afra_bs, afra.time_scale) offset = afra_bs.read(offset_bs_type) afra_entry = \ FragmentRandomAccessBox.FragmentRandomAccessBoxEntry(time=time, offset=offset) afra.local_access_entries.append(afra_entry) afra.global_access_entries = [] if global_entries: global_entry_count = afra_bs.read("uint:32") log.debug("global_access_entries entry count: %s", global_entry_count) for _ in six.range(0, global_entry_count): time = cls._parse_time_field(afra_bs, afra.time_scale) segment_number = afra_bs.read(id_bs_type) fragment_number = afra_bs.read(id_bs_type) afra_offset = afra_bs.read(offset_bs_type) sample_offset = afra_bs.read(offset_bs_type) afra_global_entry = \ FragmentRandomAccessBox.FragmentRandomAccessBoxGlobalEntry( time=time, segment_number=segment_number, fragment_number=fragment_number, afra_offset=afra_offset, sample_offset=sample_offset) afra.global_access_entries.append(afra_global_entry) return afra @classmethod def _parse_moof(cls, bootstrap_bs, header): moof = MovieFragmentBox() moof.header = header box_bs = bootstrap_bs.read(moof.header.box_size * 8) for child_box in cls.parse(bytes_input=box_bs.bytes): setattr(moof, child_box.type, child_box) return moof @classmethod def _parse_moov(cls, bootstrap_bs, header): moov = MovieBox() moov.header = header psshs = [] box_bs = bootstrap_bs.read(moov.header.box_size * 8) for child_box in cls.parse(bytes_input=box_bs.bytes): if(child_box.type == "pssh"): psshs.append(child_box) else: setattr(moov, child_box.type, child_box) setattr(moov, "pssh", psshs) return moov @classmethod def _parse_mfhd(cls, bootstrap_bs, header): mfhd = MovieFragmentHeader() mfhd.header = header box_bs = bootstrap_bs.read(mfhd.header.box_size * 8) return mfhd @staticmethod def _parse_pssh(bootstrap_bs, header): pssh = ProtectionSystemSpecificHeader() pssh.header = header box_bs = bootstrap_bs.read(pssh.header.box_size*8) # Payload appears to be 8 bytes in. data = box_bs.hex[8:] pssh.system_id = data[:32] pssh.payload = data[40:] return pssh @classmethod def _parse_abst(cls, bootstrap_bs, header): abst = BootStrapInfoBox() abst.header = header box_bs = bootstrap_bs.read(abst.header.box_size * 8) abst.version, abst.profile_raw, abst.live, abst.update, \ abst.time_scale, abst.current_media_time, abst.smpte_timecode_offset = \ box_bs.readlist("""pad:8, pad:24, uint:32, uint:2, bool, bool, pad:4, uint:32, uint:64, uint:64""") abst.movie_identifier = cls._read_string(box_bs) abst.server_entry_table = cls._read_count_and_string_table(box_bs) abst.quality_entry_table = cls._read_count_and_string_table(box_bs) abst.drm_data = cls._read_string(box_bs) abst.meta_data = cls._read_string(box_bs) abst.segment_run_tables = [] segment_count = box_bs.read("uint:8") log.debug("segment_count: %d" % segment_count) for _ in six.range(0, segment_count): abst.segment_run_tables.append( cls._parse_asrt(box_bs) ) abst.fragment_tables = [] fragment_count = box_bs.read("uint:8") log.debug("fragment_count: %d" % fragment_count) for _ in xrange(0, fragment_count): abst.fragment_tables.append( cls._parse_afrt(box_bs) ) log.debug("Finished parsing abst") return abst @classmethod def _parse_asrt(cls, box_bs): """ Parse asrt / Segment Run Table Box """ asrt = SegmentRunTable() asrt.header = cls._read_box_header(box_bs) # read the entire box in case there's padding asrt_bs_box = box_bs.read(asrt.header.box_size * 8) asrt_bs_box.pos += 8 update_flag = asrt_bs_box.read("uint:24") asrt.update = True if update_flag == 1 else False asrt.quality_segment_url_modifiers = cls._read_count_and_string_table(asrt_bs_box) asrt.segment_run_table_entries = [] segment_count = asrt_bs_box.read("uint:32") for _ in six.range(0, segment_count): first_segment = asrt_bs_box.read("uint:32") fragments_per_segment = asrt_bs_box.read("uint:32") asrt.segment_run_table_entries.append( SegmentRunTable.SegmentRunTableEntry(first_segment=first_segment, fragments_per_segment=fragments_per_segment) ) return asrt @classmethod def _parse_afrt(cls, box_bs): """ Parse afrt / Fragment Run Table Box """ afrt = FragmentRunTable() afrt.header = cls._read_box_header(box_bs) # read the entire box in case there's padding afrt_bs_box = box_bs.read(afrt.header.box_size * 8) afrt_bs_box.pos += 8 update_flag = afrt_bs_box.read("uint:24") afrt.update = True if update_flag == 1 else False afrt.time_scale = afrt_bs_box.read("uint:32") afrt.quality_fragment_url_modifiers = cls._read_count_and_string_table(afrt_bs_box) fragment_count = afrt_bs_box.read("uint:32") afrt.fragments = [] for _ in six.range(0, fragment_count): first_fragment = afrt_bs_box.read("uint:32") first_fragment_timestamp_raw = afrt_bs_box.read("uint:64") try: first_fragment_timestamp = datetime.utcfromtimestamp(first_fragment_timestamp_raw/float(afrt.time_scale)) except ValueError: # Elemental sometimes create odd timestamps first_fragment_timestamp = None fragment_duration = afrt_bs_box.read("uint:32") if fragment_duration == 0: discontinuity_indicator = afrt_bs_box.read("uint:8") else: discontinuity_indicator = None frte = FragmentRunTable.FragmentRunTableEntry(first_fragment=first_fragment, first_fragment_timestamp=first_fragment_timestamp, fragment_duration=fragment_duration, discontinuity_indicator=discontinuity_indicator) afrt.fragments.append(frte) return afrt @staticmethod def _parse_mdat(box_bs, header): """ Parse afrt / Fragment Run Table Box """ mdat = MediaDataBox() mdat.header = header mdat.payload = box_bs.read(mdat.header.box_size * 8).bytes return mdat @staticmethod def _parse_time_field(bs, scale): timestamp = bs.read("uint:64") return datetime.utcfromtimestamp(timestamp / float(scale) )