diff --git a/pywb/utils/bufferedreaders.py b/pywb/utils/bufferedreaders.py
index 4ab72e5f..f434e492 100644
--- a/pywb/utils/bufferedreaders.py
+++ b/pywb/utils/bufferedreaders.py
@@ -29,30 +29,48 @@ class DecompressingBufferedReader(object):
 
     DECOMPRESSORS = {'gzip': gzip_decompressor}
 
-    def __init__(self, stream, block_size=1024, decomp_type=None):
+    def __init__(self, stream, block_size=1024,
+                 decomp_type=None,
+                 starting_data=None):
         self.stream = stream
         self.block_size = block_size
 
+        self._init_decomp(decomp_type)
+
+        self.buff = None
+        self.starting_data = starting_data
+        self.num_read = 0
+        self.buff_size = 0
+
+    def _init_decomp(self, decomp_type):
         if decomp_type:
             try:
+                self.decomp_type = decomp_type
                 self.decompressor = self.DECOMPRESSORS[decomp_type.lower()]()
             except KeyError:
                 raise Exception('Decompression type not supported: ' +
                                 decomp_type)
         else:
+            self.decomp_type = None
             self.decompressor = None
 
-        self.buff = None
-        self.num_read = 0
-        self.buff_size = 0
-
     def _fillbuff(self, block_size=None):
-        if not block_size:
-            block_size = self.block_size
+        if not self.empty():
+            return
 
-        if not self.buff or self.buff.tell() == self.buff_size:
+        # can't read past next member
+        if self.rem_length() > 0:
+            return
+
+        if self.starting_data:
+            data = self.starting_data
+            self.starting_data = None
+        else:
+            if not block_size:
+                block_size = self.block_size
             data = self.stream.read(block_size)
-            self._process_read(data)
+
+        self._process_read(data)
 
     def _process_read(self, data):
         data = self._decompress(data)
@@ -81,6 +99,9 @@ class DecompressingBufferedReader(object):
         or at a buffer boundary. If at a boundary, the subsequent
         call will fill buffer anew.
         """
+        if length == 0:
+            return ''
+
         self._fillbuff()
         return self.buff.read(length)
 
@@ -91,8 +112,12 @@ class DecompressingBufferedReader(object):
         If no newline found at end, try filling buffer again in case
         at buffer boundary.
         """
+        if length == 0:
+            return ''
+
         self._fillbuff()
         linebuff = self.buff.readline(length)
 
+        # we may be at a boundary
         while not linebuff.endswith('\n'):
             if length:
@@ -102,13 +127,33 @@ class DecompressingBufferedReader(object):
 
             self._fillbuff()
 
-            if self.buff_size == 0:
+            if self.empty():
                 break
 
             linebuff += self.buff.readline(length)
 
         return linebuff
 
+    def empty(self):
+        return not self.buff or self.buff.tell() >= self.buff_size
+
+    def read_next_member(self):
+        if not self.decompressor or not self.decompressor.unused_data:
+            return False
+
+        self.starting_data = self.decompressor.unused_data
+        self._init_decomp(self.decomp_type)
+        return True
+
+    def rem_length(self):
+        rem = 0
+        if self.buff:
+            rem = self.buff_size - self.buff.tell()
+
+        if self.decompressor and self.decompressor.unused_data:
+            rem += len(self.decompressor.unused_data)
+        return rem
+
     def close(self):
         if self.stream:
             self.stream.close()
@@ -123,37 +168,20 @@ class ChunkedDataException(Exception):
 
 #=================================================================
 class ChunkedDataReader(DecompressingBufferedReader):
     r"""
-    A ChunkedDataReader is a BufferedReader which also supports de-chunking
-    of the data if it happens to be http 'chunk-encoded'.
+    A ChunkedDataReader is a DecompressingBufferedReader
+    which also supports de-chunking of the data if it happens
+    to be http 'chunk-encoded'.
 
     If at any point the chunked header is not available, the stream is
     assumed to not be chunked and no more dechunking occurs.
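The read_next_member() / rem_length() pair added above is what makes per-record gzip (the layout .warc.gz uses) walkable: read() drains one gzip member, then read_next_member() re-inits the decompressor over the leftover unused_data. A minimal sketch of that loop; the gzip_member() helper and the two-record stream are illustrative, not part of pywb:

import zlib
from io import BytesIO
from pywb.utils.bufferedreaders import DecompressingBufferedReader

def gzip_member(buff):
    # build one complete gzip envelope (zlib with the gzip wrapper, wbits + 16)
    comp = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    return comp.compress(buff) + comp.flush()

stream = BytesIO(gzip_member('record one\n') + gzip_member('record two\n'))
reader = DecompressingBufferedReader(stream, decomp_type='gzip')

print reader.read()              # 'record one\n' -- stops at the member boundary
print reader.read_next_member()  # True: unused_data is replayed as starting_data
print reader.read()              # 'record two\n'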
-
-    Properly formatted chunked data:
-    >>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n0\r\n\r\n"));
-    >>> c.read() + c.read()
-    '1234'
-
-    Non-chunked data:
-    >>> ChunkedDataReader(BytesIO("xyz123!@#")).read()
-    'xyz123!@#'
-
-    Starts like chunked data, but isn't:
-    >>> c = ChunkedDataReader(BytesIO("1\r\nxyz123!@#"));
-    >>> c.read() + c.read()
-    '1\r\nx123!@#'
-
-    Chunked data cut off part way through:
-    >>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"));
-    >>> c.read() + c.read()
-    '123412'
     """
+    def __init__(self, stream, raise_exceptions=False, **kwargs):
+        super(ChunkedDataReader, self).__init__(stream, **kwargs)
+        self.all_chunks_read = False
+        self.not_chunked = False
 
-    all_chunks_read = False
-    not_chunked = False
-
-    # if False, we'll use best-guess fallback for parse errors
-    raise_chunked_data_exceptions = False
+        # if False, we'll use best-guess fallback for parse errors
+        self.raise_chunked_data_exceptions = raise_exceptions
 
     def _fillbuff(self, block_size=None):
         if self.not_chunked:
@@ -162,7 +190,7 @@ class ChunkedDataReader(DecompressingBufferedReader):
         if self.all_chunks_read:
             return
 
-        if not self.buff or self.buff.tell() >= self.buff_size:
+        if self.empty():
             length_header = self.stream.readline(64)
             self._data = ''
 
@@ -222,7 +250,3 @@ class ChunkedDataReader(DecompressingBufferedReader):
 
         # hand to base class for further processing
         self._process_read(self._data)
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
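These LimitReader changes below are small but load-bearing: a content length of 0 is now wrappable instead of being silently bumped to 1, and reads past the limit return '' instead of leaking an extra byte. A quick sketch of the new semantics (expected values shown in comments):

from io import BytesIO
from pywb.utils.loaders import LimitReader

reader = LimitReader(BytesIO('abcdefgh'), 4)
print reader.read(3)   # 'abc' -- capped by the requested length
print reader.read(3)   # 'd'   -- capped by the remaining limit
print reader.read(3)   # ''    -- limit exhausted, nothing read

# wrap_stream only wraps when content_length parses as a non-negative int
stream = BytesIO('xyz')
print LimitReader.wrap_stream(stream, '0') is stream    # False: wrapped (new in this change)
print LimitReader.wrap_stream(stream, 'abc') is stream  # True: returned unwrapped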
-        full_statusline = stream.readline()
+        if full_statusline is None:
+            full_statusline = stream.readline()
+
         statusline, total_read = _strip_count(full_statusline, 0)
 
+        # at end of stream
+        if total_read == 0:
+            raise EOFError()
+
         protocol_status = self.split_prefix(statusline, self.statuslist)
 
         if not protocol_status:
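Accepting an already-consumed status line lets a caller that read the first line while sniffing the format hand it back to the parser, and an exhausted stream now surfaces as EOFError rather than a confusing parse failure further down. Roughly:

from io import BytesIO
from pywb.utils.statusandheaders import StatusAndHeadersParser

parser = StatusAndHeadersParser(['HTTP/1.0', 'HTTP/1.1'])

# first line already consumed elsewhere: replay it via full_statusline
stream = BytesIO('Content-Length: 0\r\n\r\n')
headers = parser.parse(stream, full_statusline='HTTP/1.1 200 OK\r\n')

# an empty read now raises EOFError instead of a parse error
try:
    parser.parse(BytesIO(''))
except EOFError:
    print 'at end of stream'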
diff --git a/pywb/utils/test/test_bufferedreaders.py b/pywb/utils/test/test_bufferedreaders.py
new file mode 100644
index 00000000..558f8782
--- /dev/null
+++ b/pywb/utils/test/test_bufferedreaders.py
@@ -0,0 +1,103 @@
+r"""
+# DecompressingBufferedReader Tests
+#=================================================================
+
+# DecompressingBufferedReader readline()
+>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb')).readline()
+' CDX N b a m s k r M S V g\n'
+
+# detect not compressed
+>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb'), decomp_type = 'gzip').readline()
+' CDX N b a m s k r M S V g\n'
+
+# decompress with on the fly compression
+>>> DecompressingBufferedReader(BytesIO(compress('ABC\n1234\n')), decomp_type = 'gzip').read()
+'ABC\n1234\n'
+
+# error: invalid compress type
+>>> DecompressingBufferedReader(BytesIO(compress('ABC')), decomp_type = 'bzip2').read()
+Traceback (most recent call last):
+Exception: Decompression type not supported: bzip2
+
+# error: compressed member, followed by not compressed -- considered invalid
+>>> x = DecompressingBufferedReader(BytesIO(compress('ABC') + '123'), decomp_type = 'gzip')
+>>> b = x.read()
+>>> b = x.read_next_member()
+>>> x.read()
+Traceback (most recent call last):
+error: Error -3 while decompressing: incorrect header check
+
+# DecompressingBufferedReader readline() with decompression (zipnum file, no header)
+>>> DecompressingBufferedReader(open(test_zip_dir + 'zipnum-sample.cdx.gz', 'rb'), decomp_type = 'gzip').readline()
+'com,example)/ 20140127171200 http://example.com text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1046 334 dupes.warc.gz\n'
+
+# test very small block size
+>>> dbr = DecompressingBufferedReader(BytesIO('ABCDEFG\nHIJKLMN\nOPQR\nXYZ'), block_size = 3)
+>>> dbr.readline(); dbr.readline(4); dbr.readline(); dbr.readline(); dbr.readline(2); dbr.readline(); dbr.readline()
+'ABCDEFG\n'
+'HIJK'
+'LMN\n'
+'OPQR\n'
+'XY'
+'Z'
+''
+
+# test zero length reads
+>>> x = DecompressingBufferedReader(LimitReader(BytesIO('\r\n'), 1))
+>>> x.readline(0); x.read(0)
+''
+''
+
+# Chunk-Decoding Buffered Reader Tests
+#=================================================================
+
+Properly formatted chunked data:
+>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n0\r\n\r\n"));
+>>> c.read() + c.read() + c.read()
+'1234'
+
+Non-chunked data:
+>>> ChunkedDataReader(BytesIO("xyz123!@#")).read()
+'xyz123!@#'
+
+Starts like chunked data, but isn't:
+>>> c = ChunkedDataReader(BytesIO("1\r\nxyz123!@#"));
+>>> c.read() + c.read()
+'1\r\nx123!@#'
+
+Chunked data cut off part way through:
+>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"));
+>>> c.read() + c.read()
+'123412'
+
+Chunked data cut off with exceptions:
+>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"), raise_exceptions=True)
+>>> c.read() + c.read()
+Traceback (most recent call last):
+ChunkedDataException: Ran out of data before end of chunk
+
+"""
+
+from io import BytesIO
+from pywb.utils.bufferedreaders import ChunkedDataReader
+from pywb.utils.bufferedreaders import DecompressingBufferedReader
+from pywb.utils.loaders import LimitReader
+
+from pywb import get_test_dir
+
+import zlib
+
+test_cdx_dir = get_test_dir() + 'cdx/'
+test_zip_dir = get_test_dir() + 'zipcdx/'
+
+
+def compress(buff):
+    compressobj = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
+    compressed = compressobj.compress(buff)
+    compressed += compressobj.flush()
+
+    return compressed
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()
diff --git a/pywb/utils/test/test_loaders.py b/pywb/utils/test/test_loaders.py
index f66cee01..c88805b5 100644
--- a/pywb/utils/test/test_loaders.py
+++ b/pywb/utils/test/test_loaders.py
@@ -10,11 +10,32 @@
 >>> read_multiple(LimitReader(BytesIO('abcdefghjiklmnopqrstuvwxyz'), 10), [2, 2, 20])
 'efghji'
 
+# zero-length read
+>>> LimitReader(BytesIO('a'), 0).readline(0)
+''
+
+# don't wrap if invalid length
+>>> b = BytesIO('b')
+>>> LimitReader.wrap_stream(b, 'abc') == b
+True
+
 # BlockLoader Tests (includes LimitReader)
 # Ensure attempt to read more than 100 bytes, reads exactly 100 bytes
 >>> len(BlockLoader().load(test_cdx_dir + 'iana.cdx', 0, 100).read('400'))
 100
 
+# no length specified, read full amount requested
+>>> len(BlockLoader().load('file://' + test_cdx_dir + 'example.cdx', 0, -1).read(400))
+400
+
+# HMAC Cookie Maker
+>>> BlockLoader(HMACCookieMaker('test', 'test', 5)).load('http://example.com', 41, 14).read()
+'Example Domain'
+
+# test with extra id, ensure 4 parts of the A-B=C-D form are present
+>>> len(re.split('[-=]', HMACCookieMaker('test', 'test', 5).make('extra')))
+4
+
 # SeekableTextFileReader Test
 >>> sr = SeekableTextFileReader(test_cdx_dir + 'iana.cdx')
 >>> sr.getsize()
@@ -23,47 +44,21 @@
 >>> seek_read_full(sr, 100)
 'org,iana)/_css/2013.1/fonts/inconsolata.otf 20140126200826 http://www.iana.org/_css/2013.1/fonts/Inconsolata.otf application/octet-stream 200 LNMEDYOENSOEI5VPADCKL3CB6N3GWXPR - - 34054 620049 iana.warc.gz\\n'
 
-# Buffered Reader Tests
-#=================================================================
-
-#DecompressingBufferedReader readline()
->>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb')).readline()
-' CDX N b a m s k r M S V g\\n'
-
->>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb'), decomp_type = 'gzip').readline()
-' CDX N b a m s k r M S V g\\n'
-
-#DecompressingBufferedReader readline() with decompression (zipnum file, no header)
->>> DecompressingBufferedReader(open(test_zip_dir + 'zipnum-sample.cdx.gz', 'rb'), decomp_type = 'gzip').readline()
-'com,example)/ 20140127171200 http://example.com text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1046 334 dupes.warc.gz\\n'
-
->>> BlockLoader(HMACCookieMaker('test', 'test', 5)).load('http://example.com', 41, 14).read()
-'Example Domain'
-
-# test very small block size
->>> dbr = DecompressingBufferedReader(BytesIO('ABCDEFG\\nHIJKLMN\\nOPQR\\nXYZ'), block_size = 3)
->>> dbr.readline(); dbr.readline(4); dbr.readline(); dbr.readline(); dbr.readline(2); dbr.readline(); dbr.readline()
-'ABCDEFG\\n'
-'HIJK'
-'LMN\\n'
-'OPQR\\n'
-'XY'
-'Z'
-''
+# seek, read, close
+>>> r = sr.seek(0); sr.read(10); sr.close()
+' CDX N b a'
 
 """
 
 #=================================================================
-import os
-from io import BytesIO, open
+import re
+from io import BytesIO
 
 from pywb.utils.loaders import BlockLoader, HMACCookieMaker
 from pywb.utils.loaders import LimitReader, SeekableTextFileReader
 
-from pywb.utils.bufferedreaders import DecompressingBufferedReader
 
 from pywb import get_test_dir
 
-#test_cdx_dir = os.path.dirname(os.path.realpath(__file__)) + '/../sample-data/'
+
 test_cdx_dir = get_test_dir() + 'cdx/'
-test_zip_dir = get_test_dir() + 'zipcdx/'
 
 def read_multiple(reader, inc_reads):
     result = None
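With read() and readline() now accepting an optional length, SeekableTextFileReader can be sliced the same way as the buffered readers; a small sketch against the same iana.cdx test file used above:

from pywb.utils.loaders import SeekableTextFileReader
from pywb import get_test_dir

sr = SeekableTextFileReader(get_test_dir() + 'cdx/iana.cdx')
print sr.read(10)     # ' CDX N b a' -- length-capped read
sr.seek(0)
print sr.readline(4)  # ' CDX'       -- length-capped readline
sr.close()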
diff --git a/pywb/warc/archiveindexer.py b/pywb/warc/archiveindexer.py
index 6f556349..eb0f0054 100644
--- a/pywb/warc/archiveindexer.py
+++ b/pywb/warc/archiveindexer.py
@@ -1,6 +1,6 @@
-from gzip2 import GzipFile
-
 from pywb.utils.timeutils import iso_date_to_timestamp
+from pywb.utils.bufferedreaders import DecompressingBufferedReader
+
 from recordloader import ArcWarcRecordLoader
 
 import surt
@@ -8,38 +8,151 @@
 import hashlib
 import base64
 
 import re
+import sys
+
+from bisect import insort
 
 
 #=================================================================
 class ArchiveIndexer(object):
-    """
-    Generate a CDX index for WARC and ARC files, both gzip chunk
+    """ Generate a CDX index for WARC and ARC files, both gzip chunk
    compressed and uncompressed
 
    The indexer will automatically detect format, and decompress
    if necessary
    """
-    def __init__(self, filename):
+    def __init__(self, fileobj, filename, out=sys.stdout, sort=False):
+        self.fh = fileobj
         self.filename = filename
         self.loader = ArcWarcRecordLoader()
+        self.offset = 0
+        self.known_format = None
+
+        if not out:
+            out = sys.stdout
+
+        if sort:
+            self.writer = SortedCDXWriter(out)
+        else:
+            self.writer = CDXWriter(out)
 
     def make_index(self):
-        self.fh = open(self.filename, 'r')
+        """ Output a cdx index!
+        """
 
-        # assume gzip until proven otherwise
-        gf = GzipFile(fileobj=self.fh)
+        decomp_type = 'gzip'
+        block_size = 16384
 
-        member = gf.read_member()
+        reader = DecompressingBufferedReader(self.fh,
+                                             block_size=block_size,
+                                             decomp_type=decomp_type)
+        self.offset = self.fh.tell()
+        next_line = None
 
-        offset = self.fh.tell()
+        self.writer.start()
 
-        print ' CDX N b a m s k r M S V g'
+        try:
+            while True:
+                try:
+                    record = self._process_reader(reader, next_line)
+                except EOFError:
+                    break
 
-        while member:
-            offset = self._process_member(member, offset)
-            member = gf.read_member()
+                # for non-compressed, consume blank lines here
+                if not reader.decompressor:
+                    next_line = self._consume_blanklines(reader)
+                    if next_line is None:
+                        # at end of file
+                        break
+
+                # reset reader for next member
+                else:
+                    reader.read_next_member()
+        finally:
+            self.writer.end()
+
+    def _consume_blanklines(self, reader):
+        """ Consume blank lines that are between records
+        - For warcs, there are usually 2
+        - For arcs, may be 1 or 0
+        - For block gzipped files, these are at end of each gzip envelope
+          and are included in record length which is the full gzip envelope
+        - For uncompressed, they are between records and so are NOT part of
+          the record length
+        """
+        while True:
+            line = reader.readline()
+            if len(line) == 0:
+                return None
+
+            if line.rstrip() == '':
+                self.offset = self.fh.tell() - reader.rem_length()
+                continue
+
+            return line
+
+    def _read_to_record_end(self, reader, record):
+        """ Read to end of record and update current offset,
+        which is used to compute record length
+        - For compressed files, blank lines are consumed
+          since they are part of record length
+        - For uncompressed files, blank lines are read later,
+          and not included in the record length
+        """
+
+        if reader.decompressor:
+            self._consume_blanklines(reader)
+
+        self.offset = self.fh.tell() - reader.rem_length()
+
+    def _process_reader(self, reader, next_line):
+        """ Use loader to parse the record from the reader stream
+        Supporting warc and arc records
+        """
+        record = self.loader.parse_record_stream(reader,
+                                                 next_line,
+                                                 self.known_format)
+
+        # Track known format for faster parsing of other records
+        self.known_format = record.format
+
+        if record.format == 'warc':
+            result = self._parse_warc_record(record)
+        elif record.format == 'arc':
+            result = self._parse_arc_record(record)
+
+        if not result:
+            self.read_rest(record.stream)
+            self._read_to_record_end(reader, record)
+            return record
+
+        # generate digest if it doesn't exist and if not a revisit
+        # if revisit, then nothing we can do here
+        if result[-1] == '-' and record.rec_type != 'revisit':
+            digester = hashlib.sha1()
+            self.read_rest(record.stream, digester)
+            result[-1] = base64.b32encode(digester.digest())
+        else:
+            num = self.read_rest(record.stream)
+
+        result.append('- -')
+
+        offset = self.offset
+        self._read_to_record_end(reader, record)
+        length = self.offset - offset
+
+        result.append(str(length))
+        result.append(str(offset))
+        result.append(self.filename)
+
+        self.writer.write(result)
+
+        return record
 
     def _parse_warc_record(self, record):
+        """ Parse warc record to be included in index, or
+        return None if skipping this type of record
+        """
         if record.rec_type not in ('response', 'revisit',
                                    'metadata', 'resource'):
             return None
@@ -66,88 +179,117 @@ class ArchiveIndexer(object):
         if not digest:
             digest = '-'
 
-        #result = OrderedDict()
-        return [ surt.surt(url),
-                 timestamp,
-                 url,
-                 mime,
-                 status,
-                 digest ]
+        return [surt.surt(url),
+                timestamp,
+                url,
+                mime,
+                status,
+                digest]
 
     def _parse_arc_record(self, record):
+        """ Parse arc record and return list of fields
+        to include in index, or return None if skipping this
+        type of record
+        """
         if record.rec_type == 'arc_header':
             return None
 
         url = record.rec_headers.get_header('uri')
+        url = url.replace('\r', '%0D')
+        url = url.replace('\n', '%0A')
+        # replace formfeed
+        url = url.replace('\x0c', '%0C')
+        # replace nulls
+        url = url.replace('\x00', '%00')
+
+        timestamp = record.rec_headers.get_header('archive-date')
+        if len(timestamp) > 14:
+            timestamp = timestamp[:14]
 
         status = record.status_headers.statusline.split(' ')[0]
         mime = record.rec_headers.get_header('content-type')
         mime = self._extract_mime(mime)
 
-        return [ surt.surt(url),
-                 record.rec_headers.get_header('archive-date'),
-                 url,
-                 mime,
-                 status,
-                 '-' ]
+        return [surt.surt(url),
+                timestamp,
+                url,
+                mime,
+                status,
+                '-']
 
     MIME_RE = re.compile('[; ]')
 
     def _extract_mime(self, mime):
+        """ Utility function to extract mimetype only
+        from a full content type, removing charset settings
+        """
         if mime:
             mime = self.MIME_RE.split(mime, 1)[0]
         if not mime:
             mime = 'unk'
         return mime
 
-    def _process_member(self, member, offset):
-        record = self.loader.parse_record_stream(member)
-
-        if record.format == 'warc':
-            result = self._parse_warc_record(record)
-        elif record.format == 'arc':
-            result = self._parse_arc_record(record)
-
-        if not result:
-            self.read_rest(member)
-            new_offset = self.fh.tell()
-            return new_offset
-
-        # generate digest if doesn't exist
-        if result[-1] == '-':
-            digester = hashlib.sha1()
-            self.read_rest(record.stream, digester)
-            result[-1] = base64.b32encode(digester.digest())
-        else:
-            self.read_rest(record.stream)
-
-        result.append('- -')
-
-        #self.read_rest(member)
-        #print len(member.read(16))
-        b = member.read(5)
-
-        new_offset = self.fh.tell()
-        length = new_offset - offset
-
-        result.append(str(length))
-        result.append(str(offset))
-        result.append(self.filename)
-
-        print ' '.join(result)
-
-        return new_offset
-
-    def read_rest(self, member, digester=None):
+    def read_rest(self, reader, digester=None):
+        """ Read remainder of the stream
+        If a digester is included, update it
+        with the data read
+        """
+        num = 0
         while True:
-            b = member.read(8192)
+            b = reader.read(8192)
            if not b:
                break
+            num += len(b)
 
            if digester:
                digester.update(b)
+        return num
 
 
+#=================================================================
+class CDXWriter(object):
+    def __init__(self, out):
+        self.out = out
+
+    def start(self):
+        self.out.write(' CDX N b a m s k r M S V g\n')
+
+    def write(self, line):
+        self.out.write(' '.join(line) + '\n')
+
+    def end(self):
+        pass
+
+
+#=================================================================
+class SortedCDXWriter(object):
+    def __init__(self, out):
+        self.out = out
+        self.sortlist = []
+
+    def start(self):
+        self.out.write(' CDX N b a m s k r M S V g\n')
+
+    def write(self, line):
+        line = ' '.join(line) + '\n'
+        insort(self.sortlist, line)
+
+    def end(self):
+        self.out.write(''.join(self.sortlist))
+
+
+#=================================================================
 if __name__ == "__main__":
-    import sys
-    if len(sys.argv) > 1:
-        index = ArchiveIndexer(sys.argv[1])
-        index.make_index()
+    if len(sys.argv) < 2:
+        print 'USAGE {0} <filename>'.format(sys.argv[0])
+        exit(0)
+
+    filename = sys.argv[1]
+
+    if len(sys.argv) >= 3:
+        sort = sys.argv[2] == '--sort'
+    else:
+        sort = False
+
+    with open(filename, 'r') as fh:
+        index = ArchiveIndexer(fh, filename, sort=sort)
        index.make_index()
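Because the indexer now takes an open file object and a pluggable writer, it can be driven from code as well as from the command line; the new test module further down does exactly this with a BytesIO sink. A minimal sketch, assuming the example.warc.gz that ships in sample_archive/warcs:

from io import BytesIO
from pywb.warc.archiveindexer import ArchiveIndexer

out = BytesIO()
with open('example.warc.gz', 'rb') as fh:
    # sort=True selects SortedCDXWriter, which insort()s each line
    # and flushes them in order from end()
    ArchiveIndexer(fh, 'example.warc.gz', out=out, sort=True).make_index()

print out.getvalue()  # ' CDX N b a m s k r M S V g' header plus sorted cdx lines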
diff --git a/pywb/warc/recordloader.py b/pywb/warc/recordloader.py
index f1302458..9189b2c7 100644
--- a/pywb/warc/recordloader.py
+++ b/pywb/warc/recordloader.py
@@ -49,6 +49,9 @@ class ArcWarcRecordLoader:
         self.http_parser = StatusAndHeadersParser(['HTTP/1.0', 'HTTP/1.1'])
 
     def load(self, url, offset, length):
+        """ Load a single record from given url at offset with length
+        and parse as either warc or arc record
+        """
         try:
             length = int(length)
         except:
@@ -64,8 +67,19 @@ class ArcWarcRecordLoader:
 
         return self.parse_record_stream(stream)
 
-    def parse_record_stream(self, stream):
-        (the_format, rec_headers) = self._detect_type_load_headers(stream)
+    def parse_record_stream(self, stream,
+                            statusline=None, known_format=None):
+        """ Parse file-like stream and return an ArcWarcRecord
+        encapsulating the record headers, http headers (if any),
+        and a stream limited to the remainder of the record.
+
+        Pass statusline and known_format to _detect_type_load_headers()
+        to facilitate parsing.
+        """
+        (the_format, rec_headers) = (self.
+                                     _detect_type_load_headers(stream,
+                                                               statusline,
+                                                               known_format))
 
         if the_format == 'arc':
             if rec_headers.get_header('uri').startswith('filedesc://'):
@@ -73,10 +87,18 @@ class ArcWarcRecordLoader:
                 length = 0
             else:
                 rec_type = 'response'
-                length = int(rec_headers.get_header('length'))
+                length = rec_headers.get_header('length')
+
         elif the_format == 'warc':
             rec_type = rec_headers.get_header('WARC-Type')
-            length = int(rec_headers.get_header('Content-Length'))
+            length = rec_headers.get_header('Content-Length')
+
+        try:
+            length = int(length)
+            if length < 0:
+                length = 0
+        except ValueError:
+            length = 0
 
         # ================================================================
         # handle different types of records
@@ -114,35 +136,44 @@ class ArcWarcRecordLoader:
         # should always be valid, but just in case, still stream if
         # content-length was not set
         remains = length - status_headers.total_len
-        if remains > 0:
+        if remains >= 0:
             stream = LimitReader.wrap_stream(stream, remains)
 
         return ArcWarcRecord(the_format, rec_type,
                              rec_headers, stream, status_headers)
 
-    def _detect_type_load_headers(self, stream):
-        """
-        Try parsing record as WARC, then try parsing as ARC.
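The known_format fast path is what the indexer leans on: once the first record pins the file down as 'warc' or 'arc', later records skip the try-WARC-then-fall-back-to-ARC dance, and a malformed record raises ArchiveLoadFailed instead of silently being re-parsed as the other format. A sketch against a hand-built, single-record ARC stream (the record content here is made up for illustration):

from io import BytesIO
from pywb.warc.recordloader import ArcWarcRecordLoader

http = 'HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nhello'
arc_line = 'http://example.com/ 1.2.3.4 20140101000000 text/html {0}\n'.format(len(http))

loader = ArcWarcRecordLoader()

# known_format='arc' parses directly as ARC; a mismatch would now
# raise ArchiveLoadFailed rather than fall through to WARC parsing
record = loader.parse_record_stream(BytesIO(arc_line + http),
                                    known_format='arc')

print record.format         # 'arc'
print record.rec_type       # 'response'
print record.stream.read()  # 'hello' -- LimitReader-capped to the record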
+    def _detect_type_load_headers(self, stream,
+                                  statusline=None, known_format=None):
+        """ If known_format is specified ('warc' or 'arc'),
+        parse only as that format.
+
+        Otherwise, try parsing record as WARC, then try parsing as ARC.
         if neither one succeeds, we're out of luck.
         """
-        statusline = None
+        if known_format != 'arc':
+            # try as warc first
+            try:
+                rec_headers = self.warc_parser.parse(stream, statusline)
+                return 'warc', rec_headers
+            except StatusAndHeadersParserException as se:
+                if known_format == 'warc':
+                    msg = 'Invalid WARC record, first line: '
+                    raise ArchiveLoadFailed(msg + str(se.statusline))
 
-        # try as warc first
-        try:
-            rec_headers = self.warc_parser.parse(stream)
-            return 'warc', rec_headers
-        except StatusAndHeadersParserException as se:
-            statusline = se.statusline
-            pass
+                statusline = se.statusline
+                pass
 
         # now try as arc
         try:
             rec_headers = self.arc_parser.parse(stream, statusline)
             return 'arc', rec_headers
         except StatusAndHeadersParserException as se:
-            msg = 'Unknown archive format, first line: ' + str(se.statusline)
-            raise ArchiveLoadFailed(msg)
+            if known_format == 'arc':
+                msg = 'Invalid ARC record, first line: '
+            else:
+                msg = 'Unknown archive format, first line: '
+            raise ArchiveLoadFailed(msg + str(se.statusline))
 
 
 #=================================================================
@@ -155,16 +186,25 @@ class ARCHeadersParser:
         total_read = 0
 
         # if headerline passed in, use that
-        if not headerline:
+        if headerline is None:
             headerline = stream.readline()
             total_read = len(headerline)
+
+            if total_read == 0:
+                raise EOFError()
+
         headerline = headerline.rstrip()
 
-        parts = headerline.split()
-
         headernames = self.headernames
 
+        # if arc header, consume next two lines
+        if headerline.startswith('filedesc://'):
+            stream.readline()  # skip version
+            stream.readline()  # skip header spec, use preset one
+
+        parts = headerline.split(' ')
+
         if len(parts) != len(headernames):
             msg = 'Wrong # of headers, expected arc headers {0}, Found {1}'
             msg = msg.format(headernames, parts)
diff --git a/pywb/warc/test/test_indexing.py b/pywb/warc/test/test_indexing.py
new file mode 100644
index 00000000..b323d679
--- /dev/null
+++ b/pywb/warc/test/test_indexing.py
@@ -0,0 +1,81 @@
+r"""
+
+# warc.gz
+>>> print_cdx_index('example.warc.gz')
+ CDX N b a m s k r M S V g
+com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1043 333 example.warc.gz
+com,example)/?example=1 20140103030341 http://example.com?example=1 warc/revisit - B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 553 1864 example.warc.gz
+org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 577 2907 example.warc.gz
+
+# warc
+>>> print_cdx_index('example.warc')
+ CDX N b a m s k r M S V g
+com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1987 460 example.warc
+com,example)/?example=1 20140103030341 http://example.com?example=1 warc/revisit - B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 896 3161 example.warc
+org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 854 4771 example.warc
+
+# arc.gz
+>>> print_cdx_index('example.arc.gz')
+ CDX N b a m s k r M S V g
+com,example)/ 20140216050221 http://example.com/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 856 171 example.arc.gz
+
+# arc
+>>> print_cdx_index('example.arc')
+ CDX N b a m s k r M S V g
+com,example)/ 20140216050221 http://example.com/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1656 151 example.arc
+
+# wget warc (w/ metadata)
+>>> print_cdx_index('example-wget-1-14.warc.gz')
+ CDX N b a m s k r M S V g
+com,example)/ 20140216012908 http://example.com/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1151 792 example-wget-1-14.warc.gz
+metadata)/gnu.org/software/wget/warc/manifest.txt 20140216012908 metadata://gnu.org/software/wget/warc/MANIFEST.txt text/plain 200 SWUF4CK2XMZSOKSA7SDT7M7NUGWH2TRE - - 315 1943 example-wget-1-14.warc.gz
+metadata)/gnu.org/software/wget/warc/wget_arguments.txt 20140216012908 metadata://gnu.org/software/wget/warc/wget_arguments.txt text/plain 200 UCXDCGORD6K4RJT5NUQGKE2PKEG4ZZD6 - - 340 2258 example-wget-1-14.warc.gz
+metadata)/gnu.org/software/wget/warc/wget.log 20140216012908 metadata://gnu.org/software/wget/warc/wget.log text/plain 200 2ULE2LD5UOWDXGACCT624TU5BVKACRQ4 - - 599 2598 example-wget-1-14.warc.gz
+
+# bad arcs -- test error edge cases
+>>> print_cdx_index('bad.arc')
+ CDX N b a m s k r M S V g
+com,example)/ 20140401000000 http://example.com/ text/html 204 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 67 134 bad.arc
+com,example)/ 20140401000000 http://example.com/ text/html 204 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 68 202 bad.arc
+"""
+
+from pywb import get_test_dir
+from pywb.warc.archiveindexer import ArchiveIndexer
+
+from io import BytesIO
+import sys
+
+TEST_CDX_DIR = get_test_dir() + 'cdx/'
+TEST_WARC_DIR = get_test_dir() + 'warcs/'
+
+def read_fully(cdx):
+    with open(TEST_CDX_DIR + cdx) as fh:
+        curr = BytesIO()
+        while True:
+            b = fh.read()
+            if not b:
+                break
+            curr.write(b)
+    return curr.getvalue()
+
+def cdx_index(warc, sort=False):
+    buff = BytesIO()
+    with open(TEST_WARC_DIR + warc) as fh:
+        indexer = ArchiveIndexer(fh, warc,
+                                 out=buff,
+                                 sort=sort)
+
+        indexer.make_index()
+
+    return buff.getvalue()
+
+def print_cdx_index(warc, sort=False):
+    sys.stdout.write(cdx_index(warc, sort))
+
+def assert_cdx_match(cdx, warc, sort=False):
+    assert read_fully(cdx) == cdx_index(warc, sort)
+
+def test_sorted_warc_gz():
+    assert_cdx_match('example.cdx', 'example.warc.gz', sort=True)
+    assert_cdx_match('dupes.cdx', 'dupes.warc.gz', sort=True)
+    assert_cdx_match('iana.cdx', 'iana.warc.gz', sort=True)
diff --git a/pywb/warc/test/test_loading.py b/pywb/warc/test/test_loading.py
index f8d81f0f..146dfb01 100644
--- a/pywb/warc/test/test_loading.py
+++ b/pywb/warc/test/test_loading.py
@@ -126,8 +126,8 @@ StatusAndHeaders(protocol = 'HTTP/1.1', statusline = '200 OK', headers = [ ('Acc
 
-# Test cdx w/ revisit
->>> load_from_cdx_test('com,example)/?example=1 20140103030341 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 553 1864 example.warc.gz 1043 333 example.warc.gz')
+# Test cdx w/ revisit, also no length specified
+>>> load_from_cdx_test('com,example)/?example=1 20140103030341 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - - 1864 example.warc.gz 1043 333 example.warc.gz')
 StatusAndHeaders(protocol = 'HTTP/1.1', statusline = '200 OK', headers = [ ('Accept-Ranges', 'bytes'),
   ('Cache-Control', 'max-age=604800'),
   ('Content-Type', 'text/html'),
@@ -159,6 +159,7 @@ StatusAndHeaders(protocol = 'HTTP/1.1', statusline = '200 OK', headers = [ ('Acc
 
+
 # Test Url Agnostic Revisit Resolving
 # ==============================================================================
 >>> load_from_cdx_test(URL_AGNOSTIC_ORIG_CDX)
@@ -218,6 +219,20 @@
 Exception: ArchiveLoadFailed
 
 Exception: ArchiveLoadFailed
 
+# Test EOF
+>>> parse_stream_error(stream=None, statusline='', known_format='warc')
+Exception: EOFError
+
+>>> parse_stream_error(stream=None, statusline='', known_format='arc')
+Exception: EOFError
+
+# Invalid ARC
+>>> parse_stream_error(stream=None, statusline='ABC', known_format='arc')
+Exception: ArchiveLoadFailed
+
+# Invalid WARC
+>>> parse_stream_error(stream=None, statusline='ABC', known_format='warc')
+Exception: ArchiveLoadFailed
 """
 
 import os
@@ -264,6 +279,7 @@ def load_test_archive(test_file, offset, length):
 def load_orig_cdx(self):
     return [CDXObject(URL_AGNOSTIC_ORIG_CDX)]
 
+
 #==============================================================================
 def load_from_cdx_test(cdx):
     resolve_loader = ResolvingLoader(test_warc_dir)
@@ -276,6 +292,15 @@
     except ArchiveLoadFailed as e:
         print 'Exception: ' + e.__class__.__name__
 
+
+#==============================================================================
+def parse_stream_error(**params):
+    try:
+        return ArcWarcRecordLoader().parse_record_stream(**params)
+    except Exception as e:
+        print 'Exception: ' + e.__class__.__name__
+
+
 if __name__ == "__main__":
     import doctest
     doctest.testmod()
diff --git a/sample_archive/warcs/bad.arc b/sample_archive/warcs/bad.arc
new file mode 100644
index 00000000..0d812251
--- /dev/null
+++ b/sample_archive/warcs/bad.arc
@@ -0,0 +1,7 @@
+filedesc://bad.arc.gz 127.0.0.1 20140301000000 text/plain -1
+1 0 Bad Capture
+URL IP-address Archive-date Content-type Archive-length
+
+http://example.com/ 93.184.216.119 201404010000000000 text/html -1
+
+http://example.com/ 93.184.216.119 201404010000000000 text/html abc