diff --git a/pywb/warc/archiveindexer.py b/pywb/warc/archiveindexer.py deleted file mode 100644 index 84e4e022..00000000 --- a/pywb/warc/archiveindexer.py +++ /dev/null @@ -1,568 +0,0 @@ -from pywb.utils.timeutils import iso_date_to_timestamp -from pywb.utils.bufferedreaders import DecompressingBufferedReader -from pywb.utils.canonicalize import canonicalize -from pywb.utils.loaders import extract_post_query, append_post_query - -from recordloader import ArcWarcRecordLoader - -import hashlib -import base64 - -import re -import sys - -from bisect import insort - - -#================================================================= -class ArchiveIndexer(object): - """ Generate a CDX index for WARC and ARC files, both gzip chunk - compressed and uncompressed - - The indexer will automatically detect format, and decompress - if necessary - """ - - # arc/warc record types which are - # indexed by default, without 'include_all' - DEFAULT_REC_TYPES = ('response', 'revisit', 'metadata', 'resource') - - def __init__(self, fileobj, filename, - out=sys.stdout, sort=False, writer=None, surt_ordered=True, - include_all=False, append_post_query=False): - self.fh = fileobj - self.filename = filename - - loader_parse_req = include_all or append_post_query - self.loader = ArcWarcRecordLoader(parse_request=loader_parse_req) - - self.offset = 0 - self.known_format = None - self.surt_ordered = surt_ordered - - self.include_all = include_all - self.append_post_query = append_post_query - - if writer: - self.writer = writer - elif sort: - self.writer = SortedCDXWriter(out) - else: - self.writer = CDXWriter(out) - - # todo: refactor this - self.writer.indexer = self - - if append_post_query: - self.writer = PostResolveWriter(self.writer, self) - - def make_index(self): - """ Output a cdx index! 
- """ - - decomp_type = 'gzip' - block_size = 16384 - - reader = DecompressingBufferedReader(self.fh, - block_size=block_size, - decomp_type=decomp_type) - self.offset = self.fh.tell() - next_line = None - - self.writer.start() - - try: - while True: - try: - record = self._process_reader(reader, next_line) - except EOFError: - break - - # for non-compressed, consume blank lines here - if not reader.decompressor: - next_line = self._consume_blanklines(reader) - if next_line is None: - # at end of file - break - - # reset reader for next member - else: - reader.read_next_member() - finally: - self.writer.end() - - def _consume_blanklines(self, reader): - """ Consume blank lines that are between records - - For warcs, there are usually 2 - - For arcs, may be 1 or 0 - - For block gzipped files, these are at end of each gzip envelope - and are included in record length which is the full gzip envelope - - For uncompressed, they are between records and so are NOT part of - the record length - """ - while True: - line = reader.readline() - if len(line) == 0: - return None - - if line.rstrip() == '': - self.offset = self.fh.tell() - reader.rem_length() - continue - - return line - - def _read_to_record_end(self, reader, record): - """ Read to end of record and update current offset, - which is used to compute record length - - For compressed files, blank lines are consumed - since they are part of record length - - For uncompressed files, blank lines are read later, - and not included in the record length - """ - - if reader.decompressor: - self._consume_blanklines(reader) - - self.offset = self.fh.tell() - reader.rem_length() - - def _process_reader(self, reader, next_line): - """ Use loader to parse the record from the reader stream - Supporting warc and arc records - """ - record = self.loader.parse_record_stream(reader, - next_line, - self.known_format) - - # Track known format for faster parsing of other records - self.known_format = record.format - - if record.format == 'warc': - result = self._parse_warc_record(record) - elif record.format == 'arc': - result = self._parse_arc_record(record) - - if not result: - self.read_rest(record.stream) - self._read_to_record_end(reader, record) - return record - - post_query = None - - if record.rec_type == 'request': - method = record.status_headers.protocol - mime = result[3] - len_ = record.status_headers.get_header('Content-Length') - - post_query = extract_post_query(method, - mime, - len_, - record.stream) - - # should be 0 if read query string - num = self.read_rest(record.stream) - # generate digest if it doesn't exist and if not a revisit - # if revisit, then nothing we can do here - elif result[-1] == '-' and record.rec_type != 'revisit': - digester = hashlib.sha1() - self.read_rest(record.stream, digester) - result[-1] = base64.b32encode(digester.digest()) - else: - num = self.read_rest(record.stream) - - result.append('- -') - - offset = self.offset - self._read_to_record_end(reader, record) - length = self.offset - offset - - result.append(str(length)) - result.append(str(offset)) - result.append(self.filename) - - self.writer.write(result, record.rec_type, post_query) - - return record - - def _parse_warc_record(self, record): - """ Parse warc record to be included in index, or - return none if skipping this type of record - """ - - if (not self.append_post_query and - not self.include_record(record.rec_type)): - return None - - url = record.rec_headers.get_header('WARC-Target-Uri') - if not url: - return None - - timestamp = 
record.rec_headers.get_header('WARC-Date') - timestamp = iso_date_to_timestamp(timestamp) - - digest = record.rec_headers.get_header('WARC-Payload-Digest') - - if record.rec_type == 'revisit': - mime = 'warc/revisit' - status = '-' - elif record.rec_type == 'request': - mime = record.status_headers.get_header('Content-Type') - mime = self._extract_mime(mime, '-') - status = '-' - else: - mime = record.status_headers.get_header('Content-Type') - mime = self._extract_mime(mime) - status = self._extract_status(record.status_headers) - - if digest and digest.startswith('sha1:'): - digest = digest[len('sha1:'):] - - if not digest: - digest = '-' - - key = canonicalize(url, self.surt_ordered) - - return [key, - timestamp, - url, - mime, - status, - digest] - - def _parse_arc_record(self, record): - """ Parse arc record and return list of fields - to include in index, or retur none if skipping this - type of record - """ - if record.rec_type == 'arc_header': - return None - - url = record.rec_headers.get_header('uri') - url = url.replace('\r', '%0D') - url = url.replace('\n', '%0A') - # replace formfeed - url = url.replace('\x0c', '%0C') - # replace nulls - url = url.replace('\x00', '%00') - - timestamp = record.rec_headers.get_header('archive-date') - if len(timestamp) > 14: - timestamp = timestamp[:14] - - status = self._extract_status(record.status_headers) - - mime = record.rec_headers.get_header('content-type') - mime = self._extract_mime(mime) - - key = canonicalize(url, self.surt_ordered) - - return [key, - timestamp, - url, - mime, - status, - '-'] - - MIME_RE = re.compile('[; ]') - - def _extract_mime(self, mime, def_mime='unk'): - """ Utility function to extract mimetype only - from a full content type, removing charset settings - """ - if mime: - mime = self.MIME_RE.split(mime, 1)[0] - if not mime: - mime = def_mime - return mime - - def _extract_status(self, status_headers): - status = status_headers.statusline.split(' ')[0] - if not status: - status = '-' - return status - - def read_rest(self, reader, digester=None): - """ Read remainder of the stream - If a digester is included, update it - with the data read - """ - num = 0 - while True: - b = reader.read(8192) - if not b: - break - num += len(b) - if digester: - digester.update(b) - return num - - def include_record(self, type_): - return self.include_all or (type_ in self.DEFAULT_REC_TYPES) - - def add_post_query(self, fields, post_query): - url = append_post_query(fields[2], post_query) - fields[0] = canonicalize(url, self.surt_ordered) - return fields - - -#================================================================= -class CDXWriter(object): - def __init__(self, out): - self.out = out - self.indexer = None - - def start(self): - self.out.write(' CDX N b a m s k r M S V g\n') - - def write(self, line, rec_type, *args): - if not self.indexer or self.indexer.include_record(rec_type): - self.out.write(' '.join(line) + '\n') - - def end(self): - pass - - -#================================================================= -class SortedCDXWriter(CDXWriter): - def __init__(self, out): - super(SortedCDXWriter, self).__init__(out) - self.sortlist = [] - - def write(self, line, rec_type, *args): - if not self.indexer or self.indexer.include_record(rec_type): - line = ' '.join(line) + '\n' - insort(self.sortlist, line) - - def end(self): - self.out.write(''.join(self.sortlist)) - - -#================================================================= -class PostResolveWriter(CDXWriter): - def __init__(self, writer, indexer): - 
self.writer = writer - self.indexer = indexer - self.prev_line = None - self.prev_post_query = None - self.prev_type = None - - def start(self): - self.writer.start() - - def write(self, line, rec_type, post_query): - if not self.prev_line: - self.prev_line = line - self.prev_post_query = post_query - self.prev_type = rec_type - return - - #cdx original field - if self.prev_line[2] != line[2]: - self.writer.write(self.prev_line, self.prev_type) - self.prev_line = line - self.prev_post_query = post_query - return - - if self.prev_post_query or post_query: - if self.prev_post_query: - self.indexer.add_post_query(line, self.prev_post_query) - else: - self.indexer.add_post_query(line, post_query) - - # update prev url key too - self.prev_line[0] = line[0] - - # write both lines - self.writer.write(self.prev_line, self.prev_type) - self.writer.write(line, rec_type) - - # flush any cached lines - self.prev_line = None - self.prev_post_query = None - self.prev_type = None - - def end(self): - if self.prev_line: - self.writer.write(self.prev_line, self.prev_type) - - self.writer.end() - - -#================================================================= -class MultiFileMixin(object): - def start_all(self): - super(MultiFileMixin, self).start() - - def end_all(self): - super(MultiFileMixin, self).end() - - def start(self): - pass - - def end(self): - pass - - -class MultiFileCDXWriter(MultiFileMixin, CDXWriter): - pass - - -class MultiFileSortedCDXWriter(MultiFileMixin, SortedCDXWriter): - pass - - -#================================================================= -import os -from argparse import ArgumentParser, RawTextHelpFormatter - - -def iter_file_or_dir(inputs): - for input_ in inputs: - if not os.path.isdir(input_): - yield input_, os.path.basename(input_) - else: - for filename in os.listdir(input_): - yield os.path.join(input_, filename), filename - - -def index_to_file(inputs, output, sort, - surt_ordered, include_all, append_post_query): - if output == '-': - outfile = sys.stdout - else: - outfile = open(output, 'w') - - if sort: - writer = MultiFileSortedCDXWriter(outfile) - else: - writer = MultiFileCDXWriter(outfile) - - try: - infile = None - writer.start_all() - - for fullpath, filename in iter_file_or_dir(inputs): - with open(fullpath, 'r') as infile: - ArchiveIndexer(fileobj=infile, - filename=filename, - writer=writer, - surt_ordered=surt_ordered, - append_post_query=append_post_query, - include_all=include_all).make_index() - finally: - writer.end_all() - if infile: - infile.close() - - -def remove_ext(filename): - for ext in ('.arc', '.arc.gz', '.warc', '.warc.gz'): - if filename.endswith(ext): - filename = filename[:-len(ext)] - break - - return filename - - -def cdx_filename(filename): - return remove_ext(filename) + '.cdx' - - -def index_to_dir(inputs, output, sort, - surt_ordered, include_all, append_post_query): - for fullpath, filename in iter_file_or_dir(inputs): - - outpath = cdx_filename(filename) - outpath = os.path.join(output, outpath) - - with open(outpath, 'w') as outfile: - with open(fullpath, 'r') as infile: - ArchiveIndexer(fileobj=infile, - filename=filename, - sort=sort, - out=outfile, - surt_ordered=surt_ordered, - append_post_query=append_post_query, - include_all=include_all).make_index() - - -def main(args=None): - description = """ -Generate .cdx index files for WARCs and ARCs -Compressed (.warc.gz / .arc.gz) or uncompressed (.warc / .arc) formats -are supported. 
-""" - - epilog = """ -Some examples: - -* Create "example.cdx" index from example.warc.gz -{0} ./cdx/example.cdx ./warcs/example.warc.gz - -* Create "combined.cdx", a combined, sorted index of all warcs in ./warcs/ -{0} --sort combined.cdx ./warcs/ - -* Create a sorted cdx per file in ./cdx/ for each archive file in ./warcs/ -{0} --sort ./cdx/ ./warcs/ -""".format(os.path.basename(sys.argv[0])) - - sort_help = """ -sort the output to each file before writing to create a total ordering -""" - - unsurt_help = """ -Convert SURT (Sort-friendly URI Reordering Transform) back to regular -urls for the cdx key. Default is to use SURT keys. -Not-recommended for new cdx, use only for backwards-compatibility. -""" - - output_help = """output file or directory. -- If directory, each input file is written to a seperate output file - with a .cdx extension -- If output is '-', output is written to stdout -""" - - input_help = """input file or directory -- If directory, all archive files from that directory are read -""" - - allrecords_help = """include all records. -currently includes the 'request' records in addition to all -response records""" - - post_append_help = """for POST requests, append -form query to url key. (Only applies to form url encoded posts)""" - - parser = ArgumentParser(description=description, - epilog=epilog, - formatter_class=RawTextHelpFormatter) - - parser.add_argument('-s', '--sort', - action='store_true', - help=sort_help) - - parser.add_argument('-a', '--allrecords', - action='store_true', - help=allrecords_help) - - parser.add_argument('-p', '--postappend', - action='store_true', - help=post_append_help) - - parser.add_argument('-u', '--unsurt', - action='store_true', - help=unsurt_help) - - parser.add_argument('output', nargs='?', default='-', help=output_help) - parser.add_argument('inputs', nargs='+', help=input_help) - - cmd = parser.parse_args(args=args) - if cmd.output != '-' and os.path.isdir(cmd.output): - index_to_dir(cmd.inputs, cmd.output, cmd.sort, - not cmd.unsurt, cmd.allrecords, cmd.postappend) - else: - index_to_file(cmd.inputs, cmd.output, cmd.sort, - not cmd.unsurt, cmd.allrecords, cmd.postappend) - - -if __name__ == '__main__': - main() diff --git a/pywb/warc/archiveiterator.py b/pywb/warc/archiveiterator.py new file mode 100644 index 00000000..65437013 --- /dev/null +++ b/pywb/warc/archiveiterator.py @@ -0,0 +1,381 @@ +from pywb.utils.timeutils import iso_date_to_timestamp +from pywb.utils.bufferedreaders import DecompressingBufferedReader +from pywb.utils.canonicalize import canonicalize +from pywb.utils.loaders import extract_post_query, append_post_query + +from recordloader import ArcWarcRecordLoader + +import hashlib +import base64 + +import re + + +#================================================================= +class ArchiveIterator(object): + """ Iterate over records in WARC and ARC files, both gzip chunk + compressed and uncompressed + + The indexer will automatically detect format, and decompress + if necessary. 
+ + """ + + def __init__(self, fileobj): + self.fh = fileobj + + self.loader = ArcWarcRecordLoader() + self.reader = None + + self.offset = 0 + self.known_format = None + + self.member_info = None + + def iter_records(self): + """ iterate over each record + """ + + decomp_type = 'gzip' + block_size = 16384 + + self.reader = DecompressingBufferedReader(self.fh, + block_size=block_size) + self.offset = self.fh.tell() + + next_line = None + + while True: + try: + record = self._next_record(next_line) + yield record + except EOFError: + break + + self.read_to_end(record) + + # for non-compressed, consume blank lines here + if not self.reader.decompressor: + next_line = self._consume_blanklines() + if next_line is None: + # at end of file + break + + # reset reader for next member + else: + self.reader.read_next_member() + + def _consume_blanklines(self): + """ Consume blank lines that are between records + - For warcs, there are usually 2 + - For arcs, may be 1 or 0 + - For block gzipped files, these are at end of each gzip envelope + and are included in record length which is the full gzip envelope + - For uncompressed, they are between records and so are NOT part of + the record length + """ + while True: + line = self.reader.readline() + if len(line) == 0: + return None + + if line.rstrip() == '': + self.offset = self.fh.tell() - self.reader.rem_length() + continue + + return line + + def read_to_end(self, record, compute_digest=False): + """ Read remainder of the stream + If a digester is included, update it + with the data read + """ + if self.member_info: + return self.member_info + + if compute_digest: + digester = hashlib.sha1() + else: + digester = None + + num = 0 + curr_offset = self.offset + + while True: + b = record.stream.read(8192) + if not b: + break + num += len(b) + if digester: + digester.update(b) + + """ + - For compressed files, blank lines are consumed + since they are part of record length + - For uncompressed files, blank lines are read later, + and not included in the record length + """ + if self.reader.decompressor: + self._consume_blanklines() + + self.offset = self.fh.tell() - self.reader.rem_length() + length = self.offset - curr_offset + + if compute_digest: + digest = base64.b32encode(digester.digest()) + else: + digest = None + + self.member_info = (curr_offset, length, digest) + return self.member_info + + def _next_record(self, next_line): + """ Use loader to parse the record from the reader stream + Supporting warc and arc records + """ + record = self.loader.parse_record_stream(self.reader, + next_line, + self.known_format) + + self.member_info = None + + # Track known format for faster parsing of other records + self.known_format = record.format + + return record + + +#================================================================= +class ArchiveIndexEntry(object): + MIME_RE = re.compile('[; ]') + + def extract_mime(self, mime, def_mime='unk'): + """ Utility function to extract mimetype only + from a full content type, removing charset settings + """ + self.mime = def_mime + if mime: + self.mime = self.MIME_RE.split(mime, 1)[0] + + def extract_status(self, status_headers): + """ Extract status code only from status line + """ + self.status = status_headers.get_statuscode() + if not self.status: + self.status = '-' + + def set_rec_info(self, offset, length, digest): + self.offset = str(offset) + self.length = str(length) + if digest: + self.digest = digest + + def add_post_query(self, other, options): + surt_ordered = options.get('surt_ordered') + + if 
other.record.rec_type != 'request': + return False + + if not hasattr(other, 'post_query'): + return False + + # two requests, not correct + if self.record.rec_type == 'request': + return False + + url = append_post_query(self.url, other.post_query) + self.key = canonicalize(url, surt_ordered) + other.key = self.key + return True + + +#================================================================= +def create_record_iter(arcv_iter, options): + for record in arcv_iter.iter_records(): + entry = None + + if record.format == 'warc': + if (record.rec_type == 'request' and + not options.get('append_post') and + not options.get('include_all')): + continue + + entry = parse_warc_record(record) + elif record.format == 'arc': + entry = parse_arc_record(record) + + if not entry: + continue + + entry.key = canonicalize(entry.url, options.get('surt_ordered', True)) + + compute_digest = False + + if (entry.digest == '-' and + record.rec_type not in ('revisit', 'request')): + + compute_digest = True + + elif record.rec_type == 'request' and options.get('append_post'): + method = record.status_headers.protocol + len_ = record.status_headers.get_header('Content-Length') + + post_query = extract_post_query(method, + entry.mime, + len_, + record.stream) + + entry.post_query = post_query + + entry.set_rec_info(*arcv_iter.read_to_end(record, compute_digest)) + entry.record = record + + yield entry + +#================================================================= +def join_request_records(entry_iter, options): + prev_entry = None + + for entry in entry_iter: + if not prev_entry: + prev_entry = entry + continue + + # check for url match + if (entry.url != prev_entry.url): + pass + # check for concurrency also + #elif (entry.record.rec_headers.get_header('WARC-Concurrent-To') != + # prev_entry.record.rec_headers.get_header('WARC-Record-ID')): + # pass + + elif (entry.add_post_query(prev_entry, options) or + prev_entry.add_post_query(entry, options)): + yield prev_entry + yield entry + prev_entry = None + continue + + yield prev_entry + prev_entry = entry + + + if prev_entry: + yield prev_entry + + + + +#================================================================= +def parse_warc_record(record): + """ Parse warc record + """ + + url = record.rec_headers.get_header('WARC-Target-Uri') + if not url: + return None + + entry = ArchiveIndexEntry() + entry.url = url + + # timestamp + entry.timestamp = iso_date_to_timestamp(record.rec_headers. + get_header('WARC-Date')) + + # mime + if record.rec_type == 'revisit': + entry.mime = 'warc/revisit' + else: + def_mime = '-' if record.rec_type == 'request' else 'unk' + entry.extract_mime(record.status_headers. 
+ get_header('Content-Type'), + def_mime) + + # status + if record.rec_type in ('request', 'revisit'): + entry.status = '-' + else: + entry.extract_status(record.status_headers) + + # digest + entry.digest = record.rec_headers.get_header('WARC-Payload-Digest') + if entry.digest and entry.digest.startswith('sha1:'): + entry.digest = entry.digest[len('sha1:'):] + + if not entry.digest: + entry.digest = '-' + + return entry + + +#================================================================= +def parse_arc_record(record): + """ Parse arc record + """ + if record.rec_type == 'arc_header': + return None + + url = record.rec_headers.get_header('uri') + url = url.replace('\r', '%0D') + url = url.replace('\n', '%0A') + # replace formfeed + url = url.replace('\x0c', '%0C') + # replace nulls + url = url.replace('\x00', '%00') + + entry = ArchiveIndexEntry() + entry.url = url + + # timestamp + entry.timestamp = record.rec_headers.get_header('archive-date') + if len(entry.timestamp) > 14: + entry.timestamp = entry.timestamp[:14] + + # status + entry.extract_status(record.status_headers) + + # mime + entry.extract_mime(record.rec_headers.get_header('content-type')) + + # digest + entry.digest = '-' + + return entry + + +#================================================================= +def create_index_iter(fh, **options): + aiter = ArchiveIterator(fh) + + entry_iter = create_record_iter(aiter, options) + + if options.get('append_post'): + entry_iter = join_request_records(entry_iter, options) + + for entry in entry_iter: + if (entry.record.rec_type == 'request' and + not options.get('include_all')): + continue + + yield entry + + +#================================================================= +if __name__ == "__main__": + import sys + filename = sys.argv[1] + + with open(filename) as fh: + ait = ArchiveIterator(fh) + options = dict(surt_ordered=True, append_post=True) + + out = sys.stdout + + entry_iter = create_record_iter(ait, options) + entry_iter = join_request_records(entry_iter, options) + + cdx_write(out, entry_iter, options, filename) + + #for record in ait.iter_records(): + # result = ait.read_to_end(record.stream) + # print record.rec_type, result diff --git a/pywb/warc/cdxindexer.py b/pywb/warc/cdxindexer.py new file mode 100644 index 00000000..caae83c7 --- /dev/null +++ b/pywb/warc/cdxindexer.py @@ -0,0 +1,223 @@ +import os +import sys +from argparse import ArgumentParser, RawTextHelpFormatter +from bisect import insort + +from io import BytesIO + +from archiveiterator import create_index_iter + + +#================================================================= +class CDXWriter(object): + def __init__(self, out): + self.out = out + + def __enter__(self): + self.out.write(' CDX N b a m s k r M S V g\n') + return self + + def write(self, entry, filename): + self.write_cdx_line(self.out, entry, filename) + + def __exit__(self, *args): + return False + + @staticmethod + def write_cdx_line(out, entry, filename): + out.write(entry.key) + out.write(' ') + out.write(entry.timestamp) + out.write(' ') + out.write(entry.url) + out.write(' ') + out.write(entry.mime) + out.write(' ') + out.write(entry.status) + out.write(' ') + out.write(entry.digest) + out.write(' - - ') + out.write(entry.length) + out.write(' ') + out.write(entry.offset) + out.write(' ') + out.write(filename) + out.write('\n') + + +#================================================================= +class SortedCDXWriter(CDXWriter): + def __init__(self, out): + super(SortedCDXWriter, self).__init__(out) + self.sortlist = [] 
+ + def write(self, entry, filename): + outbuff = BytesIO() + self.write_cdx_line(outbuff, entry, filename) + + insort(self.sortlist, outbuff.getvalue()) + + def __exit__(self, *args): + self.out.write(''.join(self.sortlist)) + return False + + +#================================================================= +def iter_file_or_dir(inputs): + for input_ in inputs: + if not os.path.isdir(input_): + yield input_, os.path.basename(input_) + else: + for filename in os.listdir(input_): + yield os.path.join(input_, filename), filename + + +#================================================================= +def index_to_file(inputs, output, sort, + surt_ordered, include_all, append_post_query): + if output == '-': + outfile = sys.stdout + else: + outfile = open(output, 'w') + + if sort: + writer_cls = SortedCDXWriter + else: + writer_cls = CDXWriter + + with writer_cls(outfile) as writer: + for fullpath, filename in iter_file_or_dir(inputs): + with open(fullpath, 'r') as infile: + write_index(writer, filename, infile, + surt_ordered, append_post_query, include_all) + +#================================================================= +def index_to_dir(inputs, output, sort, + surt_ordered, include_all, append_post_query): + + if sort: + writer_cls = SortedCDXWriter + else: + writer_cls = CDXWriter + + for fullpath, filename in iter_file_or_dir(inputs): + + outpath = cdx_filename(filename) + outpath = os.path.join(output, outpath) + + with open(outpath, 'w') as outfile: + with writer_cls(outfile) as writer: + with open(fullpath, 'r') as infile: + write_index(writer, filename, infile, + surt_ordered, append_post_query, include_all) + +#================================================================= +def remove_ext(filename): + for ext in ('.arc', '.arc.gz', '.warc', '.warc.gz'): + if filename.endswith(ext): + filename = filename[:-len(ext)] + break + + return filename + + +#================================================================= +def cdx_filename(filename): + return remove_ext(filename) + '.cdx' + + +#================================================================= +def write_index(writer, filename, infile, + surt_ordered, append_post, include_all): + + entry_iter = create_index_iter(infile, + surt_ordered=surt_ordered, + append_post=append_post, + include_all=include_all) + + for entry in entry_iter: + writer.write(entry, filename) + + +#================================================================= +def main(args=None): + description = """ +Generate .cdx index files for WARCs and ARCs +Compressed (.warc.gz / .arc.gz) or uncompressed (.warc / .arc) formats +are supported. +""" + + epilog = """ +Some examples: + +* Create "example.cdx" index from example.warc.gz +{0} ./cdx/example.cdx ./warcs/example.warc.gz + +* Create "combined.cdx", a combined, sorted index of all warcs in ./warcs/ +{0} --sort combined.cdx ./warcs/ + +* Create a sorted cdx per file in ./cdx/ for each archive file in ./warcs/ +{0} --sort ./cdx/ ./warcs/ +""".format(os.path.basename(sys.argv[0])) + + sort_help = """ +sort the output to each file before writing to create a total ordering +""" + + unsurt_help = """ +Convert SURT (Sort-friendly URI Reordering Transform) back to regular +urls for the cdx key. Default is to use SURT keys. +Not-recommended for new cdx, use only for backwards-compatibility. +""" + + output_help = """output file or directory. 
+- If directory, each input file is written to a seperate output file + with a .cdx extension +- If output is '-', output is written to stdout +""" + + input_help = """input file or directory +- If directory, all archive files from that directory are read +""" + + allrecords_help = """include all records. +currently includes the 'request' records in addition to all +response records""" + + post_append_help = """for POST requests, append +form query to url key. (Only applies to form url encoded posts)""" + + parser = ArgumentParser(description=description, + epilog=epilog, + formatter_class=RawTextHelpFormatter) + + parser.add_argument('-s', '--sort', + action='store_true', + help=sort_help) + + parser.add_argument('-a', '--allrecords', + action='store_true', + help=allrecords_help) + + parser.add_argument('-p', '--postappend', + action='store_true', + help=post_append_help) + + parser.add_argument('-u', '--unsurt', + action='store_true', + help=unsurt_help) + + parser.add_argument('output', nargs='?', default='-', help=output_help) + parser.add_argument('inputs', nargs='+', help=input_help) + + cmd = parser.parse_args(args=args) + if cmd.output != '-' and os.path.isdir(cmd.output): + index_to_dir(cmd.inputs, cmd.output, cmd.sort, + not cmd.unsurt, cmd.allrecords, cmd.postappend) + else: + index_to_file(cmd.inputs, cmd.output, cmd.sort, + not cmd.unsurt, cmd.allrecords, cmd.postappend) + + +if __name__ == '__main__': + main() diff --git a/pywb/warc/recordloader.py b/pywb/warc/recordloader.py index 42c1923d..98355a5d 100644 --- a/pywb/warc/recordloader.py +++ b/pywb/warc/recordloader.py @@ -46,8 +46,7 @@ class ArcWarcRecordLoader: HTTP_VERBS = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'OPTIONS', 'CONNECT', 'PATCH'] - def __init__(self, loader=None, cookie_maker=None, block_size=8192, - parse_request=False): + def __init__(self, loader=None, cookie_maker=None, block_size=8192): if not loader: loader = BlockLoader(cookie_maker) @@ -59,9 +58,7 @@ class ArcWarcRecordLoader: self.warc_parser = StatusAndHeadersParser(self.WARC_TYPES) self.http_parser = StatusAndHeadersParser(self.HTTP_TYPES) - self.parse_request = parse_request - if self.parse_request: - self.http_req_parser = StatusAndHeadersParser(self.HTTP_VERBS) + self.http_req_parser = StatusAndHeadersParser(self.HTTP_VERBS) def load(self, url, offset, length): """ Load a single record from given url at offset with length @@ -117,9 +114,6 @@ class ArcWarcRecordLoader: except ValueError: is_err = True - # ================================================================ - # handle different types of records - # err condition if is_err: status_headers = StatusAndHeaders('-', []) @@ -128,6 +122,15 @@ class ArcWarcRecordLoader: elif length == 0: status_headers = StatusAndHeaders('204 No Content', []) + # limit stream to the length for all valid records + stream = LimitReader.wrap_stream(stream, length) + + if length == 0: + # already handled error case above + pass + + # ================================================================ + # handle different types of records # special case: warc records that are not expected to have http headers # attempt to add 200 status and content-type elif rec_type == 'metadata' or rec_type == 'resource': @@ -138,33 +141,16 @@ class ArcWarcRecordLoader: elif (rec_type == 'warcinfo' or rec_type == 'arc_header'): - # not parsing these for now + # no extra parsing of body for these status_headers = StatusAndHeaders('204 No Content', []) elif (rec_type == 'request'): - if self.parse_request: - status_headers 
= self.http_req_parser.parse(stream) - else: - status_headers = StatusAndHeaders('204 No Content', []) + status_headers = self.http_req_parser.parse(stream) - # special case: http 0.9 response, no status or headers - #elif rec_type == 'response': - # content_type = rec_headers.get_header('Content-Type') - # if content_type and (';version=0.9' in content_type): - # status_headers = StatusAndHeaders('200 OK', []) - - # response record: parse HTTP status and headers! + # response record: parse HTTP status and headers! else: - #(statusline, http_headers) = self.parse_http_headers(stream) status_headers = self.http_parser.parse(stream) - # limit the stream to the remainder, if >0 - # should always be valid, but just in case, still stream if - # content-length was not set - remains = length - status_headers.total_len - if remains >= 0: - stream = LimitReader.wrap_stream(stream, remains) - return ArcWarcRecord(the_format, rec_type, rec_headers, stream, status_headers) diff --git a/pywb/warc/test/test_indexing.py b/pywb/warc/test/test_indexing.py index 11d4458c..e8fbbd84 100644 --- a/pywb/warc/test/test_indexing.py +++ b/pywb/warc/test/test_indexing.py @@ -24,6 +24,15 @@ com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 20 com,example)/?example=1 20140103030341 http://example.com?example=1 warc/revisit - B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 896 3161 example.warc org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 854 4771 example.warc +# warc all +>>> print_cdx_index('example.warc', include_all=True) + CDX N b a m s k r M S V g +com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1987 460 example.warc +com,example)/?example=1 20140103030321 http://example.com?example=1 - - - - - 706 2451 example.warc +com,example)/?example=1 20140103030341 http://example.com?example=1 warc/revisit - B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 896 3161 example.warc +com,example)/?example=1 20140103030341 http://example.com?example=1 - - - - - 706 4061 example.warc +org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 854 4771 example.warc + # arc.gz >>> print_cdx_index('example.arc.gz') CDX N b a m s k r M S V g @@ -101,7 +110,7 @@ org,iana,example)/ 20130702195402 http://example.iana.org/ text/html 200 B2LTWWP >>> cli_lines(['--sort', '-a', '-p', TEST_WARC_DIR]) com,example)/ 20130729195151 http://test@example.com/ warc/revisit - B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 591 355 example-url-agnostic-revisit.warc.gz org,iana,example)/ 20130702195402 http://example.iana.org/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1001 353 example-url-agnostic-orig.warc.gz -392 +395 # test writing to stdout >>> cli_lines(['-', TEST_WARC_DIR + 'example.warc.gz']) @@ -124,7 +133,9 @@ org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example tex """ from pywb import get_test_dir -from pywb.warc.archiveindexer import ArchiveIndexer, main, cdx_filename + +#from pywb.warc.archiveindexer import ArchiveIndexer, main, cdx_filename +from pywb.warc.cdxindexer import write_index, main, cdx_filename, CDXWriter, SortedCDXWriter from io import BytesIO import sys @@ -149,14 +160,16 @@ def read_fully(cdx): def cdx_index(warc, sort=False, include_all=False, append_post_query=False): buff = BytesIO() - with open(TEST_WARC_DIR + warc) as fh: - indexer = ArchiveIndexer(fh, warc, - out=buff, - 
sort=sort, - include_all=include_all, - append_post_query=append_post_query) - indexer.make_index() + if sort: + writer_cls = SortedCDXWriter + else: + writer_cls = CDXWriter + + with writer_cls(buff) as writer: + with open(TEST_WARC_DIR + warc) as fh: + write_index(writer, warc, fh, + True, append_post_query, include_all) return buff.getvalue() diff --git a/pywb/warc/test/test_loading.py b/pywb/warc/test/test_loading.py index f744aa9f..347a694e 100644 --- a/pywb/warc/test/test_loading.py +++ b/pywb/warc/test/test_loading.py @@ -330,7 +330,7 @@ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - \ def load_test_archive(test_file, offset, length): path = test_warc_dir + test_file - testloader = ArcWarcRecordLoader(parse_request=True) + testloader = ArcWarcRecordLoader() archive = testloader.load(path, offset, length) diff --git a/setup.py b/setup.py index e08fac5d..9bf782b0 100755 --- a/setup.py +++ b/setup.py @@ -84,7 +84,7 @@ setup( [console_scripts] wayback = pywb.apps.wayback:main cdx-server = pywb.apps.cdx_server:main - cdx-indexer = pywb.warc.archiveindexer:main + cdx-indexer = pywb.warc.cdxindexer:main live-rewrite-server = pywb.apps.live_rewrite_server:main """, zip_safe=False,
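
For reference, a minimal sketch (illustrative only, not part of the patch) of driving the new indexing API directly, mirroring the usage in pywb/warc/test/test_indexing.py and the CLI added in pywb/warc/cdxindexer.py; the warc/cdx paths are taken from the epilog examples:

    import sys
    from pywb.warc.cdxindexer import SortedCDXWriter, write_index
    from pywb.warc.archiveiterator import create_index_iter

    # write a sorted CDX for one warc to stdout, using SURT keys and
    # appending form-urlencoded POST queries to the url key
    with SortedCDXWriter(sys.stdout) as writer:
        with open('./warcs/example.warc.gz') as infile:
            write_index(writer, 'example.warc.gz', infile,
                        surt_ordered=True, append_post=True, include_all=False)

    # or iterate index entries directly, without emitting CDX lines
    with open('./warcs/example.warc.gz') as fh:
        for entry in create_index_iter(fh, surt_ordered=True,
                                       append_post=False, include_all=False):
            print entry.key, entry.timestamp, entry.status, entry.offset, entry.length

The writer classes are context managers, so the CDX header and any sorted output are handled by __enter__/__exit__ instead of the old start()/end() calls; the same behavior is available from the command line through the cdx-indexer entry point, which now resolves to pywb.warc.cdxindexer (e.g. cdx-indexer --sort -p ./cdx/example.cdx ./warcs/example.warc.gz).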