mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 00:03:28 +01:00
archiveindexer: major refactoring using zlib only
Supports warc.gz, arc.gz, warc and arc inputs, with optional sorting; outputs CDX 11 but could be extended to other formats (additional edge-case testing needed). DecompressingBufferedReader refactored to support multi-member gzip. Unit tests added for the indexer, plus additional unit tests for bufferedreaders, loaders, and recordloaders.
parent 26bb695292
commit 28d65ce717
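For orientation before the diff, here is a minimal sketch of how the refactored indexer is driven, following the __main__ block and the ArchiveIndexer(fileobj, filename, out, sort) signature introduced in this commit; the input path is a placeholder, and the CLI form shown in the comment is the rough equivalent of the usage string added below.

# Minimal sketch of the new indexer entry point (paths are placeholders).
# Roughly equivalent CLI form added in this commit: python archiveindexer.py <warc or file> --sort
from pywb.warc.archiveindexer import ArchiveIndexer

with open('path/to/example.warc.gz', 'r') as fh:
    # format (warc/arc, gzipped or not) is auto-detected; sort=True buffers the
    # CDX lines and emits them in sorted order, otherwise they stream to out
    # (sys.stdout by default) as each record is indexed
    indexer = ArchiveIndexer(fh, 'example.warc.gz', sort=True)
    indexer.make_index()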
@@ -29,29 +29,47 @@ class DecompressingBufferedReader(object):

    DECOMPRESSORS = {'gzip': gzip_decompressor}

    def __init__(self, stream, block_size=1024, decomp_type=None):
    def __init__(self, stream, block_size=1024,
                 decomp_type=None,
                 starting_data=None):
        self.stream = stream
        self.block_size = block_size

        self._init_decomp(decomp_type)

        self.buff = None
        self.starting_data = starting_data
        self.num_read = 0
        self.buff_size = 0

    def _init_decomp(self, decomp_type):
        if decomp_type:
            try:
                self.decomp_type = decomp_type
                self.decompressor = self.DECOMPRESSORS[decomp_type.lower()]()
            except KeyError:
                raise Exception('Decompression type not supported: ' +
                                decomp_type)
        else:
            self.decomp_type = None
            self.decompressor = None

        self.buff = None
        self.num_read = 0
        self.buff_size = 0

    def _fillbuff(self, block_size=None):
        if not self.empty():
            return

        # can't read past next member
        if self.rem_length() > 0:
            return

        if self.starting_data:
            data = self.starting_data
            self.starting_data = None
        else:
            if not block_size:
                block_size = self.block_size

        if not self.buff or self.buff.tell() == self.buff_size:
            data = self.stream.read(block_size)

        self._process_read(data)

    def _process_read(self, data):
@@ -81,6 +99,9 @@ class DecompressingBufferedReader(object):
        or at a buffer boundary. If at a boundary, the subsequent
        call will fill buffer anew.
        """
        if length == 0:
            return ''

        self._fillbuff()
        return self.buff.read(length)

@@ -91,8 +112,12 @@ class DecompressingBufferedReader(object):
        If no newline found at end, try filling buffer again in case
        at buffer boundary.
        """
        if length == 0:
            return ''

        self._fillbuff()
        linebuff = self.buff.readline(length)

        # we may be at a boundary
        while not linebuff.endswith('\n'):
            if length:
@@ -102,13 +127,33 @@ class DecompressingBufferedReader(object):

            self._fillbuff()

            if self.buff_size == 0:
            if self.empty():
                break

            linebuff += self.buff.readline(length)

        return linebuff

    def empty(self):
        return not self.buff or self.buff.tell() >= self.buff_size

    def read_next_member(self):
        if not self.decompressor or not self.decompressor.unused_data:
            return False

        self.starting_data = self.decompressor.unused_data
        self._init_decomp(self.decomp_type)
        return True

    def rem_length(self):
        rem = 0
        if self.buff:
            rem = self.buff_size - self.buff.tell()

        if self.decompressor and self.decompressor.unused_data:
            rem += len(self.decompressor.unused_data)
        return rem

    def close(self):
        if self.stream:
            self.stream.close()
@@ -123,37 +168,20 @@ class ChunkedDataException(Exception):
#=================================================================
class ChunkedDataReader(DecompressingBufferedReader):
    r"""
    A ChunkedDataReader is a BufferedReader which also supports de-chunking
    of the data if it happens to be http 'chunk-encoded'.
    A ChunkedDataReader is a DecompressingBufferedReader
    which also supports de-chunking of the data if it happens
    to be http 'chunk-encoded'.

    If at any point the chunked header is not available, the stream is
    assumed to not be chunked and no more dechunking occurs.

    Properly formatted chunked data:
    >>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n0\r\n\r\n"));
    >>> c.read() + c.read()
    '1234'

    Non-chunked data:
    >>> ChunkedDataReader(BytesIO("xyz123!@#")).read()
    'xyz123!@#'

    Starts like chunked data, but isn't:
    >>> c = ChunkedDataReader(BytesIO("1\r\nxyz123!@#"));
    >>> c.read() + c.read()
    '1\r\nx123!@#'

    Chunked data cut off part way through:
    >>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"));
    >>> c.read() + c.read()
    '123412'
    """

    all_chunks_read = False
    not_chunked = False
    def __init__(self, stream, raise_exceptions=False, **kwargs):
        super(ChunkedDataReader, self).__init__(stream, **kwargs)
        self.all_chunks_read = False
        self.not_chunked = False

        # if False, we'll use best-guess fallback for parse errors
    raise_chunked_data_exceptions = False
        self.raise_chunked_data_exceptions = raise_exceptions

    def _fillbuff(self, block_size=None):
        if self.not_chunked:
@@ -162,7 +190,7 @@ class ChunkedDataReader(DecompressingBufferedReader):
        if self.all_chunks_read:
            return

        if not self.buff or self.buff.tell() >= self.buff_size:
        if self.empty():
            length_header = self.stream.readline(64)
            self._data = ''

@@ -222,7 +250,3 @@ class ChunkedDataReader(DecompressingBufferedReader):

        # hand to base class for further processing
        self._process_read(self._data)

if __name__ == "__main__":
    import doctest
    doctest.testmod()
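The key piece of the multi-member gzip support above is that zlib leaves any bytes following the end of a gzip member in decompressor.unused_data, and read_next_member() re-initializes the decompressor with those bytes as starting_data. A small sketch of the intended usage follows, mirroring the new unit tests further below; the gzip_compress helper copies the tests' compress() function and the sample strings are placeholders.

import zlib
from io import BytesIO
from pywb.utils.bufferedreaders import DecompressingBufferedReader

def gzip_compress(buff):
    # same construction as the compress() helper in the new unit tests
    c = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    return c.compress(buff) + c.flush()

# two gzip members concatenated back-to-back, as in a .warc.gz / .arc.gz
stream = BytesIO(gzip_compress('first\n') + gzip_compress('second\n'))
reader = DecompressingBufferedReader(stream, decomp_type='gzip')

assert reader.read() == 'first\n'      # stops at the member boundary
assert reader.read_next_member()       # leftover bytes start the next member
assert reader.read() == 'second\n'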
@@ -146,17 +146,28 @@ class LimitReader(object):
        self.stream = stream
        self.limit = limit

        if not self.limit:
            self.limit = 1

    def read(self, length=None):
        length = min(length, self.limit) if length else self.limit
        if length is not None:
            length = min(length, self.limit)
        else:
            length = self.limit

        if length == 0:
            return ''

        buff = self.stream.read(length)
        self.limit -= len(buff)
        return buff

    def readline(self, length=None):
        length = min(length, self.limit) if length else self.limit
        if length is not None:
            length = min(length, self.limit)
        else:
            length = self.limit

        if length == 0:
            return ''

        buff = self.stream.readline(length)
        self.limit -= len(buff)
        return buff
@@ -172,7 +183,7 @@ class LimitReader(object):
        """
        try:
            content_length = int(content_length)
            if content_length > 0:
            if content_length >= 0:
                stream = LimitReader(stream, content_length)

        except (ValueError, TypeError):
@@ -199,11 +210,11 @@ class SeekableTextFileReader(object):
    def getsize(self):
        return self.size

    def read(self):
        return self.fh.read()
    def read(self, length=None):
        return self.fh.read(length)

    def readline(self):
        return self.fh.readline()
    def readline(self, length=None):
        return self.fh.readline(length)

    def seek(self, offset):
        return self.fh.seek(offset)
@@ -68,7 +68,7 @@ class StatusAndHeadersParser(object):
    def __init__(self, statuslist):
        self.statuslist = statuslist

    def parse(self, stream):
    def parse(self, stream, full_statusline=None):
        """
        parse stream for status line and headers
        return a StatusAndHeaders object
@@ -76,9 +76,15 @@ class StatusAndHeadersParser(object):
        support continuation headers starting with space or tab
        """
        # status line w newlines intact
        if full_statusline is None:
            full_statusline = stream.readline()

        statusline, total_read = _strip_count(full_statusline, 0)

        # at end of stream
        if total_read == 0:
            raise EOFError()

        protocol_status = self.split_prefix(statusline, self.statuslist)

        if not protocol_status:

pywb/utils/test/test_bufferedreaders.py (new file, 103 lines)
@@ -0,0 +1,103 @@
r"""
# DecompressingBufferedReader Tests
#=================================================================

# DecompressingBufferedReader readline()
>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb')).readline()
' CDX N b a m s k r M S V g\n'

# detect not compressed
>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb'), decomp_type = 'gzip').readline()
' CDX N b a m s k r M S V g\n'

# decompress with on the fly compression
>>> DecompressingBufferedReader(BytesIO(compress('ABC\n1234\n')), decomp_type = 'gzip').read()
'ABC\n1234\n'

# error: invalid compress type
>>> DecompressingBufferedReader(BytesIO(compress('ABC')), decomp_type = 'bzip2').read()
Traceback (most recent call last):
Exception: Decompression type not supported: bzip2

# error: compressed member, followed by not compressed -- considered invalid
>>> x = DecompressingBufferedReader(BytesIO(compress('ABC') + '123'), decomp_type = 'gzip')
>>> b = x.read()
>>> b = x.read_next_member()
>>> x.read()
Traceback (most recent call last):
error: Error -3 while decompressing: incorrect header check

# DecompressingBufferedReader readline() with decompression (zipnum file, no header)
>>> DecompressingBufferedReader(open(test_zip_dir + 'zipnum-sample.cdx.gz', 'rb'), decomp_type = 'gzip').readline()
'com,example)/ 20140127171200 http://example.com text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1046 334 dupes.warc.gz\n'

# test very small block size
>>> dbr = DecompressingBufferedReader(BytesIO('ABCDEFG\nHIJKLMN\nOPQR\nXYZ'), block_size = 3)
>>> dbr.readline(); dbr.readline(4); dbr.readline(); dbr.readline(); dbr.readline(2); dbr.readline(); dbr.readline()
'ABCDEFG\n'
'HIJK'
'LMN\n'
'OPQR\n'
'XY'
'Z'
''

# test zero length reads
>>> x = DecompressingBufferedReader(LimitReader(BytesIO('\r\n'), 1))
>>> x.readline(0); x.read(0)
''
''

# Chunk-Decoding Buffered Reader Tests
#=================================================================

Properly formatted chunked data:
>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n0\r\n\r\n"));
>>> c.read() + c.read() + c.read()
'1234'

Non-chunked data:
>>> ChunkedDataReader(BytesIO("xyz123!@#")).read()
'xyz123!@#'

Starts like chunked data, but isn't:
>>> c = ChunkedDataReader(BytesIO("1\r\nxyz123!@#"));
>>> c.read() + c.read()
'1\r\nx123!@#'

Chunked data cut off part way through:
>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"));
>>> c.read() + c.read()
'123412'

Chunked data cut off with exceptions
>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"), raise_exceptions=True)
>>> c.read() + c.read()
Traceback (most recent call last):
ChunkedDataException: Ran out of data before end of chunk

"""

from io import BytesIO
from pywb.utils.bufferedreaders import ChunkedDataReader
from pywb.utils.bufferedreaders import DecompressingBufferedReader
from pywb.utils.loaders import LimitReader

from pywb import get_test_dir

import zlib

test_cdx_dir = get_test_dir() + 'cdx/'
test_zip_dir = get_test_dir() + 'zipcdx/'


def compress(buff):
    compressobj = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    compressed = compressobj.compress(buff)
    compressed += compressobj.flush()

    return compressed

if __name__ == "__main__":
    import doctest
    doctest.testmod()
@@ -10,11 +10,32 @@
>>> read_multiple(LimitReader(BytesIO('abcdefghjiklmnopqrstuvwxyz'), 10), [2, 2, 20])
'efghji'

# zero-length read
>>> LimitReader(BytesIO('a'), 0).readline(0)
''

# don't wrap if invalid length
>>> b = BytesIO('b')
>>> LimitReader.wrap_stream(b, 'abc') == b
True

# BlockLoader Tests (includes LimitReader)
# Ensure attempt to read more than 100 bytes, reads exactly 100 bytes
>>> len(BlockLoader().load(test_cdx_dir + 'iana.cdx', 0, 100).read('400'))
100

# no length specified, read full amount requested
>>> len(BlockLoader().load('file://' + test_cdx_dir + 'example.cdx', 0, -1).read(400))
400

# HMAC Cookie Maker
>>> BlockLoader(HMACCookieMaker('test', 'test', 5)).load('http://example.com', 41, 14).read()
'Example Domain'

# test with extra id, ensure 4 parts of the A-B=C-D form are present
>>> len(re.split('[-=]', HMACCookieMaker('test', 'test', 5).make('extra')))
4

# SeekableTextFileReader Test
>>> sr = SeekableTextFileReader(test_cdx_dir + 'iana.cdx')
>>> sr.getsize()
@@ -23,47 +44,21 @@
>>> seek_read_full(sr, 100)
'org,iana)/_css/2013.1/fonts/inconsolata.otf 20140126200826 http://www.iana.org/_css/2013.1/fonts/Inconsolata.otf application/octet-stream 200 LNMEDYOENSOEI5VPADCKL3CB6N3GWXPR - - 34054 620049 iana.warc.gz\\n'

# Buffered Reader Tests
#=================================================================

#DecompressingBufferedReader readline()
>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb')).readline()
' CDX N b a m s k r M S V g\\n'

>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb'), decomp_type = 'gzip').readline()
' CDX N b a m s k r M S V g\\n'

#DecompressingBufferedReader readline() with decompression (zipnum file, no header)
>>> DecompressingBufferedReader(open(test_zip_dir + 'zipnum-sample.cdx.gz', 'rb'), decomp_type = 'gzip').readline()
'com,example)/ 20140127171200 http://example.com text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1046 334 dupes.warc.gz\\n'

>>> BlockLoader(HMACCookieMaker('test', 'test', 5)).load('http://example.com', 41, 14).read()
'Example Domain'

# test very small block size
>>> dbr = DecompressingBufferedReader(BytesIO('ABCDEFG\\nHIJKLMN\\nOPQR\\nXYZ'), block_size = 3)
>>> dbr.readline(); dbr.readline(4); dbr.readline(); dbr.readline(); dbr.readline(2); dbr.readline(); dbr.readline()
'ABCDEFG\\n'
'HIJK'
'LMN\\n'
'OPQR\\n'
'XY'
'Z'
''
# seek, read, close
>>> r = sr.seek(0); sr.read(10); sr.close()
' CDX N b a'
"""


#=================================================================
import os
from io import BytesIO, open
import re
from io import BytesIO
from pywb.utils.loaders import BlockLoader, HMACCookieMaker
from pywb.utils.loaders import LimitReader, SeekableTextFileReader
from pywb.utils.bufferedreaders import DecompressingBufferedReader

from pywb import get_test_dir
#test_cdx_dir = os.path.dirname(os.path.realpath(__file__)) + '/../sample-data/'

test_cdx_dir = get_test_dir() + 'cdx/'
test_zip_dir = get_test_dir() + 'zipcdx/'

def read_multiple(reader, inc_reads):
    result = None
@@ -1,6 +1,6 @@
from gzip2 import GzipFile

from pywb.utils.timeutils import iso_date_to_timestamp
from pywb.utils.bufferedreaders import DecompressingBufferedReader

from recordloader import ArcWarcRecordLoader

import surt
@@ -8,38 +8,151 @@ import hashlib
import base64

import re
import sys

from bisect import insort


#=================================================================
class ArchiveIndexer(object):
    """
    Generate a CDX index for WARC and ARC files, both gzip chunk
    """ Generate a CDX index for WARC and ARC files, both gzip chunk
    compressed and uncompressed

    The indexer will automatically detect format, and decompress
    if necessary
    """
    def __init__(self, filename):
    def __init__(self, fileobj, filename, out=sys.stdout, sort=False):
        self.fh = fileobj
        self.filename = filename
        self.loader = ArcWarcRecordLoader()
        self.offset = 0
        self.known_format = None

        if not out:
            out = sys.stdout

        if sort:
            self.writer = SortedCDXWriter(out)
        else:
            self.writer = CDXWriter(out)

    def make_index(self):
        self.fh = open(self.filename, 'r')
        """ Output a cdx index!
        """

        # assume gzip until proven otherwise
        gf = GzipFile(fileobj=self.fh)
        decomp_type = 'gzip'
        block_size = 16384

        member = gf.read_member()
        reader = DecompressingBufferedReader(self.fh,
                                             block_size=block_size,
                                             decomp_type=decomp_type)
        self.offset = self.fh.tell()
        next_line = None

        offset = self.fh.tell()
        self.writer.start()

        print ' CDX N b a m s k r M S V g'
        try:
            while True:
                try:
                    record = self._process_reader(reader, next_line)
                except EOFError:
                    break

        while member:
            offset = self._process_member(member, offset)
            member = gf.read_member()
                # for non-compressed, consume blank lines here
                if not reader.decompressor:
                    next_line = self._consume_blanklines(reader)
                    if next_line is None:
                        # at end of file
                        break

                # reset reader for next member
                else:
                    reader.read_next_member()
        finally:
            self.writer.end()

    def _consume_blanklines(self, reader):
        """ Consume blank lines that are between records
        - For warcs, there are usually 2
        - For arcs, may be 1 or 0
        - For block gzipped files, these are at end of each gzip envelope
          and are included in record length which is the full gzip envelope
        - For uncompressed, they are between records and so are NOT part of
          the record length
        """
        while True:
            line = reader.readline()
            if len(line) == 0:
                return None

            if line.rstrip() == '':
                self.offset = self.fh.tell() - reader.rem_length()
                continue

            return line

    def _read_to_record_end(self, reader, record):
        """ Read to end of record and update current offset,
        which is used to compute record length
        - For compressed files, blank lines are consumed
          since they are part of record length
        - For uncompressed files, blank lines are read later,
          and not included in the record length
        """

        if reader.decompressor:
            self._consume_blanklines(reader)

        self.offset = self.fh.tell() - reader.rem_length()

    def _process_reader(self, reader, next_line):
        """ Use loader to parse the record from the reader stream
        Supporting warc and arc records
        """
        record = self.loader.parse_record_stream(reader,
                                                 next_line,
                                                 self.known_format)

        # Track known format for faster parsing of other records
        self.known_format = record.format

        if record.format == 'warc':
            result = self._parse_warc_record(record)
        elif record.format == 'arc':
            result = self._parse_arc_record(record)

        if not result:
            self.read_rest(record.stream)
            self._read_to_record_end(reader, record)
            return record

        # generate digest if it doesn't exist and if not a revisit
        # if revisit, then nothing we can do here
        if result[-1] == '-' and record.rec_type != 'revisit':
            digester = hashlib.sha1()
            self.read_rest(record.stream, digester)
            result[-1] = base64.b32encode(digester.digest())
        else:
            num = self.read_rest(record.stream)

        result.append('- -')

        offset = self.offset
        self._read_to_record_end(reader, record)
        length = self.offset - offset

        result.append(str(length))
        result.append(str(offset))
        result.append(self.filename)

        self.writer.write(result)

        return record

    def _parse_warc_record(self, record):
        """ Parse warc record to be included in index, or
        return none if skipping this type of record
        """
        if record.rec_type not in ('response', 'revisit',
                                   'metadata', 'resource'):
            return None
@@ -66,7 +179,6 @@ class ArchiveIndexer(object):
        if not digest:
            digest = '-'

        #result = OrderedDict()
        return [surt.surt(url),
                timestamp,
                url,
@@ -75,16 +187,30 @@ class ArchiveIndexer(object):
                digest]

    def _parse_arc_record(self, record):
        """ Parse arc record and return list of fields
        to include in index, or return none if skipping this
        type of record
        """
        if record.rec_type == 'arc_header':
            return None

        url = record.rec_headers.get_header('uri')
        url = url.replace('\r', '%0D')
        url = url.replace('\n', '%0A')
        # replace formfeed
        url = url.replace('\x0c', '%0C')
        # replace nulls
        url = url.replace('\x00', '%00')

        timestamp = record.rec_headers.get_header('archive-date')
        if len(timestamp) > 14:
            timestamp = timestamp[:14]
        status = record.status_headers.statusline.split(' ')[0]
        mime = record.rec_headers.get_header('content-type')
        mime = self._extract_mime(mime)

        return [surt.surt(url),
                record.rec_headers.get_header('archive-date'),
                timestamp,
                url,
                mime,
                status,
@@ -93,61 +219,77 @@ class ArchiveIndexer(object):
    MIME_RE = re.compile('[; ]')

    def _extract_mime(self, mime):
        """ Utility function to extract mimetype only
        from a full content type, removing charset settings
        """
        if mime:
            mime = self.MIME_RE.split(mime, 1)[0]
        if not mime:
            mime = 'unk'
        return mime

    def _process_member(self, member, offset):
        record = self.loader.parse_record_stream(member)

        if record.format == 'warc':
            result = self._parse_warc_record(record)
        elif record.format == 'arc':
            result = self._parse_arc_record(record)

        if not result:
            self.read_rest(member)
            new_offset = self.fh.tell()
            return new_offset

        # generate digest if doesn't exist
        if result[-1] == '-':
            digester = hashlib.sha1()
            self.read_rest(record.stream, digester)
            result[-1] = base64.b32encode(digester.digest())
        else:
            self.read_rest(record.stream)

        result.append('- -')

        #self.read_rest(member)
        #print len(member.read(16))
        b = member.read(5)

        new_offset = self.fh.tell()
        length = new_offset - offset

        result.append(str(length))
        result.append(str(offset))
        result.append(self.filename)

        print ' '.join(result)

        return new_offset

    def read_rest(self, member, digester=None):
    def read_rest(self, reader, digester=None):
        """ Read remainder of the stream
        If a digester is included, update it
        with the data read
        """
        num = 0
        while True:
            b = member.read(8192)
            b = reader.read(8192)
            if not b:
                break
            num += len(b)
            if digester:
                digester.update(b)
        return num


#=================================================================
class CDXWriter(object):
    def __init__(self, out):
        self.out = out

    def start(self):
        self.out.write(' CDX N b a m s k r M S V g\n')

    def write(self, line):
        self.out.write(' '.join(line) + '\n')

    def end(self):
        pass


#=================================================================
class SortedCDXWriter(object):
    def __init__(self, out):
        self.out = out
        self.sortlist = []

    def start(self):
        self.out.write(' CDX N b a m s k r M S V g\n')
        pass

    def write(self, line):
        line = ' '.join(line) + '\n'
        insort(self.sortlist, line)

    def end(self):
        self.out.write(''.join(self.sortlist))


#=================================================================
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        index = ArchiveIndexer(sys.argv[1])
    if len(sys.argv) < 2:
        print 'USAGE {0} <warc or file>'.format(sys.argv[0])
        exit(0)

    filename = sys.argv[1]

    if len(sys.argv) >= 3:
        sort = sys.argv[2] == '--sort'
    else:
        sort = False

    with open(filename, 'r') as fh:
        index = ArchiveIndexer(fh, filename, sort=sort)
        index.make_index()
@@ -49,6 +49,9 @@ class ArcWarcRecordLoader:
        self.http_parser = StatusAndHeadersParser(['HTTP/1.0', 'HTTP/1.1'])

    def load(self, url, offset, length):
        """ Load a single record from given url at offset with length
        and parse as either warc or arc record
        """
        try:
            length = int(length)
        except:
@@ -64,8 +67,19 @@ class ArcWarcRecordLoader:

        return self.parse_record_stream(stream)

    def parse_record_stream(self, stream):
        (the_format, rec_headers) = self._detect_type_load_headers(stream)
    def parse_record_stream(self, stream,
                            statusline=None, known_format=None):
        """ Parse file-like stream and return an ArcWarcRecord
        encapsulating the record headers, http headers (if any),
        and a stream limited to the remainder of the record.

        Pass statusline and known_format to _detect_type_load_headers()
        to facilitate parsing.
        """
        (the_format, rec_headers) = (self.
                                     _detect_type_load_headers(stream,
                                                               statusline,
                                                               known_format))

        if the_format == 'arc':
            if rec_headers.get_header('uri').startswith('filedesc://'):
@@ -73,10 +87,18 @@ class ArcWarcRecordLoader:
                length = 0
            else:
                rec_type = 'response'
                length = int(rec_headers.get_header('length'))
                length = rec_headers.get_header('length')

        elif the_format == 'warc':
            rec_type = rec_headers.get_header('WARC-Type')
            length = int(rec_headers.get_header('Content-Length'))
            length = rec_headers.get_header('Content-Length')

        try:
            length = int(length)
            if length < 0:
                length = 0
        except ValueError:
            length = 0

        # ================================================================
        # handle different types of records
@@ -114,25 +136,31 @@ class ArcWarcRecordLoader:
        # should always be valid, but just in case, still stream if
        # content-length was not set
        remains = length - status_headers.total_len
        if remains > 0:
        if remains >= 0:
            stream = LimitReader.wrap_stream(stream, remains)

        return ArcWarcRecord(the_format, rec_type,
                             rec_headers, stream, status_headers)

    def _detect_type_load_headers(self, stream):
        """
        Try parsing record as WARC, then try parsing as ARC.
    def _detect_type_load_headers(self, stream,
                                  statusline=None, known_format=None):
        """ If known_format is specified ('warc' or 'arc'),
        parse only as that format.

        Otherwise, try parsing record as WARC, then try parsing as ARC.
        if neither one succeeds, we're out of luck.
        """

        statusline = None

        if known_format != 'arc':
            # try as warc first
            try:
                rec_headers = self.warc_parser.parse(stream)
                rec_headers = self.warc_parser.parse(stream, statusline)
                return 'warc', rec_headers
            except StatusAndHeadersParserException as se:
                if known_format == 'warc':
                    msg = 'Invalid WARC record, first line: '
                    raise ArchiveLoadFailed(msg + str(se.statusline))

                statusline = se.statusline
                pass

@@ -141,8 +169,11 @@ class ArcWarcRecordLoader:
                rec_headers = self.arc_parser.parse(stream, statusline)
                return 'arc', rec_headers
            except StatusAndHeadersParserException as se:
                msg = 'Unknown archive format, first line: ' + str(se.statusline)
                raise ArchiveLoadFailed(msg)
                if known_format == 'arc':
                    msg = 'Invalid WARC record, first line: '
                else:
                    msg = 'Unknown archive format, first line: '
                raise ArchiveLoadFailed(msg + str(se.statusline))


#=================================================================
@@ -155,16 +186,25 @@ class ARCHeadersParser:
        total_read = 0

        # if headerline passed in, use that
        if not headerline:
        if headerline is None:
            headerline = stream.readline()

        total_read = len(headerline)

        if total_read == 0:
            raise EOFError()

        headerline = headerline.rstrip()

        parts = headerline.split()

        headernames = self.headernames

        # if arc header, consume next two lines
        if headerline.startswith('filedesc://'):
            stream.readline()  # skip version
            stream.readline()  # skip header spec, use preset one

        parts = headerline.split(' ')

        if len(parts) != len(headernames):
            msg = 'Wrong # of headers, expected arc headers {0}, Found {1}'
            msg = msg.format(headernames, parts)
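For context, a sketch of how the revised loader API is meant to be called, mirroring the way ArchiveIndexer drives it above; the file path is a placeholder, and the module path pywb.warc.recordloader is inferred from the package layout shown elsewhere in this commit.

from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.utils.bufferedreaders import DecompressingBufferedReader

loader = ArcWarcRecordLoader()

with open('path/to/example.warc.gz', 'rb') as fh:
    reader = DecompressingBufferedReader(fh, decomp_type='gzip')

    # with no known_format, the loader tries WARC first, then falls back to ARC;
    # passing known_format='warc' or 'arc' for later records skips the retry
    record = loader.parse_record_stream(reader)

    print record.format, record.rec_type
    # rec_headers / status_headers / stream are the fields the indexer reads
    print record.rec_headers.get_header('WARC-Type')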

pywb/warc/test/test_indexing.py (new file, 81 lines)
@@ -0,0 +1,81 @@
r"""

# warc.gz
>>> print_cdx_index('example.warc.gz')
 CDX N b a m s k r M S V g
com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1043 333 example.warc.gz
com,example)/?example=1 20140103030341 http://example.com?example=1 warc/revisit - B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 553 1864 example.warc.gz
org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 577 2907 example.warc.gz

# warc
>>> print_cdx_index('example.warc')
 CDX N b a m s k r M S V g
com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1987 460 example.warc
com,example)/?example=1 20140103030341 http://example.com?example=1 warc/revisit - B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 896 3161 example.warc
org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 854 4771 example.warc

# arc.gz
>>> print_cdx_index('example.arc.gz')
 CDX N b a m s k r M S V g
com,example)/ 20140216050221 http://example.com/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 856 171 example.arc.gz

# arc
>>> print_cdx_index('example.arc')
 CDX N b a m s k r M S V g
com,example)/ 20140216050221 http://example.com/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1656 151 example.arc

# wget warc (w/ metadata)
>>> print_cdx_index('example-wget-1-14.warc.gz')
 CDX N b a m s k r M S V g
com,example)/ 20140216012908 http://example.com/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1151 792 example-wget-1-14.warc.gz
metadata)/gnu.org/software/wget/warc/manifest.txt 20140216012908 metadata://gnu.org/software/wget/warc/MANIFEST.txt text/plain 200 SWUF4CK2XMZSOKSA7SDT7M7NUGWH2TRE - - 315 1943 example-wget-1-14.warc.gz
metadata)/gnu.org/software/wget/warc/wget_arguments.txt 20140216012908 metadata://gnu.org/software/wget/warc/wget_arguments.txt text/plain 200 UCXDCGORD6K4RJT5NUQGKE2PKEG4ZZD6 - - 340 2258 example-wget-1-14.warc.gz
metadata)/gnu.org/software/wget/warc/wget.log 20140216012908 metadata://gnu.org/software/wget/warc/wget.log text/plain 200 2ULE2LD5UOWDXGACCT624TU5BVKACRQ4 - - 599 2598 example-wget-1-14.warc.gz

# bad arcs -- test error edge cases
>>> print_cdx_index('bad.arc')
 CDX N b a m s k r M S V g
com,example)/ 20140401000000 http://example.com/ text/html 204 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 67 134 bad.arc
com,example)/ 20140401000000 http://example.com/ text/html 204 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 68 202 bad.arc
"""

from pywb import get_test_dir
from pywb.warc.archiveindexer import ArchiveIndexer

from io import BytesIO
import sys

TEST_CDX_DIR = get_test_dir() + 'cdx/'
TEST_WARC_DIR = get_test_dir() + 'warcs/'

def read_fully(cdx):
    with open(TEST_CDX_DIR + cdx) as fh:
        curr = BytesIO()
        while True:
            b = fh.read()
            if not b:
                break
            curr.write(b)
        return curr.getvalue()

def cdx_index(warc, sort=False):
    buff = BytesIO()

    with open(TEST_WARC_DIR + warc) as fh:
        indexer = ArchiveIndexer(fh, warc,
                                 out=buff,
                                 sort=sort)

        indexer.make_index()

    return buff.getvalue()

def print_cdx_index(warc, sort=False):
    sys.stdout.write(cdx_index(warc, sort))

def assert_cdx_match(cdx, warc, sort=False):
    assert read_fully(cdx) == cdx_index(warc, sort)

def test_sorted_warc_gz():
    assert_cdx_match('example.cdx', 'example.warc.gz', sort=True)
    assert_cdx_match('dupes.cdx', 'dupes.warc.gz', sort=True)
    assert_cdx_match('iana.cdx', 'iana.warc.gz', sort=True)
@@ -126,8 +126,8 @@ StatusAndHeaders(protocol = 'HTTP/1.1', statusline = '200 OK', headers = [ ('Acc
<!doctype html>
<html>

# Test cdx w/ revisit
>>> load_from_cdx_test('com,example)/?example=1 20140103030341 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 553 1864 example.warc.gz 1043 333 example.warc.gz')
# Test cdx w/ revisit, also no length specified
>>> load_from_cdx_test('com,example)/?example=1 20140103030341 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - - 1864 example.warc.gz 1043 333 example.warc.gz')
StatusAndHeaders(protocol = 'HTTP/1.1', statusline = '200 OK', headers = [ ('Accept-Ranges', 'bytes'),
('Cache-Control', 'max-age=604800'),
('Content-Type', 'text/html'),
@@ -159,6 +159,7 @@ StatusAndHeaders(protocol = 'HTTP/1.1', statusline = '200 OK', headers = [ ('Acc
<!doctype html>
<html>


# Test Url Agnostic Revisit Resolving
# ==============================================================================
>>> load_from_cdx_test(URL_AGNOSTIC_ORIG_CDX)
@@ -218,6 +219,20 @@ Exception: ArchiveLoadFailed
Exception: ArchiveLoadFailed


# Test EOF
>>> parse_stream_error(stream=None, statusline='', known_format='warc')
Exception: EOFError

>>> parse_stream_error(stream=None, statusline='', known_format='arc')
Exception: EOFError

# Invalid ARC
>>> parse_stream_error(stream=None, statusline='ABC', known_format='arc')
Exception: ArchiveLoadFailed

# Invalid WARC
>>> parse_stream_error(stream=None, statusline='ABC', known_format='warc')
Exception: ArchiveLoadFailed
"""

import os
@@ -264,6 +279,7 @@ def load_test_archive(test_file, offset, length):
    def load_orig_cdx(self):
        return [CDXObject(URL_AGNOSTIC_ORIG_CDX)]


#==============================================================================
def load_from_cdx_test(cdx):
    resolve_loader = ResolvingLoader(test_warc_dir)
@@ -276,6 +292,15 @@ def load_from_cdx_test(cdx):
    except ArchiveLoadFailed as e:
        print 'Exception: ' + e.__class__.__name__


#==============================================================================
def parse_stream_error(**params):
    try:
        return ArcWarcRecordLoader().parse_record_stream(**params)
    except Exception as e:
        print 'Exception: ' + e.__class__.__name__


if __name__ == "__main__":
    import doctest
    doctest.testmod()

sample_archive/warcs/bad.arc (new file, 7 lines)
@@ -0,0 +1,7 @@
filedesc://bad.arc.gz 127.0.0.1 20140301000000 text/plain -1
1 0 Bad Capture
URL IP-address Archive-date Content-type Archive-length

http://example.com/ 93.184.216.119 201404010000000000 text/html -1

http://example.com/ 93.184.216.119 201404010000000000 text/html abc