mirror of https://github.com/webrecorder/pywb.git

warc indexing: refactor to add create_payload_buffer(), which can be overridden in custom iterators to create a file-like object

that will receive write() calls to buffer the payload when indexing. The default implementation does not buffer the payload.
Ilya Kreymer 2015-12-29 17:01:25 -08:00
parent 98843a2551
commit d1c0bfac10
2 changed files with 46 additions and 22 deletions
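A minimal sketch of how the new hook might be used (BufferingRecordIter is a hypothetical name, and the import path is assumed): a custom iterator overrides create_payload_buffer() to return any file-like object, and the indexer then calls write() on it with each chunk of the payload.

    from io import BytesIO

    # module path assumed for illustration
    from pywb.warc.archiveiterator import DefaultRecordIter

    class BufferingRecordIter(DefaultRecordIter):
        # Return any file-like object; the indexer calls write() on it
        # with each payload chunk. The base implementation returns None,
        # which disables buffering.
        def create_payload_buffer(self, entry):
            return BytesIO()

After a record has been read to the end, entry.buffer then holds the full payload for that record.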


@@ -166,7 +166,7 @@ class ArchiveIterator(object):
         return line, empty_size
 
-    def read_to_end(self, record, compute_digest=False):
+    def read_to_end(self, record, payload_callback=None):
         """ Read remainder of the stream
         If a digester is included, update it
         with the data read
@@ -176,11 +176,6 @@ class ArchiveIterator(object):
         if self.member_info:
             return None
 
-        if compute_digest:
-            digester = hashlib.sha1()
-        else:
-            digester = None
-
         num = 0
         curr_offset = self.offset
@@ -189,8 +184,8 @@
             if not b:
                 break
             num += len(b)
-            if digester:
-                digester.update(b)
+            if payload_callback:
+                payload_callback(b)
 
         """
         - For compressed files, blank lines are consumed
@@ -213,12 +208,7 @@
         if not self.reader.decompressor:
             length -= empty_size
 
-        if compute_digest:
-            digest = base64.b32encode(digester.digest())
-        else:
-            digest = None
-
-        self.member_info = (curr_offset, length, digest)
+        self.member_info = (curr_offset, length)
 
         #return self.member_info
         #return next_line
@@ -243,9 +233,16 @@
 class ArchiveIndexEntryMixin(object):
     MIME_RE = re.compile('[; ]')
 
+    def __init__(self):
+        super(ArchiveIndexEntryMixin, self).__init__()
+        self.reset_entry()
+
     def reset_entry(self):
         self['urlkey'] = ''
         self['metadata'] = ''
+        self.buffer = None
+        self.record = None
 
     def extract_mime(self, mime, def_mime='unk'):
         """ Utility function to extract mimetype only
@@ -264,10 +261,7 @@ class ArchiveIndexEntryMixin(object):
         elif self['status'] == '204' and 'Error' in status_headers.statusline:
             self['status'] = '-'
 
-    def set_rec_info(self, offset, length, digest):
-        if digest:
-            self['digest'] = digest
-
+    def set_rec_info(self, offset, length):
         self['length'] = str(length)
         self['offset'] = str(offset)
@@ -300,6 +294,8 @@ class DefaultRecordIter(object):
     def __init__(self, **options):
         self.options = options
         self.entry_cache = {}
+        self.digester = None
+        self.buff = None
 
     def _create_index_entry(self, rec_type):
         try:
@@ -315,13 +311,37 @@
         return entry
 
+    def begin_payload(self, compute_digest, entry):
+        if compute_digest:
+            self.digester = hashlib.sha1()
+        else:
+            self.digester = None
+
+        self.entry = entry
+        entry.buffer = self.create_payload_buffer(entry)
+
+    def handle_payload(self, buff):
+        if self.digester:
+            self.digester.update(buff)
+
+        if self.entry and self.entry.buffer:
+            self.entry.buffer.write(buff)
+
+    def end_payload(self, entry):
+        if self.digester:
+            entry['digest'] = base64.b32encode(self.digester.digest())
+
+        self.entry = None
+
+    def create_payload_buffer(self, entry):
+        return None
+
     def create_record_iter(self, arcv_iter):
-        append_post = self.options.get('append_post')
         include_all = self.options.get('include_all')
         block_size = self.options.get('block_size', 16384)
         surt_ordered = self.options.get('surt_ordered', True)
         minimal = self.options.get('minimal')
+        append_post = self.options.get('append_post')
 
         if append_post and minimal:
             raise Exception('Sorry, minimal index option and ' +
@@ -371,10 +391,14 @@ class DefaultRecordIter(object):
                 entry['_post_query'] = post_query
 
-            arcv_iter.read_to_end(record, compute_digest)
-            entry.set_rec_info(*arcv_iter.member_info)
+            entry.record = record
+
+            self.begin_payload(compute_digest, entry)
+            arcv_iter.read_to_end(record, self.handle_payload)
+            entry.set_rec_info(*arcv_iter.member_info)
+            self.end_payload(entry)
 
             yield entry
 
     def join_request_records(self, entry_iter):


@@ -122,7 +122,7 @@ class ArcWarcRecordLoader:
                 length = int(length) - sub_len
                 if length < 0:
                     is_err = True
-            except ValueError:
+            except (ValueError, TypeError):
                 is_err = True
 
             # err condition
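The broadened except clause in the second file guards against length being None (e.g., a missing length header): int(None) raises TypeError rather than ValueError, which the old clause would not have caught. A quick illustration:

    # int() raises different exceptions for malformed vs. missing values:
    for bad in ('abc', None):
        try:
            int(bad)
        except (ValueError, TypeError) as e:
            print(type(e).__name__)   # ValueError for 'abc', TypeError for None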