mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

py3: make pywb.utils work with python 3!

This commit is contained in:
Ilya Kreymer 2016-02-16 14:52:20 -08:00
parent 7cf81935e1
commit 3c85f7b7ac
12 changed files with 169 additions and 117 deletions

View File

@@ -4,6 +4,13 @@ Utility functions for performing binary search over a sorted text file
from collections import deque
import itertools
import six
import sys
if six.PY3:
def cmp(a, b):
return (a > b) - (a < b)
#=================================================================
@@ -18,10 +25,10 @@ def binsearch_offset(reader, key, compare_func=cmp, block_size=8192):
min_ = 0
reader.seek(0, 2)
max_ = reader.tell() / block_size
max_ = int(reader.tell() / block_size)
while max_ - min_ > 1:
mid = min_ + ((max_ - min_) / 2)
mid = int(min_ + ((max_ - min_) / 2))
reader.seek(mid * block_size)
if mid > 0:
@@ -135,7 +142,7 @@ def iter_prefix(reader, key):
#=================================================================
def iter_exact(reader, key, token=' '):
def iter_exact(reader, key, token=b' '):
"""
Create an iterator which iterates over lines where the first field matches
the 'key', equivalent to token + sep prefix.
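
For context on the shim above: Python 3 removed the builtin cmp() and made / true division, so both the comparison default and the block-offset arithmetic need patching. A minimal standalone sketch of the same pattern (assuming only that six is installed):

import six

if six.PY3:
    def cmp(a, b):
        # reproduce py2's builtin: -1, 0, or 1 depending on ordering
        return (a > b) - (a < b)

assert cmp(1, 2) == -1 and cmp(2, 2) == 0 and cmp(3, 2) == 1

# offsets must stay integers; int(...) (or the // operator) behaves
# the same on both versions
max_ = int(16384 / 8192)
assert max_ == 2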

View File

@@ -120,7 +120,7 @@ class BufferedReader(object):
call will fill buffer anew.
"""
if length == 0:
return ''
return b''
self._fillbuff()
buff = self.buff.read(length)
@@ -134,13 +134,13 @@ class BufferedReader(object):
at buffer boundary.
"""
if length == 0:
return ''
return b''
self._fillbuff()
linebuff = self.buff.readline(length)
# we may be at a boundary
while not linebuff.endswith('\n'):
while not linebuff.endswith(b'\n'):
if length:
length -= len(linebuff)
if length <= 0:
@@ -195,7 +195,7 @@ class DecompressingBufferedReader(BufferedReader):
#=================================================================
class ChunkedDataException(Exception):
def __init__(self, msg, data=''):
def __init__(self, msg, data=b''):
Exception.__init__(self, msg)
self.data = data
@@ -249,19 +249,19 @@ class ChunkedDataReader(BufferedReader):
def _try_decode(self, length_header):
# decode length header
try:
chunk_size = int(length_header.strip().split(';')[0], 16)
chunk_size = int(length_header.strip().split(b';')[0], 16)
except ValueError:
raise ChunkedDataException("Couldn't decode length header " +
raise ChunkedDataException(b"Couldn't decode length header " +
length_header)
if not chunk_size:
# chunk_size 0 indicates end of file
self.all_chunks_read = True
self._process_read('')
self._process_read(b'')
return
data_len = 0
data = ''
data = b''
# read chunk
while data_len < chunk_size:
@@ -285,8 +285,8 @@ class ChunkedDataReader(BufferedReader):
# it should end in \r\n
if not self.all_chunks_read:
clrf = self.stream.read(2)
if clrf != '\r\n':
raise ChunkedDataException("Chunk terminator not found.",
if clrf != b'\r\n':
raise ChunkedDataException(b"Chunk terminator not found.",
data)
# hand to base class for further processing
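
The recurring change in this file is that py3 binary streams yield bytes, so every literal the reader returns, compares against, or splits on must itself be bytes. A hedged sketch of the chunk-length parsing shown above:

length_header = b'1a; chunk-ext=ignored\r\n'  # as read from a binary stream
chunk_size = int(length_header.strip().split(b';')[0], 16)
assert chunk_size == 26
# mixing types fails on py3: b'1a'.split(';') raises TypeError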

View File

@@ -2,9 +2,9 @@
"""
import surt
import urlparse
import six.moves.urllib.parse as urlparse
from wbexception import BadRequestException
from pywb.utils.wbexception import BadRequestException
#=================================================================
@@ -128,11 +128,11 @@ def calc_search_range(url, match_type, surt_ordered=True, url_canon=None):
('example.com/', 'example.com0')
# errors: domain range not supported
>>> calc_search_range('http://example.com/path/file.html', 'domain', False)
>>> calc_search_range('http://example.com/path/file.html', 'domain', False) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
UrlCanonicalizeException: matchType=domain unsupported for non-surt
>>> calc_search_range('http://example.com/path/file.html', 'blah', False)
>>> calc_search_range('http://example.com/path/file.html', 'blah', False) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
UrlCanonicalizeException: Invalid match_type: blah
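
The +IGNORE_EXCEPTION_DETAIL directive is what keeps one doctest valid on both interpreters: py3 renders exceptions with a module-qualified name, and the flag strips both the module prefix and the message, comparing only the class name. A minimal illustration (UrlCanonicalizeException here is a local stand-in, not pywb's class):

class UrlCanonicalizeException(Exception):
    """Stand-in for pywb's exception class, for illustration only."""

def boom():
    """
    >>> boom()  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    UrlCanonicalizeException: this detail text is never compared
    """
    raise UrlCanonicalizeException('a different message; still passes')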

View File

@@ -1,5 +1,5 @@
import pkgutil
from loaders import load_yaml_config
from pywb.utils.loaders import load_yaml_config
#=================================================================
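
py3 dropped implicit relative imports (PEP 328), so a bare "from loaders import ..." inside the pywb.utils package no longer resolves; the absolute form above works under both interpreters, and an explicit relative import would too:

from pywb.utils.loaders import load_yaml_config   # absolute: py2 and py3
# from .loaders import load_yaml_config           # explicit relative, also valid inside the package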

View File

@@ -5,12 +5,15 @@ local and remote access
import os
import hmac
import urllib
#import urllib2
import requests
import urlparse
import six
import six.moves.urllib.request as urllib_req
import six.moves.urllib.parse as urlparse
import time
import pkg_resources
from io import open, BytesIO
try:
@@ -30,7 +33,7 @@ def to_file_url(filename):
""" Convert a filename to a file:// url
"""
url = os.path.abspath(filename)
url = urlparse.urljoin('file:', urllib.pathname2url(url))
url = urlparse.urljoin('file:', urllib_req.pathname2url(url))
return url
@@ -80,7 +83,7 @@ def extract_post_query(method, mime, length, stream, buffered_stream=None):
buffered_stream.write(post_query)
buffered_stream.seek(0)
post_query = urllib.unquote_plus(post_query)
post_query = urlparse.unquote_plus(post_query)
return post_query
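
py3 reshuffled urllib: urlparse and the quoting helpers now live in urllib.parse, while pathname2url and url2pathname live in urllib.request. six.moves papers over the split with one set of names on both versions; a sketch of the calls used above:

import six.moves.urllib.request as urllib_req
import six.moves.urllib.parse as urlparse

url = urlparse.urljoin('file:', urllib_req.pathname2url('/tmp/example.cdx'))
assert url.startswith('file:')
assert urlparse.unquote_plus('foo=bar&dir=%2Fbaz') == 'foo=bar&dir=/baz'
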
@@ -210,7 +213,7 @@ class LocalFileLoader(object):
# convert to filename
if url.startswith('file://'):
file_only = True
url = urllib.url2pathname(url[len('file://'):])
url = urllib_req.url2pathname(url[len('file://'):])
try:
# first, try as file
@@ -253,7 +256,7 @@ class HttpLoader(object):
headers['Range'] = BlockLoader._make_range_header(offset, length)
if self.cookie_maker:
if isinstance(self.cookie_maker, basestring):
if isinstance(self.cookie_maker, six.string_types):
headers['Cookie'] = self.cookie_maker
else:
headers['Cookie'] = self.cookie_maker.make()
@@ -311,14 +314,14 @@ class HMACCookieMaker(object):
self.duration = duration
def make(self, extra_id=''):
expire = str(long(time.time() + self.duration))
expire = str(int(time.time() + self.duration))
if extra_id:
msg = extra_id + '-' + expire
else:
msg = expire
hmacdigest = hmac.new(self.key, msg)
hmacdigest = hmac.new(self.key.encode('utf-8'), msg.encode('utf-8'))
hexdigest = hmacdigest.hexdigest()
if extra_id:
@@ -349,7 +352,7 @@ class LimitReader(object):
length = self.limit
if length == 0:
return ''
return b''
buff = self.stream.read(length)
self.limit -= len(buff)
@@ -362,7 +365,7 @@ class LimitReader(object):
length = self.limit
if length == 0:
return ''
return b''
buff = self.stream.readline(length)
self.limit -= len(buff)
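
Two more py3-isms appear in this file: long() is gone (int is arbitrary precision), and hmac.new() accepts only bytes, hence the .encode('utf-8') calls. Note that Python 3.8+ additionally requires an explicit digestmod; the commit predates that, so this sketch names MD5 (the old implicit default) explicitly:

import hmac
import hashlib
import time

expire = str(int(time.time() + 30))   # str(long(...)) becomes str(int(...))
digest = hmac.new(b'secret-key', expire.encode('utf-8'),
                  digestmod=hashlib.md5).hexdigest()
assert len(digest) == 32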

View File

@@ -4,6 +4,7 @@ Representation and parsing of HTTP-style status + headers
import pprint
from copy import copy
from six.moves import range
#=================================================================
@@ -36,7 +37,7 @@ class StatusAndHeaders(object):
return old header value, if any
"""
name_lower = name.lower()
for index in xrange(len(self.headers) - 1, -1, -1):
for index in range(len(self.headers) - 1, -1, -1):
curr_name, curr_value = self.headers[index]
if curr_name.lower() == name_lower:
self.headers[index] = (curr_name, value)
@@ -52,7 +53,7 @@ class StatusAndHeaders(object):
"""
header_dict = copy(header_dict)
for index in xrange(len(self.headers) - 1, -1, -1):
for index in range(len(self.headers) - 1, -1, -1):
curr_name, curr_value = self.headers[index]
name_lower = curr_name.lower()
if name_lower in header_dict:
@@ -68,7 +69,7 @@ class StatusAndHeaders(object):
return True if header removed, False otherwise
"""
name_lower = name.lower()
for index in xrange(len(self.headers) - 1, -1, -1):
for index in range(len(self.headers) - 1, -1, -1):
if self.headers[index][0].lower() == name_lower:
del self.headers[index]
return True
@@ -93,7 +94,7 @@ class StatusAndHeaders(object):
code = int(code)
assert(code > 0)
return True
except ValueError, AssertionError:
except(ValueError, AssertionError):
self.statusline = valid_statusline
return False
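
Two py3-isms drive this file's changes: xrange() is gone (py3's range is already lazy), which six.moves.range covers, and the comma form "except ValueError, AssertionError:" is a py3 syntax error, replaced by a parenthesized tuple. Sketch:

from six.moves import range   # py2 xrange / py3 range under one name

headers = [('Content-Type', 'ABC'), ('Some', 'Value')]
for index in range(len(headers) - 1, -1, -1):   # reverse scan, as above
    curr_name, curr_value = headers[index]

try:
    assert int('200') > 0
except (ValueError, AssertionError):   # one clause, both types, both interpreters
    pass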

View File

@@ -82,13 +82,13 @@ test_cdx_dir = get_test_dir() + 'cdx/'
def print_binsearch_results(key, iter_func):
with open(test_cdx_dir + 'iana.cdx', 'rb') as cdx:
for line in iter_func(cdx, key):
print line
for line in iter_func(cdx, key.encode('utf-8')):
print(line.decode('utf-8'))
def print_binsearch_results_range(key, end_key, iter_func, prev_size=0):
with open(test_cdx_dir + 'iana.cdx', 'rb') as cdx:
for line in iter_func(cdx, key, end_key, prev_size=prev_size):
print line
for line in iter_func(cdx, key.encode('utf-8'), end_key.encode('utf-8'), prev_size=prev_size):
print(line.decode('utf-8'))
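
Since the CDX file is opened in binary mode, search keys are encoded to bytes on the way in and matching lines decoded on the way out. The same pattern in isolation (the file name is illustrative):

key = 'org,iana)/'
with open('iana.cdx', 'rb') as cdx:
    for line in cdx:
        if line.startswith(key.encode('utf-8')):
            print(line.decode('utf-8'))
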
if __name__ == "__main__":

View File

@@ -3,19 +3,19 @@ r"""
#=================================================================
# DecompressingBufferedReader readline()
>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb')).readline()
>>> print_str(DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb')).readline())
' CDX N b a m s k r M S V g\n'
# detect not compressed
>>> DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb'), decomp_type = 'gzip').readline()
>>> print_str(DecompressingBufferedReader(open(test_cdx_dir + 'iana.cdx', 'rb'), decomp_type = 'gzip').readline())
' CDX N b a m s k r M S V g\n'
# decompress with on the fly compression, default gzip compression
>>> DecompressingBufferedReader(BytesIO(compress('ABC\n1234\n'))).read()
>>> print_str(DecompressingBufferedReader(BytesIO(compress('ABC\n1234\n'))).read())
'ABC\n1234\n'
# decompress with on the fly compression, default 'inflate' compression
>>> DecompressingBufferedReader(BytesIO(compress_alt('ABC\n1234\n')), decomp_type='deflate').read()
>>> print_str(DecompressingBufferedReader(BytesIO(compress_alt('ABC\n1234\n')), decomp_type='deflate').read())
'ABC\n1234\n'
# error: invalid compress type
@@ -23,26 +23,18 @@ r"""
Traceback (most recent call last):
Exception: Decompression type not supported: bzip2
# error: compressed member, followed by not compressed -- considered invalid
>>> x = DecompressingBufferedReader(BytesIO(compress('ABC') + '123'), decomp_type = 'gzip')
>>> b = x.read()
>>> b = x.read_next_member()
>>> x.read()
Traceback (most recent call last):
error: Error -3 while decompressing: incorrect header check
# invalid output when reading compressed data as not compressed
>>> DecompressingBufferedReader(BytesIO(compress('ABC')), decomp_type = None).read() != 'ABC'
>>> DecompressingBufferedReader(BytesIO(compress('ABC')), decomp_type = None).read() != b'ABC'
True
# DecompressingBufferedReader readline() with decompression (zipnum file, no header)
>>> DecompressingBufferedReader(open(test_zip_dir + 'zipnum-sample.cdx.gz', 'rb'), decomp_type = 'gzip').readline()
>>> print_str(DecompressingBufferedReader(open(test_zip_dir + 'zipnum-sample.cdx.gz', 'rb'), decomp_type = 'gzip').readline())
'com,example)/ 20140127171200 http://example.com text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1046 334 dupes.warc.gz\n'
# test very small block size
>>> dbr = DecompressingBufferedReader(BytesIO('ABCDEFG\nHIJKLMN\nOPQR\nXYZ'), block_size = 3)
>>> dbr.readline(); dbr.readline(4); dbr.readline(); dbr.readline(); dbr.readline(2); dbr.readline(); dbr.readline()
>>> dbr = DecompressingBufferedReader(BytesIO(b'ABCDEFG\nHIJKLMN\nOPQR\nXYZ'), block_size = 3)
>>> print_str(dbr.readline()); print_str(dbr.readline(4)); print_str(dbr.readline()); print_str(dbr.readline()); print_str(dbr.readline(2)); print_str(dbr.readline()); print_str(dbr.readline())
'ABCDEFG\n'
'HIJK'
'LMN\n'
@@ -52,8 +44,8 @@ True
''
# test zero length reads
>>> x = DecompressingBufferedReader(LimitReader(BytesIO('\r\n'), 1))
>>> x.readline(0); x.read(0)
>>> x = DecompressingBufferedReader(LimitReader(BytesIO(b'\r\n'), 1))
>>> print_str(x.readline(0)); print_str(x.read(0))
''
''
@@ -61,71 +53,69 @@ True
#=================================================================
Properly formatted chunked data:
>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n0\r\n\r\n"));
>>> c.read() + c.read() + c.read()
>>> c = ChunkedDataReader(BytesIO(b"4\r\n1234\r\n0\r\n\r\n"));
>>> print_str(c.read() + c.read() + c.read())
'1234'
Non-chunked data:
>>> ChunkedDataReader(BytesIO("xyz123!@#")).read()
>>> print_str(ChunkedDataReader(BytesIO(b"xyz123!@#")).read())
'xyz123!@#'
Non-chunked, compressed data, specify decomp_type
>>> ChunkedDataReader(BytesIO(compress('ABCDEF')), decomp_type='gzip').read()
>>> print_str(ChunkedDataReader(BytesIO(compress('ABCDEF')), decomp_type='gzip').read())
'ABCDEF'
Non-chunked, compressed data, specify compression separately
>>> c = ChunkedDataReader(BytesIO(compress('ABCDEF'))); c.set_decomp('gzip'); c.read()
>>> c = ChunkedDataReader(BytesIO(compress('ABCDEF'))); c.set_decomp('gzip'); print_str(c.read())
'ABCDEF'
Non-chunked, compressed data, wrap in DecompressingBufferedReader
>>> DecompressingBufferedReader(ChunkedDataReader(BytesIO(compress('\nABCDEF\nGHIJ')))).read()
>>> print_str(DecompressingBufferedReader(ChunkedDataReader(BytesIO(compress('\nABCDEF\nGHIJ')))).read())
'\nABCDEF\nGHIJ'
Chunked compressed data
Split compressed stream into 10-byte chunk and a remainder chunk
>>> b = compress('ABCDEFGHIJKLMNOP')
>>> l = len(b)
>>> in_ = format(10, 'x') + "\r\n" + b[:10] + "\r\n" + format(l - 10, 'x') + "\r\n" + b[10:] + "\r\n0\r\n\r\n"
>>> in_ = format(10, 'x').encode('utf-8') + b"\r\n" + b[:10] + b"\r\n" + format(l - 10, 'x').encode('utf-8') + b"\r\n" + b[10:] + b"\r\n0\r\n\r\n"
>>> c = ChunkedDataReader(BytesIO(in_), decomp_type='gzip')
>>> c.read()
>>> print_str(c.read())
'ABCDEFGHIJKLMNOP'
Starts like chunked data, but isn't:
>>> c = ChunkedDataReader(BytesIO("1\r\nxyz123!@#"));
>>> c.read() + c.read()
>>> c = ChunkedDataReader(BytesIO(b"1\r\nxyz123!@#"));
>>> print_str(c.read() + c.read())
'1\r\nx123!@#'
Chunked data cut off part way through:
>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"));
>>> c.read() + c.read()
>>> c = ChunkedDataReader(BytesIO(b"4\r\n1234\r\n4\r\n12"));
>>> print_str(c.read() + c.read())
'123412'
Zero-Length chunk:
>>> ChunkedDataReader(BytesIO("0\r\n\r\n")).read()
>>> print_str(ChunkedDataReader(BytesIO(b"0\r\n\r\n")).read())
''
Chunked data cut off with exceptions
>>> c = ChunkedDataReader(BytesIO("4\r\n1234\r\n4\r\n12"), raise_exceptions=True)
>>> c.read() + c.read()
Traceback (most recent call last):
ChunkedDataException: Ran out of data before end of chunk
"""
from io import BytesIO
from pywb.utils.bufferedreaders import ChunkedDataReader
from pywb.utils.bufferedreaders import ChunkedDataReader, ChunkedDataException
from pywb.utils.bufferedreaders import DecompressingBufferedReader
from pywb.utils.loaders import LimitReader
from pywb import get_test_dir
import six
import zlib
import pytest
test_cdx_dir = get_test_dir() + 'cdx/'
test_zip_dir = get_test_dir() + 'zipcdx/'
def compress(buff):
buff = buff.encode('utf-8')
compressobj = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
compressed = compressobj.compress(buff)
compressed += compressobj.flush()
@@ -134,6 +124,7 @@ def compress(buff):
# plain "inflate"
def compress_alt(buff):
buff = buff.encode('utf-8')
compressobj = zlib.compressobj(6, zlib.DEFLATED)
compressed = compressobj.compress(buff)
compressed += compressobj.flush()
@@ -142,6 +133,32 @@ def compress_alt(buff):
return compressed
# Errors
def test_err_compress_mix():
# error: compressed member, followed by not compressed -- considered invalid
x = DecompressingBufferedReader(BytesIO(compress('ABC') + b'123'), decomp_type = 'gzip')
b = x.read()
b = x.read_next_member()
with pytest.raises(zlib.error):
x.read()
#error: Error -3 while decompressing: incorrect header check
def test_err_chunk_cut_off():
# Chunked data cut off with exceptions
c = ChunkedDataReader(BytesIO(b"4\r\n1234\r\n4\r\n12"), raise_exceptions=True)
with pytest.raises(ChunkedDataException):
c.read() + c.read()
#ChunkedDataException: Ran out of data before end of chunk
def print_str(string):
return string.decode('utf-8') if six.PY3 else string
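
Two details tie these tests together. compress() passes zlib.MAX_WBITS + 16 because wbits values above 15 ask zlib for a gzip header and trailer rather than a raw zlib stream, which is what the reader's 'gzip' mode expects; and print_str() lets one set of doctest expectations cover py2 (reads return str) and py3 (reads return bytes). A round-trip sketch of the gzip framing:

import zlib

co = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
data = co.compress(b'ABC\n1234\n') + co.flush()
assert data[:2] == b'\x1f\x8b'   # gzip magic bytes
assert zlib.decompress(data, zlib.MAX_WBITS + 16) == b'ABC\n1234\n'
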
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@@ -1,30 +1,30 @@
#=================================================================
r"""
# LimitReader Tests
>>> LimitReader(BytesIO('abcdefghjiklmnopqrstuvwxyz'), 10).read(26)
>>> LimitReader(StringIO('abcdefghjiklmnopqrstuvwxyz'), 10).read(26)
'abcdefghji'
>>> LimitReader(BytesIO('abcdefghjiklmnopqrstuvwxyz'), 8).readline(26)
>>> LimitReader(StringIO('abcdefghjiklmnopqrstuvwxyz'), 8).readline(26)
'abcdefgh'
>>> LimitReader.wrap_stream(LimitReader(BytesIO('abcdefghjiklmnopqrstuvwxyz'), 8), 4).readline(26)
>>> LimitReader.wrap_stream(LimitReader(StringIO('abcdefghjiklmnopqrstuvwxyz'), 8), 4).readline(26)
'abcd'
>>> read_multiple(LimitReader(BytesIO('abcdefghjiklmnopqrstuvwxyz'), 10), [2, 2, 20])
>>> read_multiple(LimitReader(StringIO('abcdefghjiklmnopqrstuvwxyz'), 10), [2, 2, 20])
'efghji'
# zero-length read
>>> LimitReader(BytesIO('a'), 0).readline(0)
>>> print_str(LimitReader(StringIO('a'), 0).readline(0))
''
# don't wrap if invalid length
>>> b = BytesIO('b')
>>> b = StringIO('b')
>>> LimitReader.wrap_stream(b, 'abc') == b
True
# BlockLoader Tests (includes LimitReader)
# Ensure attempt to read more than 100 bytes, reads exactly 100 bytes
>>> len(BlockLoader().load(test_cdx_dir + 'iana.cdx', 0, 100).read('400'))
>>> len(BlockLoader().load(test_cdx_dir + 'iana.cdx', 0, 100).read(400))
100
# no length specified, read full amount requested
@@ -32,26 +32,26 @@ True
400
# no such file
>>> len(BlockLoader().load('_x_no_such_file_', 0, 100).read('400'))
#>>> len(BlockLoader().load('_x_no_such_file_', 0, 100).read(400)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
IOError: [Errno 2] No such file or directory: '_x_no_such_file_'
# HMAC Cookie Maker
>>> BlockLoader(HMACCookieMaker('test', 'test', 5)).load('http://example.com', 41, 14).read()
>>> print_str(BlockLoader(HMACCookieMaker('test', 'test', 5)).load('http://example.com', 41, 14).read())
'Example Domain'
# fixed cookie, range request
>>> BlockLoader('some=value').load('http://example.com', 41, 14).read()
>>> print_str(BlockLoader('some=value').load('http://example.com', 41, 14).read())
'Example Domain'
# range request
>>> BlockLoader().load('http://example.com', 1262).read()
>>> print_str(BlockLoader().load('http://example.com', 1262).read())
'</html>\n'
# unknown loader error
>>> BlockLoader().load('foo://example.com', 10).read()
Traceback (most recent call last):
IOError: No Loader for type: foo
#>>> BlockLoader().load('foo://example.com', 10).read() # doctest: +IGNORE_EXCEPTION_DETAIL
#Traceback (most recent call last):
#IOError: No Loader for type: foo
# test with extra id, ensure 4 parts of the A-B=C-D form are present
>>> len(re.split('[-=]', HMACCookieMaker('test', 'test', 5).make('extra')))
@@ -84,42 +84,42 @@ IOError: No Loader for type: foo
# correct POST data
>>> post_data = 'foo=bar&dir=%2Fbaz'
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', len(post_data), BytesIO(post_data))
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', len(post_data), StringIO(post_data))
'foo=bar&dir=/baz'
# unsupported method
>>> extract_post_query('PUT', 'application/x-www-form-urlencoded', len(post_data), BytesIO(post_data))
>>> extract_post_query('PUT', 'application/x-www-form-urlencoded', len(post_data), StringIO(post_data))
# unsupported type
>>> extract_post_query('POST', 'text/plain', len(post_data), BytesIO(post_data))
>>> extract_post_query('POST', 'text/plain', len(post_data), StringIO(post_data))
# invalid length
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', 'abc', BytesIO(post_data))
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', 0, BytesIO(post_data))
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', 'abc', StringIO(post_data))
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', 0, StringIO(post_data))
# length too short
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', len(post_data) - 4, BytesIO(post_data))
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', len(post_data) - 4, StringIO(post_data))
'foo=bar&dir=%2'
# length too long
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', len(post_data) + 4, BytesIO(post_data))
>>> extract_post_query('POST', 'application/x-www-form-urlencoded', len(post_data) + 4, StringIO(post_data))
'foo=bar&dir=/baz'
# test read_last_line
>>> read_last_line(BytesIO('A\nB\nC'))
>>> print_str(read_last_line(BytesIO(b'A\nB\nC')))
'C'
>>> read_last_line(BytesIO('Some Line\nLonger Line\nLongest Last Line LL'), offset=8)
>>> print_str(read_last_line(BytesIO(b'Some Line\nLonger Line\nLongest Last Line LL'), offset=8))
'Longest Last Line LL'
>>> read_last_line(BytesIO('A\nBC'))
>>> print_str(read_last_line(BytesIO(b'A\nBC')))
'BC'
>>> read_last_line(BytesIO('A\nBC\n'))
>>> print_str(read_last_line(BytesIO(b'A\nBC\n')))
'BC\n'
>>> read_last_line(BytesIO('ABC'))
>>> print_str(read_last_line(BytesIO(b'ABC')))
'ABC'
"""
@@ -130,7 +130,10 @@ import re
import os
import pytest
import six
from six import StringIO
from io import BytesIO
from pywb.utils.loaders import BlockLoader, HMACCookieMaker, to_file_url
from pywb.utils.loaders import LimitReader, extract_client_cookie, extract_post_query
from pywb.utils.loaders import append_post_query, read_last_line
@@ -165,8 +168,27 @@ def test_s3_read_1():
assert len(buff) == 2526
reader = DecompressingBufferedReader(BytesIO(buff))
assert reader.readline() == 'WARC/1.0\r\n'
assert reader.readline() == 'WARC-Type: response\r\n'
assert reader.readline() == b'WARC/1.0\r\n'
assert reader.readline() == b'WARC-Type: response\r\n'
# Error
def test_err_no_such_file():
# no such file
with pytest.raises(IOError):
len(BlockLoader().load('_x_no_such_file_', 0, 100).read('400'))
def test_err_unknown_loader():
# unknown loader error
with pytest.raises(IOError):
BlockLoader().load('foo://example.com', 10).read()
#IOError: No Loader for type: foo
def print_str(string):
return string.decode('utf-8') if six.PY3 else string
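
The exception doctests above were commented out or moved into pytest for the same reason as elsewhere: doctest compares the rendered traceback, which py3 prints with module-qualified exception names, while pytest.raises matches on the exception type and is therefore version-agnostic. Sketch:

import pytest

def test_missing_file():
    # on py3, IOError is an alias of OSError, so FileNotFoundError matches
    with pytest.raises(IOError):
        open('_x_no_such_file_', 'rb')
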
if __name__ == "__main__":
import doctest

View File

@@ -1,5 +1,5 @@
"""
>>> st1 = StatusAndHeadersParser(['HTTP/1.0']).parse(BytesIO(status_headers_1))
>>> st1 = StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_1))
>>> st1
StatusAndHeaders(protocol = 'HTTP/1.0', statusline = '200 OK', headers = [ ('Content-Type', 'ABC'),
('Some', 'Value'),
@@ -12,30 +12,30 @@ StatusAndHeaders(protocol = '', statusline = '206 Partial Content', headers = [
('Accept-Ranges', 'bytes')])
# other protocol expected
>>> StatusAndHeadersParser(['Other']).parse(BytesIO(status_headers_1))
>>> StatusAndHeadersParser(['Other']).parse(StringIO(status_headers_1)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
StatusAndHeadersParserException: Expected Status Line starting with ['Other'] - Found: HTTP/1.0 200 OK
>>> StatusAndHeadersParser(['Other'], verify=False).parse(BytesIO(status_headers_1))
>>> StatusAndHeadersParser(['Other'], verify=False).parse(StringIO(status_headers_1))
StatusAndHeaders(protocol = 'HTTP/1.0', statusline = '200 OK', headers = [ ('Content-Type', 'ABC'),
('Some', 'Value'),
('Multi-Line', 'Value1 Also This')])
# verify protocol line
>>> StatusAndHeadersParser(['HTTP/1.0'], verify=True).parse(BytesIO(unknown_protocol_headers))
>>> StatusAndHeadersParser(['HTTP/1.0'], verify=True).parse(StringIO(unknown_protocol_headers)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
StatusAndHeadersParserException: Expected Status Line starting with ['HTTP/1.0'] - Found: OtherBlah
# allow unexpected/invalid protocol line
>>> StatusAndHeadersParser(['HTTP/1.0'], verify=False).parse(BytesIO(unknown_protocol_headers))
>>> StatusAndHeadersParser(['HTTP/1.0'], verify=False).parse(StringIO(unknown_protocol_headers))
StatusAndHeaders(protocol = 'OtherBlah', statusline = 'OtherBlah', headers = [('Foo', 'Bar')])
# test equality op
>>> st1 == StatusAndHeadersParser(['HTTP/1.0']).parse(BytesIO(status_headers_1))
>>> st1 == StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_1))
True
# replace header, print new headers
@@ -55,15 +55,15 @@ True
False
# empty
>>> st2 = StatusAndHeadersParser(['HTTP/1.0']).parse(BytesIO(status_headers_2)); x = st2.validate_statusline('204 No Content'); st2
>>> st2 = StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_2)); x = st2.validate_statusline('204 No Content'); st2
StatusAndHeaders(protocol = '', statusline = '204 No Content', headers = [])
>>> StatusAndHeadersParser(['HTTP/1.0']).parse(BytesIO(status_headers_3))
>>> StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_3))
StatusAndHeaders(protocol = 'HTTP/1.0', statusline = '204 Empty', headers = [('Content-Type', 'Value'), ('Content-Length', '0')])
# case-insensitive match
>>> StatusAndHeadersParser(['HTTP/1.0']).parse(BytesIO(status_headers_4))
>>> StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_4))
StatusAndHeaders(protocol = 'HTTP/1.0', statusline = '204 empty', headers = [('Content-Type', 'Value'), ('Content-Length', '0')])
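
StatusAndHeadersParser consumes text lines, so these tests switch from BytesIO to six's StringIO, which is io.StringIO on py3 and StringIO.StringIO on py2 (the latter, unlike io.StringIO, accepts py2 native str). Sketch:

from six import StringIO

buff = StringIO("HTTP/1.0 200 OK\r\nContent-Type: ABC\r\n\r\n")
assert buff.readline().startswith('HTTP/1.0')
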
@@ -71,7 +71,8 @@ StatusAndHeaders(protocol = 'HTTP/1.0', statusline = '204 empty', headers = [('C
from pywb.utils.statusandheaders import StatusAndHeadersParser, StatusAndHeaders
from io import BytesIO
#from io import StringIO
from six import StringIO
status_headers_1 = "\

View File

@@ -7,7 +7,7 @@ import re
import time
import datetime
import calendar
from itertools import imap
from six.moves import map
from email.utils import parsedate, formatdate
#=================================================================
@@ -36,7 +36,7 @@ def iso_date_to_datetime(string):
if nums[-1] == '':
nums = nums[:-1]
the_datetime = datetime.datetime(*imap(int, nums))
the_datetime = datetime.datetime(*map(int, nums))
return the_datetime
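
itertools.imap is gone in py3 because the builtin map is already lazy there; six.moves.map selects the right one per version. A hedged sketch of the datetime construction above (the parsing is simplified from pywb's actual splitting logic):

from six.moves import map
import datetime

nums = '2014-01-27T17:12:00'.replace('T', '-').replace(':', '-').split('-')
the_datetime = datetime.datetime(*map(int, nums))
assert the_datetime == datetime.datetime(2014, 1, 27, 17, 12)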

View File

@@ -47,7 +47,7 @@ setup(
long_description=long_description,
license='GPL',
packages=find_packages(),
zip_safe=True,
zip_safe=False,
provides=[
'pywb',
'pywb.utils',
@@ -73,11 +73,12 @@ setup(
glob.glob('sample_archive/text_content/*')),
],
install_requires=[
'six',
'chardet',
'requests',
'redis',
'jinja2',
'surt==0.2',
'surt==0.3b4',
'pyyaml',
'watchdog',
'webencodings',
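
zip_safe=False forces an unzipped install: pywb ships data files (templates, sample archives) that it reads from the filesystem at runtime, and six, now an explicit requirement, backs the compatibility shims used throughout this commit. A hedged sketch of the kind of package-data access that wants real files on disk (the resource path is illustrative):

import pkg_resources

# resource_filename must return a real on-disk path; from a zipped egg
# this would mean extracting to a cache first, which pywb avoids
path = pkg_resources.resource_filename('pywb', 'ui')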