1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

cdx indexing: wrap record iterator global functions in class DefaultRecordIter to allow for better extensibility

add 'minimal' option to skip digest/mime/status extraction only include minimal data (url+timestamp)
cdx-indexer: add -6 option to create 6-field index
This commit is contained in:
Ilya Kreymer 2015-02-25 13:31:37 -08:00
parent 1d4c54deaa
commit 671f45f69f
2 changed files with 194 additions and 167 deletions

View File

@ -242,188 +242,194 @@ class ArchiveIndexEntry(object):
#=================================================================
def create_record_iter(arcv_iter, options):
append_post = options.get('append_post')
include_all = options.get('include_all')
block_size = options.get('block_size', 16384)
class DefaultRecordIter(object):
def __init__(self, **options):
self.options = options
for record in arcv_iter.iter_records(block_size):
entry = None
def create_record_iter(self, arcv_iter):
append_post = self.options.get('append_post')
include_all = self.options.get('include_all')
block_size = self.options.get('block_size', 16384)
if not include_all and (record.status_headers.get_statuscode() == '-'):
continue
for record in arcv_iter.iter_records(block_size):
entry = None
if record.format == 'warc':
if (record.rec_type in ('request', 'warcinfo') and
not include_all and
not append_post):
if not include_all and (record.status_headers.get_statuscode() == '-'):
continue
elif (not include_all and
record.content_type == 'application/warc-fields'):
if record.format == 'warc':
if (record.rec_type in ('request', 'warcinfo') and
not include_all and
not append_post):
continue
elif (not include_all and
record.content_type == 'application/warc-fields'):
continue
entry = self.parse_warc_record(record)
elif record.format == 'arc':
entry = self.parse_arc_record(record)
if not entry:
continue
entry = parse_warc_record(record)
elif record.format == 'arc':
entry = parse_arc_record(record)
if entry.url and not entry.key:
entry.key = canonicalize(entry.url,
self.options.get('surt_ordered', True))
if not entry:
continue
compute_digest = False
if entry.url and not entry.key:
entry.key = canonicalize(entry.url,
options.get('surt_ordered', True))
if (entry.digest == '-' and
record.rec_type not in ('revisit', 'request', 'warcinfo')):
compute_digest = False
compute_digest = True
if (entry.digest == '-' and
record.rec_type not in ('revisit', 'request', 'warcinfo')):
elif record.rec_type == 'request' and self.options.get('append_post'):
method = record.status_headers.protocol
len_ = record.status_headers.get_header('Content-Length')
compute_digest = True
post_query = extract_post_query(method,
entry.mime,
len_,
record.stream)
elif record.rec_type == 'request' and options.get('append_post'):
method = record.status_headers.protocol
len_ = record.status_headers.get_header('Content-Length')
entry.post_query = post_query
post_query = extract_post_query(method,
entry.mime,
len_,
record.stream)
#entry.set_rec_info(*arcv_iter.read_to_end(record, compute_digest))
arcv_iter.read_to_end(record, compute_digest)
entry.set_rec_info(*arcv_iter.member_info)
entry.record = record
entry.post_query = post_query
#entry.set_rec_info(*arcv_iter.read_to_end(record, compute_digest))
arcv_iter.read_to_end(record, compute_digest)
entry.set_rec_info(*arcv_iter.member_info)
entry.record = record
yield entry
#=================================================================
def join_request_records(entry_iter, options):
prev_entry = None
for entry in entry_iter:
if not prev_entry:
prev_entry = entry
continue
# check for url match
if (entry.url != prev_entry.url):
pass
# check for concurrency also
elif (entry.record.rec_headers.get_header('WARC-Concurrent-To') !=
prev_entry.record.rec_headers.get_header('WARC-Record-ID')):
pass
elif (entry.merge_request_data(prev_entry, options) or
prev_entry.merge_request_data(entry, options)):
yield prev_entry
yield entry
prev_entry = None
continue
yield prev_entry
prev_entry = entry
def join_request_records(self, entry_iter):
prev_entry = None
if prev_entry:
yield prev_entry
for entry in entry_iter:
if not prev_entry:
prev_entry = entry
continue
# check for url match
if (entry.url != prev_entry.url):
pass
# check for concurrency also
elif (entry.record.rec_headers.get_header('WARC-Concurrent-To') !=
prev_entry.record.rec_headers.get_header('WARC-Record-ID')):
pass
elif (entry.merge_request_data(prev_entry, self.options) or
prev_entry.merge_request_data(entry, self.options)):
yield prev_entry
yield entry
prev_entry = None
continue
yield prev_entry
prev_entry = entry
if prev_entry:
yield prev_entry
#=================================================================
def parse_warc_record(record):
""" Parse warc record
"""
#=================================================================
def parse_warc_record(self, record):
""" Parse warc record
"""
entry = ArchiveIndexEntry()
entry = ArchiveIndexEntry()
if record.rec_type == 'warcinfo':
entry.url = record.rec_headers.get_header('WARC-Filename')
entry.key = entry.url
entry.warcinfo = record.stream.read(record.length)
return entry
entry.url = record.rec_headers.get_header('WARC-Target-Uri')
# timestamp
entry.timestamp = iso_date_to_timestamp(record.rec_headers.
get_header('WARC-Date'))
if self.options.get('minimal'):
return entry
# mime
if record.rec_type == 'revisit':
entry.mime = 'warc/revisit'
else:
def_mime = '-' if record.rec_type == 'request' else 'unk'
entry.extract_mime(record.status_headers.
get_header('Content-Type'),
def_mime)
# status -- only for response records (by convention):
if record.rec_type == 'response':
entry.extract_status(record.status_headers)
else:
entry.status = '-'
# digest
entry.digest = record.rec_headers.get_header('WARC-Payload-Digest')
if entry.digest and entry.digest.startswith('sha1:'):
entry.digest = entry.digest[len('sha1:'):]
if not entry.digest:
entry.digest = '-'
if record.rec_type == 'warcinfo':
entry.url = record.rec_headers.get_header('WARC-Filename')
entry.key = entry.url
entry.warcinfo = record.stream.read(record.length)
return entry
entry.url = record.rec_headers.get_header('WARC-Target-Uri')
# timestamp
entry.timestamp = iso_date_to_timestamp(record.rec_headers.
get_header('WARC-Date'))
#=================================================================
def parse_arc_record(self, record):
""" Parse arc record
"""
if record.rec_type == 'arc_header':
return None
# mime
if record.rec_type == 'revisit':
entry.mime = 'warc/revisit'
else:
def_mime = '-' if record.rec_type == 'request' else 'unk'
entry.extract_mime(record.status_headers.
get_header('Content-Type'),
def_mime)
url = record.rec_headers.get_header('uri')
url = url.replace('\r', '%0D')
url = url.replace('\n', '%0A')
# replace formfeed
url = url.replace('\x0c', '%0C')
# replace nulls
url = url.replace('\x00', '%00')
# status -- only for response records (by convention):
if record.rec_type == 'response':
entry = ArchiveIndexEntry()
entry.url = url
# timestamp
entry.timestamp = record.rec_headers.get_header('archive-date')
if len(entry.timestamp) > 14:
entry.timestamp = entry.timestamp[:14]
if self.options.get('minimal'):
return entry
# status
entry.extract_status(record.status_headers)
else:
entry.status = '-'
# digest
entry.digest = record.rec_headers.get_header('WARC-Payload-Digest')
if entry.digest and entry.digest.startswith('sha1:'):
entry.digest = entry.digest[len('sha1:'):]
# mime
entry.extract_mime(record.rec_headers.get_header('content-type'))
if not entry.digest:
# digest
entry.digest = '-'
return entry
return entry
def __call__(self, fh):
aiter = ArchiveIterator(fh)
#=================================================================
def parse_arc_record(record):
""" Parse arc record
"""
if record.rec_type == 'arc_header':
return None
entry_iter = self.create_record_iter(aiter)
url = record.rec_headers.get_header('uri')
url = url.replace('\r', '%0D')
url = url.replace('\n', '%0A')
# replace formfeed
url = url.replace('\x0c', '%0C')
# replace nulls
url = url.replace('\x00', '%00')
if self.options.get('append_post'):
entry_iter = self.join_request_records(entry_iter)
entry = ArchiveIndexEntry()
entry.url = url
for entry in entry_iter:
if (entry.record.rec_type in ('request', 'warcinfo') and
not self.options.get('include_all')):
continue
# timestamp
entry.timestamp = record.rec_headers.get_header('archive-date')
if len(entry.timestamp) > 14:
entry.timestamp = entry.timestamp[:14]
# status
entry.extract_status(record.status_headers)
# mime
entry.extract_mime(record.rec_headers.get_header('content-type'))
# digest
entry.digest = '-'
return entry
#=================================================================
def create_index_iter(fh, **options):
aiter = ArchiveIterator(fh)
entry_iter = create_record_iter(aiter, options)
if options.get('append_post'):
entry_iter = join_request_records(entry_iter, options)
for entry in entry_iter:
if (entry.record.rec_type in ('request', 'warcinfo') and
not options.get('include_all')):
continue
yield entry
yield entry

View File

@ -5,20 +5,21 @@ from bisect import insort
from io import BytesIO
from archiveiterator import create_index_iter
from archiveiterator import DefaultRecordIter
#=================================================================
class CDXWriter(object):
def __init__(self, out, cdx09=False):
def __init__(self, out, format_):
self.out = out
self.cdx09 = cdx09
self.format_ = format_
def __enter__(self):
if not self.cdx09:
self.out.write(' CDX N b a m s k r M S V g\n')
else:
if self.format_ == 'cdx09':
self.out.write(' CDX N b a m s k r V g\n')
elif self.format_ == 'cdx06':
self.out.write(' CDX N b a S V g\n')
else:
self.out.write(' CDX N b a m s k r M S V g\n')
return self
@ -41,17 +42,24 @@ class CDXWriter(object):
out.write(' ')
out.write(entry.url)
out.write(' ')
out.write(entry.mime)
out.write(' ')
out.write(entry.status)
out.write(' ')
out.write(entry.digest)
if self.cdx09:
if self.format_ != 'cdx06':
out.write(entry.mime)
out.write(' ')
out.write(entry.status)
out.write(' ')
out.write(entry.digest)
if self.format_ == 'cdx09':
out.write(' - ')
elif self.format_ == 'cdx06':
out.write(entry.length)
out.write(' ')
else:
out.write(' - - ')
out.write(entry.length)
out.write(' ')
out.write(entry.offset)
out.write(' ')
out.write(filename)
@ -153,11 +161,12 @@ def write_multi_cdx_index(output, inputs, **options):
outfile = open(output, 'wb')
writer_cls = get_cdx_writer_cls(options)
record_iter = DefaultRecordIter(**options)
with writer_cls(outfile, options.get('cdx09')) as writer:
with writer_cls(outfile, options.get('format')) as writer:
for fullpath, filename in iter_file_or_dir(inputs, recurse):
with open(fullpath, 'rb') as infile:
entry_iter = create_index_iter(infile, **options)
entry_iter = record_iter(infile)
for entry in entry_iter:
writer.write(entry, filename)
@ -172,8 +181,8 @@ def write_cdx_index(outfile, infile, filename, **options):
writer_cls = get_cdx_writer_cls(options)
with writer_cls(outfile, options.get('cdx09')) as writer:
entry_iter = create_index_iter(infile, **options)
with writer_cls(outfile, options.get('format')) as writer:
entry_iter = DefaultRecordIter(**options)(infile)
for entry in entry_iter:
writer.write(entry, filename)
@ -260,7 +269,12 @@ if input is a directory"""
action='store_true',
help=unsurt_help)
parser.add_argument('-9', '--cdx09',
group = parser.add_mutually_exclusive_group()
group.add_argument('-9', '--cdx09',
action='store_true',
help=cdx09_help)
group.add_argument('-6', '--cdx06',
action='store_true',
help=cdx09_help)
@ -269,13 +283,20 @@ if input is a directory"""
cmd = parser.parse_args(args=args)
format_ = 'cdx11'
if cmd.cdx09:
format_ = 'cdx09'
elif cmd.cdx06:
format_ = 'cdx06'
write_multi_cdx_index(cmd.output, cmd.inputs,
sort=cmd.sort,
surt_ordered=not cmd.unsurt,
include_all=cmd.allrecords,
append_post=cmd.postappend,
recurse=cmd.recurse,
cdx09=cmd.cdx09)
format=format_,
minimal=cmd.cdx06)
if __name__ == '__main__':