From d1c0bfac108c9a5aaf034015f9e096c27b68ca73 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Tue, 29 Dec 2015 17:01:25 -0800 Subject: [PATCH] warc indexing: refactor to add create_payload_buffer() which can be overridden in custom iterators to create a file-like object that will receive write() calls to buffer the payload when indexing. Default implementation does not buffer the payload --- pywb/warc/archiveiterator.py | 66 ++++++++++++++++++++++++------------ pywb/warc/recordloader.py | 2 +- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/pywb/warc/archiveiterator.py b/pywb/warc/archiveiterator.py index e419c619..5df3bb03 100644 --- a/pywb/warc/archiveiterator.py +++ b/pywb/warc/archiveiterator.py @@ -166,7 +166,7 @@ class ArchiveIterator(object): return line, empty_size - def read_to_end(self, record, compute_digest=False): + def read_to_end(self, record, payload_callback=None): """ Read remainder of the stream If a digester is included, update it with the data read @@ -176,11 +176,6 @@ class ArchiveIterator(object): if self.member_info: return None - if compute_digest: - digester = hashlib.sha1() - else: - digester = None - num = 0 curr_offset = self.offset @@ -189,8 +184,8 @@ class ArchiveIterator(object): if not b: break num += len(b) - if digester: - digester.update(b) + if payload_callback: + payload_callback(b) """ - For compressed files, blank lines are consumed @@ -213,12 +208,7 @@ class ArchiveIterator(object): if not self.reader.decompressor: length -= empty_size - if compute_digest: - digest = base64.b32encode(digester.digest()) - else: - digest = None - - self.member_info = (curr_offset, length, digest) + self.member_info = (curr_offset, length) #return self.member_info #return next_line @@ -243,9 +233,16 @@ class ArchiveIndexEntryMixin(object): MIME_RE = re.compile('[; ]') + def __init__(self): + super(ArchiveIndexEntryMixin, self).__init__() + self.reset_entry() + def reset_entry(self): self['urlkey'] 
= '' self['metadata'] = '' + self.buffer = None + self.record = None + def extract_mime(self, mime, def_mime='unk'): """ Utility function to extract mimetype only @@ -264,10 +261,7 @@ class ArchiveIndexEntryMixin(object): elif self['status'] == '204' and 'Error' in status_headers.statusline: self['status'] = '-' - def set_rec_info(self, offset, length, digest): - if digest: - self['digest'] = digest - + def set_rec_info(self, offset, length): self['length'] = str(length) self['offset'] = str(offset) @@ -300,6 +294,8 @@ class DefaultRecordIter(object): def __init__(self, **options): self.options = options self.entry_cache = {} + self.digester = None + self.buff = None def _create_index_entry(self, rec_type): try: @@ -315,13 +311,37 @@ class DefaultRecordIter(object): return entry + def begin_payload(self, compute_digest, entry): + if compute_digest: + self.digester = hashlib.sha1() + else: + self.digester = None + + self.entry = entry + entry.buffer = self.create_payload_buffer(entry) + + def handle_payload(self, buff): + if self.digester: + self.digester.update(buff) + + if self.entry and self.entry.buffer: + self.entry.buffer.write(buff) + + def end_payload(self, entry): + if self.digester: + entry['digest'] = base64.b32encode(self.digester.digest()) + + self.entry = None + + def create_payload_buffer(self, entry): + return None + def create_record_iter(self, arcv_iter): append_post = self.options.get('append_post') include_all = self.options.get('include_all') block_size = self.options.get('block_size', 16384) surt_ordered = self.options.get('surt_ordered', True) minimal = self.options.get('minimal') - append_post = self.options.get('append_post') if append_post and minimal: raise Exception('Sorry, minimal index option and ' + @@ -371,10 +391,14 @@ class DefaultRecordIter(object): entry['_post_query'] = post_query - arcv_iter.read_to_end(record, compute_digest) - entry.set_rec_info(*arcv_iter.member_info) entry.record = record + self.begin_payload(compute_digest, 
entry) + arcv_iter.read_to_end(record, self.handle_payload) + + entry.set_rec_info(*arcv_iter.member_info) + self.end_payload(entry) + yield entry def join_request_records(self, entry_iter): diff --git a/pywb/warc/recordloader.py b/pywb/warc/recordloader.py index eb32c3c4..9fd21772 100644 --- a/pywb/warc/recordloader.py +++ b/pywb/warc/recordloader.py @@ -122,7 +122,7 @@ class ArcWarcRecordLoader: length = int(length) - sub_len if length < 0: is_err = True - except ValueError: + except (ValueError, TypeError): is_err = True # err condition