From c66d251a90c9f3ecddad64c0851283cb799d659c Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Wed, 1 Mar 2017 12:48:06 -0800
Subject: [PATCH] warc: make ArchiveIterator an actual iterator

warc indexing test: add test for reading warc with interspersed empty gzip
records, ensure they are ignored
---
 pywb/warc/archiveiterator.py    | 83 ++++++++++++++++++---------------
 pywb/warc/test/test_indexing.py | 24 ++++
 2 files changed, 69 insertions(+), 38 deletions(-)

diff --git a/pywb/warc/archiveiterator.py b/pywb/warc/archiveiterator.py
index 69883304..c4785d3a 100644
--- a/pywb/warc/archiveiterator.py
+++ b/pywb/warc/archiveiterator.py
@@ -7,6 +7,7 @@ from pywb.warc.recordloader import ArcWarcRecordLoader
 
 import hashlib
 import base64
+import six
 
 import re
 import sys
@@ -17,8 +18,11 @@ except ImportError:  # pragma: no cover
     from ordereddict import OrderedDict
 
 
-#=================================================================
-class ArchiveIterator(object):
+# ============================================================================
+BUFF_SIZE = 16384
+
+
+class ArchiveIterator(six.Iterator):
     """ Iterate over records in WARC and ARC files, both gzip chunk
     compressed and uncompressed
 
@@ -52,9 +56,9 @@ class ArchiveIterator(object):
     Remainder: {1}
     """
 
-
     def __init__(self, fileobj, no_record_parse=False,
-                 verify_http=False, arc2warc=False):
+                 verify_http=False, arc2warc=False, block_size=BUFF_SIZE):
+
         self.fh = fileobj
 
         self.loader = ArcWarcRecordLoader(verify_http=verify_http,
@@ -69,55 +73,59 @@ class ArchiveIterator(object):
         self.member_info = None
         self.no_record_parse = no_record_parse
 
-    def __call__(self, block_size=16384):
-        """ iterate over each record
-        """
-
-        decomp_type = 'gzip'
-
         self.reader = DecompressingBufferedReader(self.fh, block_size=block_size)
 
         self.offset = self.fh.tell()
 
         self.next_line = None
 
-        raise_invalid_gzip = False
-        empty_record = False
-        record = None
+        self._raise_invalid_gzip = False
+        self._is_empty = False
+        self._is_first = True
+        self.last_record = None
 
+    def __iter__(self):
+        return self
+
+    def __next__(self):
         while True:
+            if not self._is_first:
+                self._finish_record()
+
+            self._is_first = False
+
             try:
-                curr_offset = self.fh.tell()
-                record = self._next_record(self.next_line)
-                if raise_invalid_gzip:
+                self.last_record = self._next_record(self.next_line)
+                if self._raise_invalid_gzip:
                     self._raise_invalid_gzip_err()
 
-                yield record
+                return self.last_record
 
             except EOFError:
-                empty_record = True
+                self._is_empty = True
 
-            if record:
-                self.read_to_end(record)
+    def _finish_record(self):
+        if self.last_record:
+            self.read_to_end(self.last_record)
 
-            if self.reader.decompressor:
-                # if another gzip member, continue
-                if self.reader.read_next_member():
-                    continue
+        if self.reader.decompressor:
+            # if another gzip member, continue
+            if self.reader.read_next_member():
+                return
 
-                # if empty record, then we're done
-                elif empty_record:
-                    break
+            # if empty record, then we're done
+            elif self._is_empty:
+                raise StopIteration()
 
-                # otherwise, probably a gzip
-                # containing multiple non-chunked records
-                # raise this as an error
-                else:
-                    raise_invalid_gzip = True
+            # otherwise, probably a gzip
+            # containing multiple non-chunked records
+            # raise this as an error
+            else:
+                self._raise_invalid_gzip = True
 
-            # non-gzip, so we're done
-            elif empty_record:
-                break
+        # non-gzip, so we're done
+        elif self._is_empty:
+            raise StopIteration()
 
     def _raise_invalid_gzip_err(self):
         """ A gzip file with multiple ARC/WARC records, non-chunked
@@ -185,7 +193,7 @@ class ArchiveIterator(object):
         curr_offset = self.offset
 
         while True:
-            b = record.stream.read(8192)
+            b = record.stream.read(BUFF_SIZE)
             if not b:
                 break
             num += len(b)
@@ -349,7 +357,6 @@ class DefaultRecordParser(object):
     def create_record_iter(self, raw_iter):
         append_post = self.options.get('append_post')
         include_all = self.options.get('include_all')
-        block_size = self.options.get('block_size', 16384)
         surt_ordered = self.options.get('surt_ordered', True)
         minimal = self.options.get('minimal')
 
@@ -357,7 +364,7 @@ class DefaultRecordParser(object):
             raise Exception('Sorry, minimal index option and ' +
                             'append POST options can not be used together')
 
-        for record in raw_iter(block_size):
+        for record in raw_iter:
             entry = None
 
             if not include_all and not minimal and (record.status_headers.get_statuscode() == '-'):
diff --git a/pywb/warc/test/test_indexing.py b/pywb/warc/test/test_indexing.py
index 40238bca..afc148ff 100644
--- a/pywb/warc/test/test_indexing.py
+++ b/pywb/warc/test/test_indexing.py
@@ -370,6 +370,30 @@ def test_cdxj_empty():
     assert buff.getvalue() == b''
 
 
+def test_cdxj_middle_empty_records():
+    empty_gzip_record = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+
+    new_warc = BytesIO()
+
+    with open(TEST_WARC_DIR + 'example2.warc.gz', 'rb') as fh:
+        new_warc.write(empty_gzip_record)
+        new_warc.write(fh.read())
+        new_warc.write(empty_gzip_record)
+        new_warc.write(empty_gzip_record)
+        fh.seek(0)
+        new_warc.write(fh.read())
+
+    options = dict(cdxj=True)
+
+    buff = BytesIO()
+    new_warc.seek(0)
+
+    write_cdx_index(buff, new_warc, 'empty.warc.gz', **options)
+
+    lines = buff.getvalue().rstrip().split(b'\n')
+
+    assert len(lines) == 2, lines
+
 if __name__ == "__main__":
     import doctest
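
Note: after this change ArchiveIterator is consumed directly as an iterator; the old usage
was to call the instance to obtain a generator ('for record in ArchiveIterator(fh)(block_size)'),
which is why DefaultRecordParser.create_record_iter() now loops over 'raw_iter' itself and no
longer reads a 'block_size' option. A minimal usage sketch follows (the WARC filename below is
a placeholder, not a file shipped with this patch):

    from pywb.warc.archiveiterator import ArchiveIterator

    # iterate records directly; block_size is now an __init__ keyword
    # (defaulting to BUFF_SIZE) instead of a __call__ argument, and
    # interspersed empty gzip members are skipped rather than ending iteration
    with open('example.warc.gz', 'rb') as fh:
        for record in ArchiveIterator(fh):
            print(record.rec_type)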