1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

py3: all tests pass, at last!

but not yet py2... need to resolve encoding in rewriting issues
This commit is contained in:
Ilya Kreymer 2016-02-23 13:26:53 -08:00
parent 0dff388e4e
commit 3a584a1ec3
61 changed files with 650 additions and 426 deletions

View File

@ -1,4 +1,4 @@
from cli import LiveCli
from pywb.apps.cli import LiveCli
#=================================================================
# init default live rewrite server app

View File

@ -181,7 +181,7 @@ class CDXObject(OrderedDict):
result = ' '.join(str(self[x]) for x in fields) + '\n'
except KeyError as ke:
msg = 'Invalid field "{0}" found in fields= argument'
msg = msg.format(ke.message)
msg = msg.format(str(ke))
raise CDXException(msg)
return result
@ -202,12 +202,7 @@ class CDXObject(OrderedDict):
if fields is None:
return json_encode(obj) + '\n'
try:
result = json_encode(OrderedDict([(x, obj[x]) for x in fields if x in obj])) + '\n'
except KeyError as ke:
msg = 'Invalid field "{0}" found in fields= argument'
msg = msg.format(ke.message)
raise CDXException(msg)
result = json_encode(OrderedDict([(x, obj[x]) for x in fields if x in obj])) + '\n'
return result

View File

@ -34,6 +34,8 @@ def test_unicode_url():
assert x['timestamp'] == '123'
assert x['url'] == 'http://example.com/caf%C3%A9/path'
assert x.to_cdxj() == 'com,example,cafe)/ 123 {"url": "http://example.com/caf%C3%A9/path"}\n'
def test_invalid_idx_format():
with raises(CDXException):
x = IDXObject(b'a b c')

View File

@ -6,6 +6,7 @@ except ImportError:
from redis import StrictRedis
from pywb.utils.loaders import to_native_str
#=================================================================
@ -41,7 +42,7 @@ class RedisCache(object):
self.redis.hset(self.key, item, value)
def __getitem__(self, item):
return self.redis.hget(self.key, item)
return to_native_str(self.redis.hget(self.key, item), 'utf-8')
def __contains__(self, item):
return self.redis.hexists(self.key, item)

View File

@ -5,6 +5,7 @@ from pywb.utils.timeutils import timestamp_to_http_date
from pywb.framework.wbrequestresponse import WbRequest, WbResponse
from pywb.rewrite.wburl import WbUrl
import six
LINK_FORMAT = 'application/link-format'
@ -182,7 +183,7 @@ def make_timemap(wbrequest, cdx_lines):
# get first memento as it'll be used for 'from' field
try:
first_cdx = cdx_lines.next()
first_cdx = six.next(cdx_lines)
from_date = timestamp_to_http_date(first_cdx['timestamp'])
except StopIteration:
first_cdx = None

View File

@ -9,11 +9,14 @@ import base64
import socket
import ssl
from io import BytesIO
from pywb.rewrite.url_rewriter import SchemeOnlyUrlRewriter, UrlRewriter
from pywb.rewrite.rewrite_content import RewriteContent
from pywb.utils.wbexception import BadRequestException
from pywb.utils.bufferedreaders import BufferedReader
from pywb.utils.loaders import to_native_str
from pywb.framework.proxy_resolvers import ProxyAuthResolver, CookieResolver, IPCacheResolver
@ -270,16 +273,15 @@ class ProxyRouter(object):
@staticmethod
def _chunk_encode(orig_iter):
for buff in orig_iter:
chunk = bytes(buff)
for chunk in orig_iter:
if not len(chunk):
continue
chunk_len = '%X\r\n' % len(chunk)
chunk_len = b'%X\r\n' % len(chunk)
yield chunk_len
yield chunk
yield '\r\n'
yield b'\r\n'
yield '0\r\n\r\n'
yield b'0\r\n\r\n'
@staticmethod
def _buffer_response(status_headers, iterator):
@ -287,7 +289,6 @@ class ProxyRouter(object):
size = 0
for buff in iterator:
buff = bytes(buff)
size += len(buff)
out.write(buff)
@ -310,8 +311,11 @@ class ProxyRouter(object):
import uwsgi
fd = uwsgi.connection_fd()
conn = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
sock = socket.socket(_sock=conn)
except Exception:
try:
sock = socket.socket(_sock=conn)
except:
sock = conn
except Exception as e:
pass
elif env.get('gunicorn.socket'): # pragma: no cover
sock = env['gunicorn.socket']
@ -319,8 +323,12 @@ class ProxyRouter(object):
if not sock:
# attempt to find socket from wsgi.input
input_ = env.get('wsgi.input')
if input_ and hasattr(input_, '_sock'):
sock = socket.socket(_sock=input_._sock)
if input_:
if hasattr(input_, '_sock'): # pragma: no cover
raw = input_._sock
sock = socket.socket(_sock=raw) # pragma: no cover
elif hasattr(input_, 'raw'):
sock = input_.raw._sock
return sock
@ -330,10 +338,10 @@ class ProxyRouter(object):
return WbResponse.text_response('HTTPS Proxy Not Supported',
'405 HTTPS Proxy Not Supported')
sock.send('HTTP/1.0 200 Connection Established\r\n')
sock.send('Proxy-Connection: close\r\n')
sock.send('Server: pywb proxy\r\n')
sock.send('\r\n')
sock.send(b'HTTP/1.0 200 Connection Established\r\n')
sock.send(b'Proxy-Connection: close\r\n')
sock.send(b'Server: pywb proxy\r\n')
sock.send(b'\r\n')
hostname, port = env['REL_REQUEST_URI'].split(':')
@ -354,7 +362,7 @@ class ProxyRouter(object):
buffreader = BufferedReader(ssl_sock, block_size=self.BLOCK_SIZE)
statusline = buffreader.readline().rstrip()
statusline = to_native_str(buffreader.readline().rstrip())
except Exception as se:
raise BadRequestException(se.message)
@ -383,7 +391,7 @@ class ProxyRouter(object):
env['pywb.proxy_query'] = env['QUERY_STRING']
while True:
line = buffreader.readline()
line = to_native_str(buffreader.readline())
if line:
line = line.rstrip()
@ -404,12 +412,15 @@ class ProxyRouter(object):
env[name] = value
remain = buffreader.rem_length()
if remain > 0:
remainder = buffreader.read(self.BLOCK_SIZE)
env['wsgi.input'] = BufferedReader(ssl_sock,
block_size=self.BLOCK_SIZE,
starting_data=remainder)
env['wsgi.input'] = buffreader
#remain = buffreader.rem_length()
#if remain > 0:
#remainder = buffreader.read()
#env['wsgi.input'] = BufferedReader(BytesIO(remainder))
#remainder = buffreader.read(self.BLOCK_SIZE)
#env['wsgi.input'] = BufferedReader(ssl_sock,
# block_size=self.BLOCK_SIZE,
# starting_data=remainder)
def handle_cert_install(self, env):
if env['pywb.proxy_req_uri'] in ('/', '/index.html', '/index.html'):
@ -425,14 +436,14 @@ class ProxyRouter(object):
if not self.ca:
return None
buff = ''
buff = b''
with open(self.ca.ca_file, 'rb') as fh:
buff = fh.read()
content_type = 'application/x-x509-ca-cert'
return WbResponse.text_response(buff,
content_type=content_type)
return WbResponse.bin_stream([buff],
content_type=content_type)
elif env['pywb.proxy_req_uri'] == self.CERT_DL_P12:
if not self.ca:
@ -442,5 +453,5 @@ class ProxyRouter(object):
content_type = 'application/x-pkcs12'
return WbResponse.text_response(buff,
content_type=content_type)
return WbResponse.bin_stream([buff],
content_type=content_type)

View File

@ -8,6 +8,9 @@ from pywb.framework.cache import create_cache
from pywb.framework.basehandlers import WbUrlHandler
from six.moves.urllib.parse import parse_qs, urlsplit
import six
from pywb.utils.loaders import to_native_str
import base64
import os
@ -101,7 +104,7 @@ class ProxyAuthResolver(BaseCollResolver):
value = self.auth_msg
return WbResponse(status_headers, value=[value])
return WbResponse(status_headers, value=[value.encode('utf-8')])
@staticmethod
def read_basic_auth_coll(value):
@ -112,8 +115,8 @@ class ProxyAuthResolver(BaseCollResolver):
if len(parts) != 2:
return ''
user_pass = base64.b64decode(parts[1])
return user_pass.split(':')[0]
user_pass = base64.b64decode(parts[1].encode('utf-8'))
return to_native_str(user_pass.split(b':')[0])
#=================================================================
@ -357,14 +360,14 @@ class CookieResolver(BaseCollResolver):
return sesh_id
sesh_id = base64.b32encode(os.urandom(5)).lower()
return sesh_id
return to_native_str(sesh_id)
def make_redir_response(self, url, headers=None):
if not headers:
headers = []
if self.extra_headers:
for name, value in self.extra_headers.iteritems():
for name, value in six.iteritems(self.extra_headers):
headers.append((name, value))
return WbResponse.redir_response(url, headers=headers)

View File

@ -115,7 +115,7 @@ def _test_route_req(route, env, abs_path=False):
def _test_redir(match_host, request_uri, referrer, script_name='', coll='coll'):
env = {'REL_REQUEST_URI': request_uri, 'HTTP_REFERER': referrer, 'SCRIPT_NAME': script_name}
env['HTTP_HOST'] = urlparse.urlsplit(match_host).netloc
env['HTTP_HOST'] = urlsplit(match_host).netloc
routes = [Route(coll, WbUrlHandler())]

View File

@ -1,28 +1,28 @@
"""
# WbRequest Tests
# =================
>>> print_req_from_uri('/save/_embed/example.com/?a=b')
#>>> get_req_from_uri('/save/_embed/example.com/?a=b')
{'wb_url': ('latest_replay', '', '', 'http://_embed/example.com/?a=b', 'http://_embed/example.com/?a=b'), 'coll': 'save', 'wb_prefix': '/save/', 'request_uri': '/save/_embed/example.com/?a=b'}
>>> print_req_from_uri('/2345/20101024101112im_/example.com/?b=c')
#>>> get_req_from_uri('/2345/20101024101112im_/example.com/?b=c')
{'wb_url': ('replay', '20101024101112', 'im_', 'http://example.com/?b=c', '20101024101112im_/http://example.com/?b=c'), 'coll': '2345', 'wb_prefix': '/2345/', 'request_uri': '/2345/20101024101112im_/example.com/?b=c'}
>>> print_req_from_uri('/2010/example.com')
#>>> get_req_from_uri('/2010/example.com')
{'wb_url': ('latest_replay', '', '', 'http://example.com', 'http://example.com'), 'coll': '2010', 'wb_prefix': '/2010/', 'request_uri': '/2010/example.com'}
# ajax
>>> print_req_from_uri('', {'REL_REQUEST_URI': '/2010/example.com', 'HTTP_HOST': 'localhost:8080', 'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'})
#>>> get_req_from_uri('', {'REL_REQUEST_URI': '/2010/example.com', 'HTTP_HOST': 'localhost:8080', 'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'})
{'wb_url': ('latest_replay', '', '', 'http://example.com', 'http://example.com'), 'coll': '2010', 'wb_prefix': '/2010/', 'request_uri': '/2010/example.com'}
>>> print_req_from_uri('../example.com')
#>>> get_req_from_uri('../example.com')
{'wb_url': ('latest_replay', '', '', 'http://example.com', 'http://example.com'), 'coll': '', 'wb_prefix': '/', 'request_uri': '../example.com'}
# Abs path
>>> print_req_from_uri('/2010/example.com', {'wsgi.url_scheme': 'https', 'HTTP_HOST': 'localhost:8080'}, use_abs_prefix = True)
#>>> get_req_from_uri('/2010/example.com', {'wsgi.url_scheme': 'https', 'HTTP_HOST': 'localhost:8080'}, use_abs_prefix = True)
{'wb_url': ('latest_replay', '', '', 'http://example.com', 'http://example.com'), 'coll': '2010', 'wb_prefix': 'https://localhost:8080/2010/', 'request_uri': '/2010/example.com'}
# No Scheme, default to http (shouldn't happen per WSGI standard)
>>> print_req_from_uri('/2010/example.com', {'HTTP_HOST': 'localhost:8080'}, use_abs_prefix = True)
#>>> get_req_from_uri('/2010/example.com', {'HTTP_HOST': 'localhost:8080'}, use_abs_prefix = True)
{'wb_url': ('latest_replay', '', '', 'http://example.com', 'http://example.com'), 'coll': '2010', 'wb_prefix': 'http://localhost:8080/2010/', 'request_uri': '/2010/example.com'}
# Referrer extraction
@ -56,23 +56,6 @@
>>> req_from_uri('/web/www.googlevideo.com/videoplayback?id=123&range=100-').extract_range()
# WbResponse Tests
# =================
>>> WbResponse.text_response('Test')
{'body': ['Test'], 'status_headers': StatusAndHeaders(protocol = '', statusline = '200 OK', headers = [('Content-Type', 'text/plain'), ('Content-Length', '4')])}
>>> WbResponse.text_stream(['Test', 'Another'], '404')
{'body': ['Test', 'Another'], 'status_headers': StatusAndHeaders(protocol = '', statusline = '404', headers = [('Content-Type', 'text/plain')])}
>>> WbResponse.redir_response('http://example.com/otherfile')
{'body': [], 'status_headers': StatusAndHeaders(protocol = '', statusline = '302 Redirect', headers = [('Location', 'http://example.com/otherfile'), ('Content-Length', '0')])}
>>> WbResponse.text_response('Test').add_range(10, 4, 100)
{'body': ['Test'], 'status_headers': StatusAndHeaders(protocol = '', statusline = '206 Partial Content', headers = [ ('Content-Type', 'text/plain'),
('Content-Length', '4'),
('Content-Range', 'bytes 10-13/100'),
('Accept-Ranges', 'bytes')])}
"""
@ -83,12 +66,12 @@ from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.framework.wbrequestresponse import WbRequest, WbResponse
def print_req_from_uri(request_uri, env={}, use_abs_prefix=False):
def get_req_from_uri(request_uri, env={}, use_abs_prefix=False):
response = req_from_uri(request_uri, env, use_abs_prefix)
varlist = vars(response)
the_dict = dict((k, varlist[k]) for k in ('request_uri', 'wb_prefix', 'wb_url', 'coll'))
print(the_dict)
#print(the_dict)
return the_dict
def req_from_uri(request_uri, env={}, use_abs_prefix=False):
if not request_uri:
@ -121,6 +104,114 @@ def req_from_uri(request_uri, env={}, use_abs_prefix=False):
use_abs_prefix=use_abs_prefix)
def test_req_1():
    """Check the request fields produced for an ``_embed`` url in the ``save`` collection."""
    uri = '/save/_embed/example.com/?a=b'
    parsed = get_req_from_uri(uri)

    # wb_url is compared through its repr, as the former doctest did
    expected_wb_url = ("('latest_replay', '', '', "
                       "'http://_embed/example.com/?a=b', "
                       "'http://_embed/example.com/?a=b')")
    assert repr(parsed['wb_url']) == expected_wb_url
    assert parsed['coll'] == 'save'
    assert parsed['wb_prefix'] == '/save/'
    assert parsed['request_uri'] == uri
def test_req_2():
    """Check the request fields for a timestamped ``im_`` replay url in collection ``2345``."""
    uri = '/2345/20101024101112im_/example.com/?b=c'
    parsed = get_req_from_uri(uri)

    expected_wb_url = ("('replay', '20101024101112', 'im_', "
                       "'http://example.com/?b=c', "
                       "'20101024101112im_/http://example.com/?b=c')")
    assert repr(parsed['wb_url']) == expected_wb_url
    assert parsed['coll'] == '2345'
    assert parsed['wb_prefix'] == '/2345/'
    assert parsed['request_uri'] == uri
def test_req_3():
    """Check the request fields for a plain url with no timestamp in collection ``2010``."""
    uri = '/2010/example.com'
    parsed = get_req_from_uri(uri)

    expected_wb_url = ("('latest_replay', '', '', "
                       "'http://example.com', 'http://example.com')")
    assert repr(parsed['wb_url']) == expected_wb_url
    assert parsed['coll'] == '2010'
    assert parsed['wb_prefix'] == '/2010/'
    assert parsed['request_uri'] == uri
def test_req_4():
    """Check an ajax request: empty uri argument, with the path supplied via the env dict."""
    # ajax
    ajax_env = {
        'REL_REQUEST_URI': '/2010/example.com',
        'HTTP_HOST': 'localhost:8080',
        'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
    }
    parsed = get_req_from_uri('', ajax_env)

    expected_wb_url = ("('latest_replay', '', '', "
                       "'http://example.com', 'http://example.com')")
    assert repr(parsed['wb_url']) == expected_wb_url
    assert parsed['coll'] == '2010'
    assert parsed['wb_prefix'] == '/2010/'
    assert parsed['request_uri'] == '/2010/example.com'
def test_req_5():
    """Check a relative ``../`` uri: expects an empty collection and a ``/`` prefix."""
    uri = '../example.com'
    parsed = get_req_from_uri(uri)

    expected_wb_url = ("('latest_replay', '', '', "
                       "'http://example.com', 'http://example.com')")
    assert repr(parsed['wb_url']) == expected_wb_url
    assert parsed['coll'] == ''
    assert parsed['wb_prefix'] == '/'
    assert parsed['request_uri'] == uri
def test_req_6():
    """Check the absolute prefix when the env supplies an https scheme and a host."""
    # Abs path
    uri = '/2010/example.com'
    https_env = {'wsgi.url_scheme': 'https', 'HTTP_HOST': 'localhost:8080'}
    parsed = get_req_from_uri(uri, https_env, use_abs_prefix=True)

    expected_wb_url = ("('latest_replay', '', '', "
                       "'http://example.com', 'http://example.com')")
    assert repr(parsed['wb_url']) == expected_wb_url
    assert parsed['coll'] == '2010'
    assert parsed['wb_prefix'] == 'https://localhost:8080/2010/'
    assert parsed['request_uri'] == uri
def test_req_7():
    """Check the absolute prefix falls back to ``http`` when the env has no url scheme."""
    # No Scheme, default to http (shouldn't happen per WSGI standard)
    uri = '/2010/example.com'
    host_only_env = {'HTTP_HOST': 'localhost:8080'}
    parsed = get_req_from_uri(uri, host_only_env, use_abs_prefix=True)

    expected_wb_url = ("('latest_replay', '', '', "
                       "'http://example.com', 'http://example.com')")
    assert repr(parsed['wb_url']) == expected_wb_url
    assert parsed['coll'] == '2010'
    assert parsed['wb_prefix'] == 'http://localhost:8080/2010/'
    assert parsed['request_uri'] == uri
#Response tests
def test_resp_1():
    """``text_response`` should yield a bytes body with utf-8 content type and a length header."""
    actual = vars(WbResponse.text_response('Test'))

    header_list = [('Content-Type', 'text/plain; charset=utf-8'),
                   ('Content-Length', '4')]
    status_headers = StatusAndHeaders(protocol='',
                                      statusline='200 OK',
                                      headers=header_list)
    assert actual == {'body': [b'Test'], 'status_headers': status_headers}
def test_resp_2():
    """``bin_stream`` should pass the chunks through and set only the content type header."""
    chunks = [b'Test', b'Another']
    response = WbResponse.bin_stream(chunks,
                                     content_type='text/plain; charset=utf-8',
                                     status='404')
    actual = vars(response)

    header_list = [('Content-Type', 'text/plain; charset=utf-8')]
    status_headers = StatusAndHeaders(protocol='',
                                      statusline='404',
                                      headers=header_list)
    assert actual == {'body': [b'Test', b'Another'],
                      'status_headers': status_headers}
def test_resp_3():
    """``redir_response`` should give an empty body, a 302 status and a Location header."""
    actual = vars(WbResponse.redir_response('http://example.com/otherfile'))

    header_list = [('Location', 'http://example.com/otherfile'),
                   ('Content-Length', '0')]
    status_headers = StatusAndHeaders(protocol='',
                                      statusline='302 Redirect',
                                      headers=header_list)
    assert actual == {'body': [], 'status_headers': status_headers}
def test_resp_4():
    """``add_range`` on a text response should switch to 206 and append the range headers."""
    ranged = WbResponse.text_response('Test').add_range(10, 4, 100)
    actual = vars(ranged)

    header_list = [
        ('Content-Type', 'text/plain; charset=utf-8'),
        ('Content-Length', '4'),
        ('Content-Range', 'bytes 10-13/100'),
        ('Accept-Ranges', 'bytes'),
    ]
    status_headers = StatusAndHeaders(protocol='',
                                      statusline='206 Partial Content',
                                      headers=header_list)
    assert actual == {'body': [b'Test'], 'status_headers': status_headers}
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -8,7 +8,7 @@ class TestOkApp:
def __call__(self, env):
def response(env, start_response):
start_response('200 OK', [])
return ['Test']
return [b'Test']
return response
class TestErrApp:
@ -32,7 +32,7 @@ def test_ok_app():
resp = testapp.get('/')
assert resp.status_int == 200
assert 'Test' in resp.body
assert b'Test' in resp.body, resp.body
def test_err_app():
the_app = init_app(initer(TestErrApp), load_yaml=False)

View File

@ -1,7 +1,7 @@
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.utils.loaders import extract_post_query, append_post_query
from io import BytesIO
from six import StringIO
import pprint
import re
@ -187,7 +187,7 @@ class WbRequest(object):
length = self.env.get('CONTENT_LENGTH')
stream = self.env['wsgi.input']
buffered_stream = BytesIO()
buffered_stream = StringIO()
post_query = extract_post_query('POST', mime, length, stream,
buffered_stream=buffered_stream)
@ -214,7 +214,18 @@ class WbResponse(object):
pass
@staticmethod
def text_stream(stream, status='200 OK', content_type='text/plain',
def text_stream(stream, content_type='text/plain; charset=utf-8', status='200 OK'):
def encode(stream):
for obj in stream:
yield obj.encode('utf-8')
if 'charset' not in content_type:
content_type += '; charset=utf-8'
return WbResponse.bin_stream(encode(stream), content_type, status)
@staticmethod
def bin_stream(stream, content_type, status='200 OK',
headers=None):
def_headers = [('Content-Type', content_type)]
if headers:
@ -225,12 +236,12 @@ class WbResponse(object):
return WbResponse(status_headers, value=stream)
@staticmethod
def text_response(text, status='200 OK', content_type='text/plain'):
def text_response(text, status='200 OK', content_type='text/plain; charset=utf-8'):
status_headers = StatusAndHeaders(status,
[('Content-Type', content_type),
('Content-Length', str(len(text)))])
return WbResponse(status_headers, value=[text])
return WbResponse(status_headers, value=[text.encode('utf-8')])
@staticmethod
def redir_response(location, status='302 Redirect', headers=None):

View File

@ -1,5 +1,5 @@
from pywb.utils.wbexception import WbException, NotFoundException
from pywb.utils.loaders import load_yaml_config
from pywb.utils.loaders import load_yaml_config, to_native_str
from pywb.framework.wbrequestresponse import WbResponse, StatusAndHeaders
@ -33,9 +33,12 @@ class WSGIApp(object):
env['pywb.proxy_statusline'] = statusline
ssl_sock.write('HTTP/1.1 ' + statusline + '\r\n')
status_line = 'HTTP/1.1 ' + statusline + '\r\n'
ssl_sock.write(status_line.encode('iso-8859-1'))
for name, value in headers:
ssl_sock.write(name + ': ' + value + '\r\n')
line = name + ': ' + value + '\r\n'
ssl_sock.write(line.encode('iso-8859-1'))
resp_iter = self.handle_methods(env, ssl_start_response)
@ -43,7 +46,7 @@ class WSGIApp(object):
if not ssl_sock:
return resp_iter
ssl_sock.write('\r\n')
ssl_sock.write(b'\r\n')
for obj in resp_iter:
if obj:
@ -105,9 +108,9 @@ class WSGIApp(object):
if error_view:
if err_url and isinstance(err_url, str):
err_url = err_url.decode('utf-8', 'ignore')
err_url = to_native_str(err_url, 'utf-8')
if err_msg and isinstance(err_msg, str):
err_msg = err_msg.decode('utf-8', 'ignore')
err_msg = to_native_str(err_msg, 'utf-8')
return error_view.render_response(exc_type=type(exc).__name__,
err_msg=err_msg,
@ -120,9 +123,9 @@ class WSGIApp(object):
if err_msg:
msg += err_msg
msg = msg.encode('utf-8', 'ignore')
#msg = msg.encode('utf-8', 'ignore')
return WbResponse.text_response(msg,
status=status)
status=status)
#=================================================================
DEFAULT_CONFIG_FILE = 'config.yaml'
@ -163,7 +166,7 @@ def init_app(init_func, load_yaml=True, config_file=None, config=None):
#=================================================================
def start_wsgi_ref_server(the_app, name, port): # pragma: no cover
from wsgiref.simple_server import make_server, WSGIServer
from SocketServer import ThreadingMixIn
from six.moves.socketserver import ThreadingMixIn
# disable is_hop_by_hop restrictions
import wsgiref.handlers

View File

@ -5,6 +5,7 @@ import logging
import heapq
import yaml
import re
import six
from distutils.util import strtobool
from pkg_resources import resource_string
@ -168,8 +169,8 @@ directory structure expected by pywb
last_line = None
with open(cdx_file) as orig_index:
with open(temp_file) as new_index:
with open(cdx_file, 'rb') as orig_index:
with open(temp_file, 'rb') as new_index:
with open(merged_file, 'w+b') as merged:
for line in heapq.merge(orig_index, new_index):
if last_line != line:
@ -184,7 +185,7 @@ directory structure expected by pywb
metadata_yaml = os.path.join(self.curr_coll_dir, 'metadata.yaml')
metadata = None
if os.path.isfile(metadata_yaml):
with open(metadata_yaml) as fh:
with open(metadata_yaml, 'rb') as fh:
metadata = yaml.safe_load(fh)
if not metadata:
@ -200,7 +201,7 @@ directory structure expected by pywb
metadata[v[0]] = v[1]
with open(metadata_yaml, 'w+b') as fh:
fh.write(yaml.dump(metadata, default_flow_style=False))
fh.write(yaml.dump(metadata, default_flow_style=False).encode('utf-8'))
def _load_templates_map(self):
defaults = load_yaml_config(DEFAULT_CONFIG)
@ -210,13 +211,13 @@ directory structure expected by pywb
# Coll Templates
templates = defaults['paths']['template_files']
for name, _ in templates.iteritems():
for name, _ in six.iteritems(templates):
templates[name] = os.path.join(temp_dir, defaults[name])
# Shared Templates
shared_templates = defaults['paths']['shared_template_files']
for name, _ in shared_templates.iteritems():
for name, _ in six.iteritems(shared_templates):
shared_templates[name] = os.path.join(temp_dir, defaults[name])
return templates, shared_templates
@ -225,13 +226,13 @@ directory structure expected by pywb
templates, shared_templates = self._load_templates_map()
print('Shared Templates')
for n, v in shared_templates.iteritems():
for n, v in six.iteritems(shared_templates):
print('- {0}: (pywb/{1})'.format(n, v))
print('')
print('Collection Templates')
for n, v in templates.iteritems():
for n, v in six.iteritems(templates):
print('- {0}: (pywb/{1})'.format(n, v))
def _confirm_overwrite(self, full_path, msg):
@ -305,7 +306,7 @@ directory structure expected by pywb
print('Removed template file "{0}"'.format(full_path))
def migrate_cdxj(self, path, force=False):
from migrate import MigrateCDX
from pywb.manager.migrate import MigrateCDX
migrate = MigrateCDX(path)
count = migrate.count_cdx()
@ -327,7 +328,7 @@ directory structure expected by pywb
migrate.convert_to_cdxj()
def autoindex(self, do_loop=True):
from autoindex import CDXAutoIndexer
from pywb.manager.autoindex import CDXAutoIndexer
if self.coll_name:
any_coll = False

View File

@ -31,10 +31,10 @@ class MigrateCDX(object):
print('Converting {0} -> {1}'.format(filename, outfile))
with open(outfile + '.tmp', 'w+b') as out:
with open(filename) as fh:
with open(outfile + '.tmp', 'w+') as out:
with open(filename, 'rb') as fh:
for line in fh:
if line.startswith(' CDX'):
if line.startswith(b' CDX'):
continue
cdx = CDXObject(line)
cdx[URLKEY] = canonicalize(cdx[ORIGINAL])

View File

@ -33,6 +33,7 @@ class PermsHandler(WbUrlHandler):
def check_single_url(self, wbrequest, perms_checker):
urlkey = self.url_canon(wbrequest.wb_url.url)
urlkey = urlkey.encode('utf-8')
if not perms_checker.allow_url_lookup(urlkey):
response_text = BLOCK

View File

@ -24,4 +24,4 @@ def test_excluded(testconfig):
with raises(AccessException):
cdxobjs = list(query_handler.load_cdx(None, params))
print cdxobjs
print(cdxobjs)

View File

@ -1,4 +1,5 @@
from six.moves.http_cookies import SimpleCookie, CookieError
import six
#=================================================================
@ -16,7 +17,7 @@ class WbUrlBaseCookieRewriter(object):
except CookieError:
return results
for name, morsel in cookie.iteritems():
for name, morsel in six.iteritems(cookie):
morsel = self.rewrite_cookie(name, morsel)
if morsel:

View File

@ -1,6 +1,7 @@
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.utils.timeutils import datetime_to_http_date
from datetime import datetime, timedelta
import six
#=================================================================
@ -103,7 +104,7 @@ class HeaderRewriter(object):
new_headers.append(('Expires', datetime_to_http_date(dt)))
def _extract_text_type(self, content_type):
for ctype, mimelist in self.REWRITE_TYPES.iteritems():
for ctype, mimelist in six.iteritems(self.REWRITE_TYPES):
if any((mime in content_type) for mime in mimelist):
return ctype

View File

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import re
import sys
from six.moves.html_parser import HTMLParser
from six.moves.urllib.parse import urljoin, urlsplit, urlunsplit
@ -10,6 +11,10 @@ from six.moves.urllib.parse import urljoin, urlsplit, urlunsplit
from pywb.rewrite.url_rewriter import UrlRewriter
from pywb.rewrite.regex_rewriters import JSRewriter, CSSRewriter
import six.moves.html_parser
six.moves.html_parser.unescape = lambda x: x
from six import text_type
#=================================================================
class HTMLRewriterMixin(object):
@ -73,10 +78,10 @@ class HTMLRewriterMixin(object):
self.ls = []
def write(self, string):
self.ls.append(bytes(string))
self.ls.append(string)
def getvalue(self):
return b''.join(self.ls)
return ''.join(self.ls)
# ===========================
@ -198,7 +203,7 @@ class HTMLRewriterMixin(object):
if value != new_value:
# ensure utf-8 encoded to avoid %-encoding query here
if isinstance(new_value, unicode):
if isinstance(new_value, text_type):
new_value = new_value.encode('utf-8')
return new_value
@ -395,7 +400,11 @@ class HTMLRewriter(HTMLRewriterMixin, HTMLParser):
PARSETAG = re.compile('[<]')
def __init__(self, *args, **kwargs):
HTMLParser.__init__(self)
if sys.version_info > (3,4): #pragma: no cover
HTMLParser.__init__(self, convert_charrefs=False)
else: #pragma: no cover
HTMLParser.__init__(self)
super(HTMLRewriter, self).__init__(*args, **kwargs)
def reset(self):
@ -462,7 +471,7 @@ class HTMLRewriter(HTMLRewriterMixin, HTMLParser):
# overriding regex so that these are no longer called
#def handle_entityref(self, data):
# self.out.write('&' + data + ';')
#
#def handle_charref(self, data):
# self.out.write('&#' + data + ';')

View File

@ -99,7 +99,7 @@ class RegexRewriter(object):
result = (match, replace, group)
return result
return map(parse_rule, config)
return list(map(parse_rule, config))
return run_parse_rules

View File

@ -15,17 +15,18 @@ from pywb.utils.dsrules import RuleSet
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.utils.bufferedreaders import DecompressingBufferedReader
from pywb.utils.bufferedreaders import ChunkedDataReader, BufferedReader
from pywb.utils.loaders import to_native_str
from pywb.rewrite.regex_rewriters import JSNoneRewriter, JSLinkOnlyRewriter
#=================================================================
class RewriteContent:
HEAD_REGEX = re.compile(r'<\s*head\b[^>]*[>]+', re.I)
HEAD_REGEX = re.compile(b'<\s*head\\b[^>]*[>]+', re.I)
TAG_REGEX = re.compile(r'^\s*\<')
TAG_REGEX = re.compile(b'^\s*\<')
CHARSET_REGEX = re.compile(r'<meta[^>]*?[\s;"\']charset\s*=[\s"\']*([^\s"\'/>]*)')
CHARSET_REGEX = re.compile(b'<meta[^>]*?[\s;"\']charset\s*=[\s"\']*([^\s"\'/>]*)')
BUFF_SIZE = 16384
@ -133,7 +134,7 @@ class RewriteContent:
stream_raw = False
encoding = None
first_buff = ''
first_buff = b''
stream = self._check_encoding(rewritten_headers, stream, 'gzip')
stream = self._check_encoding(rewritten_headers, stream, 'deflate')
@ -174,6 +175,9 @@ class RewriteContent:
charset = 'utf-8'
head_insert_str = head_insert_orig.encode(charset)
head_insert_str = to_native_str(head_insert_str, 'utf-8')
if wb_url.is_banner_only:
gen = self._head_insert_only_gen(head_insert_str,
stream,
@ -237,7 +241,7 @@ class RewriteContent:
m = RewriteContent.CHARSET_REGEX.search(buff)
if m:
charset = m.group(1)
content_type = 'text/html; charset=' + charset
content_type = 'text/html; charset=' + to_native_str(charset, 'utf-8')
status_headers.replace_header('content-type', content_type)
return charset
@ -260,7 +264,7 @@ class RewriteContent:
return mod, wrapped_stream
def _head_insert_only_gen(self, insert_str, stream, first_buff=''):
def _head_insert_only_gen(self, insert_str, stream, first_buff=b''):
buff = first_buff
max_len = 1024 - len(first_buff)
while max_len > 0:
@ -275,10 +279,10 @@ class RewriteContent:
if matcher:
yield buff[:matcher.end()]
yield insert_str
yield insert_str.encode('utf-8')
yield buff[matcher.end():]
else:
yield insert_str
yield insert_str.encode('utf-8')
yield buff
for buff in self.stream_to_gen(stream):
@ -332,8 +336,8 @@ class RewriteContent:
while True:
if buff:
buff = rewrite_func(buff)
yield buff
buff = rewrite_func(to_native_str(buff, 'utf-8'))
yield buff.encode('utf-8')
buff = stream.read(RewriteContent.BUFF_SIZE)
# on 2.6, readline() (but not read()) throws an exception
@ -348,7 +352,7 @@ class RewriteContent:
# For adding a tail/handling final buffer
buff = final_read_func()
if buff:
yield buff
yield buff.encode('utf-8')
finally:
stream.close()

View File

@ -9,6 +9,7 @@ import logging
import os
from six.moves.urllib.parse import urlsplit
import six
from pywb.utils.loaders import is_http, LimitReader, LocalFileLoader, to_file_url
from pywb.utils.loaders import extract_client_cookie
@ -60,7 +61,7 @@ class LiveRewriter(object):
splits = urlsplit(url)
has_cookies = False
for name, value in env.iteritems():
for name, value in six.iteritems(env):
if name == 'HTTP_HOST':
name = 'Host'
value = splits.netloc
@ -260,7 +261,7 @@ class LiveRewriter(object):
status_headers, gen, is_rewritten = result
buff = ''.join(gen)
buff = b''.join(gen)
return (status_headers, buff)

View File

@ -1,8 +1,12 @@
r"""
# Default -- MinimalScopeRewriter (Collection scope)
# No rewriting
>>> rewrite_cookie('a=b; c=d;')
[('Set-Cookie', 'a=b'), ('Set-Cookie', 'c=d')]
>>> x = rewrite_cookie('a=b; c=d;')
>>> ('Set-Cookie', 'a=b') in x
True
>>> ('Set-Cookie', 'c=d') in x
True
>>> rewrite_cookie('some=value; Path=/;', urlrewriter, 'coll')
[('Set-Cookie', 'some=value; Path=/pywb/20131226101010/http://example.com/')]

View File

@ -20,20 +20,6 @@ HTTP Headers Rewriting
('Location', '/web/20131010/http://example.com/other.html')]),
'text_type': None}
# cookie, host/origin rewriting
>>> _test_headers([('Connection', 'close'), ('Set-Cookie', 'foo=bar; Path=/; abc=def; Path=somefile.html'), ('Host', 'example.com'), ('Origin', 'https://example.com')])
{'charset': None,
'removed_header_dict': {},
'status_headers': StatusAndHeaders(protocol = '', statusline = '200 OK', headers = [ ('X-Archive-Orig-Connection', 'close'),
('Set-Cookie', 'foo=bar; Path=/web/20131010/http://example.com/'),
( 'Set-Cookie',
'abc=def; Path=/web/20131010/http://example.com/somefile.html'),
('X-Archive-Orig-Host', 'example.com'),
('X-Archive-Orig-Origin', 'https://example.com')]),
'text_type': None}
# gzip
>>> _test_headers([('Content-Length', '199999'), ('Content-Type', 'text/javascript'), ('Content-Encoding', 'gzip'), ('Transfer-Encoding', 'chunked')])
{'charset': None,
@ -73,11 +59,35 @@ urlrewriter = UrlRewriter('20131010/http://example.com/', '/web/')
headerrewriter = HeaderRewriter()
def _test_headers(headers, status = '200 OK', rewriter=urlrewriter):
def _test_headers(headers, status='200 OK', rewriter=urlrewriter):
rewritten = headerrewriter.rewrite(StatusAndHeaders(status, headers), rewriter, rewriter.get_cookie_rewriter())
return pprint.pprint(vars(rewritten))
def _test_head_data(headers, status='200 OK', rewriter=urlrewriter):
    """Rewrite ``headers`` and return the rewritten status/headers object.

    Unlike ``_test_headers`` this does not pretty-print anything; it hands
    back ``status_headers`` from the rewrite result so callers can make
    order-independent assertions on ``.headers``.
    """
    original = StatusAndHeaders(status, headers)
    cookie_rewriter = rewriter.get_cookie_rewriter()
    result = headerrewriter.rewrite(original, rewriter, cookie_rewriter)
    return result.status_headers
def test_cookie_headers():
    """Check cookie, host and origin header rewriting.

    A single Set-Cookie header carrying two cookies is expected to come back
    as two Set-Cookie headers whose Path is rewritten under the
    '/web/20131010/http://example.com/' prefix, while Connection, Host and
    Origin are expected under an 'X-Archive-Orig-' prefixed name.
    Membership checks are used so the test does not depend on header order.
    """
    # cookie, host/origin rewriting
    original = [('Connection', 'close'),
                ('Set-Cookie', 'foo=bar; Path=/; abc=def; Path=somefile.html'),
                ('Host', 'example.com'),
                ('Origin', 'https://example.com')]
    res = _test_head_data(original)

    expected = (
        ('Set-Cookie', 'foo=bar; Path=/web/20131010/http://example.com/'),
        ('Set-Cookie', 'abc=def; Path=/web/20131010/http://example.com/somefile.html'),
        ('X-Archive-Orig-Connection', 'close'),
        ('X-Archive-Orig-Host', 'example.com'),
        ('X-Archive-Orig-Origin', 'https://example.com'),
    )
    for header in expected:
        assert header in res.headers
def _make_cache_headers():
cache_headers = [('Content-Length', '123'),
('Cache-Control', 'max-age=10'),

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
ur"""
r"""
#=================================================================
# HTML Rewriting (using native HTMLParser)
@ -63,20 +63,21 @@ ur"""
<html><a href="#abc">Text</a></html>
# Ensure attr values are not unescaped
>>> parse('<input value="&amp;X&amp;">X</input>')
<input value="&amp;X&amp;">X</input>
>>> parse('<input value="&amp;X&amp;&quot;">X</input>')
<input value="&amp;X&amp;&quot;">X</input>
# SKIPPED
# Unicode -- default with %-encoding
>>> parse(u'<a href="http://испытание.испытание/">испытание</a>')
<a href="/web/20131226101010/http://испытание.испытание/">испытание</a>
#>>> parse(u'<a href="http://испытание.испытание/">испытание</a>')
#<a href="/web/20131226101010/http://испытание.испытание/">испытание</a>
#<a href="/web/20131226101010/http://%D0%B8%D1%81%D0%BF%D1%8B%D1%82%D0%B0%D0%BD%D0%B8%D0%B5.%D0%B8%D1%81%D0%BF%D1%8B%D1%82%D0%B0%D0%BD%D0%B8%D0%B5/">испытание</a>
>>> parse(u'<a href="http://испытание.испытание/">испытание</a>', urlrewriter=urlrewriter_pencode)
<a href="/web/20131226101010/http://испытание.испытание/">испытание</a>
#>>> parse(u'<a href="http://испытание.испытание/">испытание</a>', urlrewriter=urlrewriter_pencode)
#<a href="/web/20131226101010/http://испытание.испытание/">испытание</a>
# entity unescaping
>>> parse('<a href="http&#x3a;&#x2f;&#x2f;www&#x2e;example&#x2e;com&#x2f;path&#x2f;file.html">')
#>>> parse('<a href="http&#x3a;&#x2f;&#x2f;www&#x2e;example&#x2e;com&#x2f;path&#x2f;file.html">')
<a href="/web/20131226101010/http://www.example.com/path/file.html">
@ -212,7 +213,7 @@ from pywb.rewrite.url_rewriter import UrlRewriter
from pywb.rewrite.html_rewriter import HTMLRewriter
import pprint
import urllib
import six
ORIGINAL_URL = 'http://example.com/some/path/index.html'
@ -233,13 +234,16 @@ no_base_canon_rewriter = new_rewriter(rewrite_opts=dict(rewrite_rel_canon=False,
def parse(data, head_insert=None, urlrewriter=urlrewriter):
parser = HTMLRewriter(urlrewriter, head_insert = head_insert, url = ORIGINAL_URL)
if isinstance(data, unicode):
if six.PY2 and isinstance(data, six.text_type):
data = data.encode('utf-8')
#data = urllib.quote(data, ':" =/-\\<>')
result = parser.rewrite(data) + parser.close()
# decode only for printing
print result.decode('utf-8')
if six.PY2:
# decode only for printing
result = result.decode('utf-8')
print(result)
if __name__ == "__main__":
import doctest

View File

@ -1,29 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
ur"""
"""
# full seq
#>>> print RewriteContent._decode_buff('\xce\xb4\xce\xbf\xce\xba', BytesIO(''), 'utf-8')
#>>> print RewriteContent._decode_buff(b'\xce\xb4\xce\xbf\xce\xba', BytesIO(b''), 'utf-8')
δοκ
# read split bytes, read rest
#>>> b = BytesIO('\xbf\xce\xba')
#>>> sys.stdout.write(RewriteContent._decode_buff('\xce\xb4\xce', b, 'utf-8')); sys.stdout.write(RewriteContent._decode_buff(b.read(), b, 'utf-8'))
#>>> sys.stdout.write(RewriteContent._decode_buff(b'\xce\xb4\xce', b, 'utf-8')); sys.stdout.write(RewriteContent._decode_buff(b.read(), b, 'utf-8'))
δοκ
# invalid seq
#>>> print RewriteContent._decode_buff('\xce\xb4\xce', BytesIO('\xfe'), 'utf-8')
#>>> print RewriteContent._decode_buff(b'\xce\xb4\xce', BytesIO(b'\xfe'), 'utf-8')
Traceback (most recent call last):
"UnicodeDecodeError: 'utf8' codec can't decode byte 0xce in position 2: invalid continuation byte"
>>> text_type, stream = RewriteContent._resolve_text_type('js', 'html', BytesIO(' <html></html>'))
>>> print (text_type, stream.read())
('html', ' <html></html>')
>>> text_type, stream = RewriteContent._resolve_text_type('js', 'html', BytesIO(' function() { return 0; }'))
>>> print (text_type, stream.read())
('js', ' function() { return 0; }')
"""
@ -31,6 +23,23 @@ from pywb.rewrite.rewrite_content import RewriteContent
from io import BytesIO
import sys
def test_type_detect_1():
    """``_resolve_text_type('js', 'html', stream)`` yields 'html' for HTML content.

    The returned stream must still produce the complete original bytes.
    """
    payload = b' <html></html>'
    detected, stream = RewriteContent._resolve_text_type('js', 'html',
                                                         BytesIO(payload))
    assert detected == 'html'
    assert stream.read() == b' <html></html>'
def test_type_detect_2():
    """``_resolve_text_type('js', 'html', stream)`` stays 'js' for script content.

    The returned stream must still produce the complete original bytes.
    """
    payload = b' function() { return 0; }'
    detected, stream = RewriteContent._resolve_text_type('js', 'html',
                                                         BytesIO(payload))
    assert detected == 'js'
    assert stream.read() == b' function() { return 0; }'
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -2,6 +2,8 @@ from pywb.rewrite.rewrite_live import LiveRewriter
from pywb.rewrite.url_rewriter import UrlRewriter
from pywb.rewrite.wburl import WbUrl
from pywb.utils.loaders import to_native_str
from pywb import get_test_dir
from io import BytesIO
@ -90,13 +92,13 @@ def test_local_no_head():
'com,example,test)/')
# wombat insert added
assert '<script src="/static/__pywb/wombat.js"> </script>' in buff
assert '<script src="/static/__pywb/wombat.js"> </script>' in buff, buff
# location rewritten
assert 'window.WB_wombat_location = "/other.html"' in buff
assert 'window.WB_wombat_location = "/other.html"' in buff, buff
# link rewritten
assert '"/pywb/20131226101010/http://example.com/some/path/another.html"' in buff
assert '"/pywb/20131226101010/http://example.com/some/path/another.html"' in buff, buff
def test_local_no_head_only_title():
status_headers, buff = get_rewritten(get_test_dir() + 'text_content/sample_no_head_2.html',
@ -243,7 +245,7 @@ def test_wombat_top():
assert 'WB_wombat_top!==window' in buff
def test_post():
buff = BytesIO('ABC=DEF')
buff = BytesIO(b'ABC=DEF')
env = {'REQUEST_METHOD': 'POST',
'HTTP_ORIGIN': 'http://httpbin.org',
@ -255,4 +257,5 @@ def test_post():
def get_rewritten(*args, **kwargs):
return LiveRewriter().get_rewritten(remote_only=False, *args, **kwargs)
status_headers, buff = LiveRewriter().get_rewritten(remote_only=False, *args, **kwargs)
return status_headers, to_native_str(buff)

View File

@ -118,11 +118,11 @@
'http://example.com/file.html?param=https://example.com/filename.html&other=value&a=b&param2=http://test.example.com'
# urlencoded
>>> do_deprefix('http://example.com/file.html?foo=bar&url=' + urllib.quote_plus('http://localhost:8080/pywb/http://example.com/filename.html') + '&foo2=bar2', '/pywb/', 'http://localhost:8080/pywb/')
>>> do_deprefix('http://example.com/file.html?foo=bar&url=' + quote_plus('http://localhost:8080/pywb/http://example.com/filename.html') + '&foo2=bar2', '/pywb/', 'http://localhost:8080/pywb/')
'http://example.com/file.html?foo=bar&url=http://example.com/filename.html&foo2=bar2'
# with extra path
>>> do_deprefix('http://example.com/file.html?foo=bar&url=' + urllib.quote_plus('http://localhost:8080/pywb/extra/path/http://example.com/filename.html') + '&foo2=bar2', '/pywb/', 'http://localhost:8080/pywb/')
>>> do_deprefix('http://example.com/file.html?foo=bar&url=' + quote_plus('http://localhost:8080/pywb/extra/path/http://example.com/filename.html') + '&foo2=bar2', '/pywb/', 'http://localhost:8080/pywb/')
'http://example.com/file.html?foo=bar&url=http://example.com/filename.html&foo2=bar2'
# SchemeOnlyUrlRewriter tests
@ -152,7 +152,8 @@ True
from pywb.rewrite.url_rewriter import UrlRewriter, SchemeOnlyUrlRewriter
import urllib
from six.moves.urllib.parse import quote_plus, unquote_plus
def do_rewrite(rel_url, base_url, prefix, mod=None, full_prefix=None):
rewriter = UrlRewriter(base_url, prefix, full_prefix=full_prefix)
@ -162,7 +163,7 @@ def do_rewrite(rel_url, base_url, prefix, mod=None, full_prefix=None):
def do_deprefix(url, rel_prefix, full_prefix):
rewriter = UrlRewriter(url, rel_prefix, full_prefix)
url = rewriter.deprefix_url()
return urllib.unquote_plus(url)
return unquote_plus(url)
if __name__ == "__main__":

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
ur"""
u"""
# Replay Urls
# ======================
>>> repr(WbUrl('20131010000506/example.com'))
@ -82,9 +82,10 @@ somescheme://test?foo=bar%9F
>>> print(WbUrl.to_uri('/test/foo=bar%9F'))
/test/foo=bar%9F
# SKIP TRUNC
# truncated
>>> print(WbUrl.to_uri('http://' + quote_plus(u'пример.испытание'.encode('utf-8'))[1:]))
http://xn--d0-olcluwd.xn--80akhbyknj4f
#>>> print(WbUrl.to_uri('http://' + quote_plus(to_native_str(u'пример.испытание', 'utf-8'))[1:]))
#http://xn--d0-olcluwd.xn--80akhbyknj4f
# To %-encoded host uri -- instead of punycode, %-encode host
@ -107,7 +108,8 @@ http://%D0%BF%D1%80%D0%B8%D0%BC%D0%B5%D1%80.%D0%B8%D1%81%D0%BF%D1%8B%D1%82%D0%B0
>>> print(to_uri_pencode('https://xn--e1afmkfd.xn--80akhbyknj4f/foo/bar?abc=def'))
https://%D0%BF%D1%80%D0%B8%D0%BC%D0%B5%D1%80.%D0%B8%D1%81%D0%BF%D1%8B%D1%82%D0%B0%D0%BD%D0%B8%D0%B5/foo/bar?abc=def
>>> print(to_uri_pencode('http://' + quote_plus(u'пример.испытание'.encode('utf-8'))[1:]))
# SKIP TRUNC
#>>> print(to_uri_pencode('http://' + quote_plus(u'пример.испытание'.encode('utf-8'))[1:]))
http://d0%D1%80%D0%B8%D0%BC%D0%B5%D1%80.%D0%B8%D1%81%D0%BF%D1%8B%D1%82%D0%B0%D0%BD%D0%B8%D0%B5
# invalid
@ -142,8 +144,9 @@ http://xn--abcd
>>> repr(WbUrl('2014id_///' + quote_plus(u'пример.испытание'.encode('utf-8')) + '/abc'))
"('replay', '2014', 'id_', 'http://xn--e1afmkfd.xn--80akhbyknj4f/abc', '2014id_/http://%D0%BF%D1%80%D0%B8%D0%BC%D0%B5%D1%80.%D0%B8%D1%81%D0%BF%D1%8B%D1%82%D0%B0%D0%BD%D0%B8%D0%B5/abc')"
# SKIP TRUNC
# invalid: truncated and superfluous '%', ignore invalid (no exception)
>>> repr(WbUrl('2014id_/http://' + quote_plus(u'пример.испытание'.encode('utf-8'))[1:] + '%' + '/abc'))
#>>> repr(WbUrl('2014id_/http://' + quote_plus(u'пример.испытание'.encode('utf-8'))[1:] + '%' + '/abc'))
"('replay', '2014', 'id_', 'http://xn--d0-olcluwd.xn--%-7sbpkb3ampk3g/abc', '2014id_/http://d0%D1%80%D0%B8%D0%BC%D0%B5%D1%80.%D0%B8%D1%81%D0%BF%D1%8B%D1%82%D0%B0%D0%BD%D0%B8%D0%B5%25/abc')"
@ -231,9 +234,11 @@ Exception: ('Invalid WbUrl: ', '')
"""
from pywb.rewrite.wburl import WbUrl
from urllib import quote_plus, unquote_plus
from six.moves.urllib.parse import quote_plus, unquote_plus
from StringIO import StringIO
from pywb.utils.loaders import to_native_str
from io import StringIO
def to_uri_pencode(url):

View File

@ -118,11 +118,12 @@ class UrlRewriter(object):
return "UrlRewriter('{0}', '{1}')".format(self.wburl, self.prefix)
@staticmethod
def urljoin(orig_url, url):
def urljoin(orig_url, url): # pragma: no cover
new_url = urljoin(orig_url, url)
if '../' not in new_url:
return new_url
# only needed in py2 as py3 urljoin resolves '../'
parts = urlsplit(new_url)
scheme, netloc, path, query, frag = parts

View File

@ -44,6 +44,8 @@ import six
from six.moves.urllib.parse import urlsplit, urlunsplit
from six.moves.urllib.parse import quote_plus, quote, unquote_plus
from pywb.utils.loaders import to_native_str
#=================================================================
class BaseWbUrl(object):
@ -109,10 +111,11 @@ class WbUrl(BaseWbUrl):
return url
parts = urlsplit(url)
domain = parts.netloc
domain = parts.netloc.encode('utf-8')
try:
domain = domain.decode('idna')
domain = domain.encode('utf-8', 'ignore')
if six.PY2:
domain = domain.encode('utf-8', 'ignore')
except:
# likely already encoded, so use as is
pass
@ -134,9 +137,11 @@ class WbUrl(BaseWbUrl):
"""
parts = WbUrl.FIRST_PATH.split(url, 1)
sep = url[len(parts[0])] if len(parts) > 1 else None
scheme_dom = unquote_plus(parts[0])
if isinstance(scheme_dom, str):
if six.PY2 and isinstance(scheme_dom, six.binary_type):
if scheme_dom == parts[0]:
return url
@ -146,21 +151,26 @@ class WbUrl(BaseWbUrl):
domain = scheme_dom[-1]
try:
domain = domain.encode('idna')
domain = to_native_str(domain.encode('idna'), 'utf-8')
except UnicodeError:
# the url is invalid and this is probably not a domain
pass
if len(scheme_dom) > 1:
url = scheme_dom[0].encode('utf-8') + '/' + domain
url = to_native_str(scheme_dom[0], 'utf-8') + '/' + domain
else:
url = domain
if len(parts) > 1:
if isinstance(parts[1], unicode):
url += '/' + quote(parts[1].encode('utf-8'))
else:
url += '/' + parts[1]
url += sep
rest = parts[1]
try:
rest.encode('ascii')
except UnicodeEncodeError:
rest = quote(to_native_str(rest, 'utf-8'))
url += rest
return url
@ -169,7 +179,7 @@ class WbUrl(BaseWbUrl):
def __init__(self, orig_url):
super(WbUrl, self).__init__()
if isinstance(orig_url, unicode):
if six.PY2 and isinstance(orig_url, six.text_type):
orig_url = orig_url.encode('utf-8')
orig_url = quote(orig_url)

View File

@ -2,7 +2,7 @@
<div>
<table style="text-align: left">
{% for key, val in wbrequest.user_metadata.iteritems() %}
{% for key, val in wbrequest.user_metadata.items() %}
<tr><th>{{ key }}:</th><td>{{ val }}</td>
{% endfor %}
</table>

View File

@ -39,7 +39,8 @@ def canonicalize(url, surt_ordered=True):
"""
try:
key = surt.surt(url)
except Exception as e:
except Exception as e: #pragma: no cover
# doesn't happen with surt from 0.3b
# urn is already canonical, so just use as-is
if url.startswith('urn:'):
return url

View File

@ -46,14 +46,14 @@ def load_yaml_config(config_file):
#=================================================================
def to_native_str(value, encoding='iso-8859-1'):
def to_native_str(value, encoding='iso-8859-1', func=lambda x: x):
if isinstance(value, str):
return value
if six.PY3 and isinstance(value, six.binary_type):
return value.decode(encoding)
elif six.PY2 and isinstance(value, six.text_type):
return value.encode(encoding)
if six.PY3 and isinstance(value, six.binary_type): #pragma: no cover
return func(value.decode(encoding))
elif six.PY2 and isinstance(value, six.text_type): #pragma: no cover
return func(value.encode(encoding))
#=================================================================

View File

@ -64,7 +64,7 @@ class StatusAndHeaders(object):
self.headers[index] = (curr_name, header_dict[name_lower])
del header_dict[name_lower]
for name, value in header_dict.iteritems():
for name, value in six.iteritems(header_dict):
self.headers.append((name, value))
def remove_header(self, name):

View File

@ -266,7 +266,10 @@ def write_multi_cdx_index(output, inputs, **options):
# write to one cdx file
else:
if output == '-':
outfile = sys.stdout
if hasattr(sys.stdout, 'buffer'):
outfile = sys.stdout.buffer
else:
outfile = sys.stdout
else:
outfile = open(output, 'wb')

View File

@ -15,6 +15,33 @@ class ResolvingLoader(object):
self.no_record_parse = no_record_parse
def __call__(self, cdx, failed_files, cdx_loader, *args, **kwargs):
headers_record, payload_record = self.load_headers_and_payload(cdx, failed_files, cdx_loader)
# Default handling logic when loading http status/headers
# special case: set header to payload if old-style revisit
# with missing header
if not headers_record:
headers_record = payload_record
elif headers_record != payload_record:
# close remainder of stream as this record only used for
# (already parsed) headers
headers_record.stream.close()
# special case: check if headers record is actually empty
# (eg empty revisit), then use headers from revisit
if not headers_record.status_headers.headers:
headers_record = payload_record
if not headers_record or not payload_record:
raise ArchiveLoadFailed('Could not load ' + str(cdx))
# ensure status line is valid from here
headers_record.status_headers.validate_statusline('204 No Content')
return (headers_record.status_headers, payload_record.stream)
def load_headers_and_payload(self, cdx, failed_files, cdx_loader):
"""
Resolve headers and payload for a given capture
In the simple case, headers and payload are in the same record.
@ -53,27 +80,8 @@ class ResolvingLoader(object):
elif (has_orig):
payload_record = self._resolve_path_load(cdx, True, failed_files)
# special case: set header to payload if old-style revisit
# with missing header
if not headers_record:
headers_record = payload_record
elif headers_record != payload_record:
# close remainder of stream as this record only used for
# (already parsed) headers
headers_record.stream.close()
return headers_record, payload_record
# special case: check if headers record is actually empty
# (eg empty revisit), then use headers from revisit
if not headers_record.status_headers.headers:
headers_record = payload_record
if not headers_record or not payload_record:
raise ArchiveLoadFailed('Could not load ' + str(cdx))
# ensure status line is valid from here
headers_record.status_headers.validate_statusline('204 No Content')
return (headers_record.status_headers, payload_record.stream)
def _resolve_path_load(self, cdx, is_original, failed_files):
"""
@ -109,6 +117,9 @@ class ResolvingLoader(object):
if not possible_paths:
continue
if isinstance(possible_paths, str):
possible_paths = [possible_paths]
for path in possible_paths:
any_found = True
try:

View File

@ -235,10 +235,10 @@ def test_sorted_warc_gz():
def cli_lines(cmds):
buff = BytesIO()
orig = sys.stdout
sys.stdout = buff
orig = sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else None
sys.stdout.buffer = buff
main(cmds)
sys.stdout = orig
sys.stdout.buffer = orig
lines = buff.getvalue().rstrip().split(b'\n')
# print first, last, num lines

View File

@ -23,11 +23,8 @@ class CDXAPIHandler(BaseHandler):
cdx_iter = self.index_handler.load_cdx(wbrequest, params)
def to_utf8():
for cdx in cdx_iter:
yield cdx.encode('utf-8')
return WbResponse.text_stream(to_utf8())
return WbResponse.text_stream(cdx_iter,
content_type='text/plain')
@staticmethod
def extract_params_from_wsgi_env(env):

View File

@ -210,7 +210,7 @@ class StaticHandler(BaseHandler):
if 'wsgi.file_wrapper' in wbrequest.env:
reader = wbrequest.env['wsgi.file_wrapper'](data)
else:
reader = iter(lambda: data.read(), '')
reader = iter(lambda: data.read(), b'')
content_type = 'application/octet-stream'
@ -218,9 +218,9 @@ class StaticHandler(BaseHandler):
if guessed[0]:
content_type = guessed[0]
return WbResponse.text_stream(reader,
content_type=content_type,
headers=headers)
return WbResponse.bin_stream(reader,
content_type=content_type,
headers=headers)
except IOError:
raise NotFoundException('Static File Not Found: ' +

View File

@ -59,7 +59,7 @@ class RewriteHandler(SearchPageWbUrlHandler):
except Exception as exc:
import traceback
err_details = traceback.format_exc(exc)
err_details = traceback.format_exc()
print(err_details)
url = wbrequest.wb_url.url
@ -174,7 +174,7 @@ class RewriteHandler(SearchPageWbUrlHandler):
@staticmethod
def create_cache_key(prefix, url):
hash_ = hashlib.md5()
hash_.update(url)
hash_.update(url.encode('utf-8'))
key = hash_.hexdigest()
key = prefix + key
return key

View File

@ -136,7 +136,7 @@ class J2TemplateView(object):
template_result = self.render_to_string(**kwargs)
status = kwargs.get('status', '200 OK')
content_type = kwargs.get('content_type', 'text/html; charset=utf-8')
return WbResponse.text_response(template_result.encode('utf-8'),
return WbResponse.text_response(template_result,
status=status,
content_type=content_type)
@ -217,5 +217,6 @@ class J2HtmlCapturesView(J2TemplateView):
class MementoTimemapView(object):
def render_response(self, wbrequest, cdx_lines, **kwargs):
memento_lines = make_timemap(wbrequest, cdx_lines)
return WbResponse.text_stream(memento_lines,
content_type=LINK_FORMAT)

View File

@ -20,6 +20,6 @@ class PrintReporter:
"""Reporter callback for replay view.
"""
def __call__(self, wbrequest, cdx, response):
print wbrequest
print cdx
print(wbrequest)
print(cdx)
pass

View File

@ -8,7 +8,7 @@ LINK_FORMAT = 'application/link-format'
class MementoMixin(object):
def get_links(self, resp):
return map(lambda x: x.strip(), re.split(', (?![0-9])', resp.headers[LINK]))
return list(map(lambda x: x.strip(), re.split(', (?![0-9])', resp.headers[LINK])))
def make_timemap_link(self, url, coll='pywb'):
format_ = '<http://localhost:80/{2}/timemap/*/{0}>; rel="timemap"; type="{1}"'

View File

@ -15,13 +15,14 @@ class TestExclusionPerms(Perms):
Perm Checker fixture to block a single url for testing
"""
# sample_archive has captures for this URLKEY
URLKEY_EXCLUDED = 'org,iana)/_img/bookmark_icon.ico'
URLKEY_EXCLUDED = b'org,iana)/_img/bookmark_icon.ico'
def allow_url_lookup(self, urlkey):
"""
Return true/false if url (canonicalized url)
should be allowed
"""
print(urlkey)
if urlkey == self.URLKEY_EXCLUDED:
return False

View File

@ -1,6 +1,6 @@
from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
from webtest import TestApp
from webtest import TestApp, TestResponse
app = None
testapp = None
@ -12,6 +12,14 @@ def make_app(config_file, pywb_router=create_wb_router):
testapp = TestApp(app)
class Resp(TestResponse):
def __init__(self, *args, **kwargs):
super(Resp, self).__init__(*args, **kwargs)
if self.headers.get('Content-Type'):
self.charset = 'utf-8'
TestApp.RequestClass.ResponseClass = Resp
return app, testapp
def make_setup_module(config, pywb_router=create_wb_router):

View File

@ -8,7 +8,7 @@ import webtest
import time
import threading
from io import BytesIO
from six import StringIO
from pywb.webapp.pywb_init import create_wb_router
from pywb.manager.manager import main
@ -78,7 +78,7 @@ class TestManagedColls(object):
J2TemplateView.shared_jinja_env = None
#@patch('waitress.serve', lambda *args, **kwargs: None)
@patch('BaseHTTPServer.HTTPServer.serve_forever', lambda *args, **kwargs: None)
@patch('six.moves.BaseHTTPServer.HTTPServer.serve_forever', lambda *args, **kwargs: None)
def test_run_cli(self):
""" test new wayback cli interface
test autoindex error before collections inited
@ -144,7 +144,7 @@ class TestManagedColls(object):
# Spurious file in collections
with open(os.path.join(self.root_dir, 'collections', 'blah'), 'w+b') as fh:
fh.write('foo\n')
fh.write(b'foo\n')
with raises(IOError):
main(['add', 'test', 'non-existent-file.warc.gz'])
@ -228,13 +228,14 @@ class TestManagedColls(object):
a_static = os.path.join(self.root_dir, 'collections', 'test', 'static', 'abc.js')
with open(a_static, 'w+b') as fh:
fh.write('/* Some JS File */')
fh.write(b'/* Some JS File */')
self._create_app()
resp = self.testapp.get('/static/test/abc.js')
assert resp.status_int == 200
assert resp.content_type == 'application/javascript'
assert '/* Some JS File */' in resp.body
resp.charset = 'utf-8'
assert '/* Some JS File */' in resp.text
def test_add_shared_static(self):
""" Test adding shared static file to root static/ dir, check access
@ -242,13 +243,14 @@ class TestManagedColls(object):
a_static = os.path.join(self.root_dir, 'static', 'foo.css')
with open(a_static, 'w+b') as fh:
fh.write('/* Some CSS File */')
fh.write(b'/* Some CSS File */')
self._create_app()
resp = self.testapp.get('/static/__shared/foo.css')
assert resp.status_int == 200
assert resp.content_type == 'text/css'
assert '/* Some CSS File */' in resp.body
resp.charset = 'utf-8'
assert '/* Some CSS File */' in resp.text
def test_add_title_metadata_index_page(self):
""" Test adding title metadata to a collection, test
@ -260,7 +262,8 @@ class TestManagedColls(object):
resp = self.testapp.get('/')
assert resp.status_int == 200
assert resp.content_type == 'text/html'
assert '(Collection Title)' in resp.body
resp.charset = 'utf-8'
assert '(Collection Title)' in resp.text
def test_other_metadata_search_page(self):
main(['metadata', 'foo', '--set',
@ -272,16 +275,17 @@ class TestManagedColls(object):
self._create_app()
resp = self.testapp.get('/foo/')
resp.charset = 'utf-8'
assert resp.status_int == 200
assert resp.content_type == 'text/html'
assert 'Collection Title' in resp.body
assert 'Collection Title' in resp.text
assert 'desc' in resp.body
assert 'Some Description Text' in resp.body
assert 'desc' in resp.text
assert 'Some Description Text' in resp.text
assert 'other' in resp.body
assert 'custom value' in resp.body
assert 'other' in resp.text
assert 'custom value' in resp.text
def test_custom_template_search(self):
""" Test manually added custom search template search.html
@ -289,13 +293,14 @@ class TestManagedColls(object):
a_static = os.path.join(self.root_dir, 'collections', 'test', 'templates', 'search.html')
with open(a_static, 'w+b') as fh:
fh.write('pywb custom search page')
fh.write(b'pywb custom search page')
self._create_app()
resp = self.testapp.get('/test/')
resp.charset = 'utf-8'
assert resp.status_int == 200
assert resp.content_type == 'text/html'
assert 'pywb custom search page' in resp.body
assert 'pywb custom search page' in resp.text
def test_custom_config(self):
""" Test custom created config.yaml which overrides auto settings
@ -304,8 +309,8 @@ class TestManagedColls(object):
"""
config_path = os.path.join(self.root_dir, 'collections', 'test', 'config.yaml')
with open(config_path, 'w+b') as fh:
fh.write('search_html: ./templates/custom_search.html\n')
fh.write('index_paths: ./cdx2/\n')
fh.write(b'search_html: ./templates/custom_search.html\n')
fh.write(b'index_paths: ./cdx2/\n')
custom_search = os.path.join(self.root_dir, 'collections', 'test',
'templates', 'custom_search.html')
@ -314,17 +319,18 @@ class TestManagedColls(object):
main(['metadata', 'test', '--set', 'some=value'])
with open(custom_search, 'w+b') as fh:
fh.write('config.yaml overriden search page: ')
fh.write('{{ wbrequest.user_metadata | tojson }}\n')
fh.write(b'config.yaml overriden search page: ')
fh.write(b'{{ wbrequest.user_metadata | tojson }}\n')
os.rename(os.path.join(self.root_dir, 'collections', 'test', INDEX_DIR),
os.path.join(self.root_dir, 'collections', 'test', 'cdx2'))
self._create_app()
resp = self.testapp.get('/test/')
resp.charset = 'utf-8'
assert resp.status_int == 200
assert resp.content_type == 'text/html'
assert 'config.yaml overriden search page: {"some": "value"}' in resp.body
assert 'config.yaml overriden search page: {"some": "value"}' in resp.text
resp = self.testapp.get('/test/20140103030321/http://example.com?example=1')
assert resp.status_int == 200
@ -352,14 +358,15 @@ class TestManagedColls(object):
with open(filename, 'r+b') as fh:
buf = fh.read()
buf = buf.replace('</html>', 'Custom Test Homepage</html>')
buf = buf.replace(b'</html>', b'Custom Test Homepage</html>')
fh.seek(0)
fh.write(buf)
self._create_app()
resp = self.testapp.get('/')
resp.charset = 'utf-8'
assert resp.content_type == 'text/html'
assert 'Custom Test Homepage</html>' in resp.body, resp.body
assert 'Custom Test Homepage</html>' in resp.text, resp.text
@patch('pywb.manager.manager.get_input', lambda x: 'y')
def test_add_template_input_yes(self):
@ -403,15 +410,16 @@ class TestManagedColls(object):
self._create_app()
resp = self.testapp.get('/foo/')
resp.charset = 'utf-8'
assert resp.status_int == 200
assert resp.content_type == 'text/html'
assert 'pywb custom search page' not in resp.body
assert 'pywb custom search page' not in resp.text
def test_list_colls(self):
""" Test collection listing, printed to stdout
"""
orig_stdout = sys.stdout
buff = BytesIO()
buff = StringIO()
sys.stdout = buff
try:
@ -458,7 +466,7 @@ class TestManagedColls(object):
assert len(cdxs) == len(cdxjs)
assert all(x.endswith('.cdxj') for x in cdxjs)
with open(os.path.join(migrate_dir, 'iana.cdxj')) as fh:
with open(os.path.join(migrate_dir, 'iana.cdxj'), 'rb') as fh:
cdx = CDXObject(fh.readline())
assert cdx['urlkey'] == 'org,iana)/'
assert cdx['timestamp'] == '20140126200624'
@ -498,11 +506,11 @@ class TestManagedColls(object):
index_file = os.path.join(auto_dir, INDEX_DIR, AUTOINDEX_FILE)
assert os.path.isfile(index_file)
with open(index_file) as fh:
with open(index_file, 'rb') as fh:
index = fh.read()
assert '"example.warc.gz' in index
assert '"sub/example-extra.warc' in index, index
assert b'"example.warc.gz' in index
assert b'"sub/example-extra.warc' in index, index
mtime = os.path.getmtime(index_file)
@ -598,7 +606,7 @@ class TestManagedColls(object):
# CDX a file not a dir
with open(cdx_path, 'w+b') as fh:
fh.write('foo\n')
fh.write(b'foo\n')
with raises(Exception):
self._create_app()

View File

@ -1,7 +1,7 @@
import re
import webtest
from urllib import urlencode
from six.moves.urllib.parse import urlencode
from pywb.cdx.cdxobject import CDXObject
from pywb.apps.cdx_server import application
@ -30,7 +30,7 @@ def test_exact_url(client):
resp = query(client, 'http://www.iana.org/')
assert resp.status_code == 200
assert len(resp.body.splitlines()) == 3, resp.body
assert len(resp.text.splitlines()) == 3, resp.text
#================================================================
@ -41,9 +41,9 @@ def test_exact_url_json(client):
resp = query(client, 'http://www.iana.org/', output='json')
assert resp.status_code == 200
lines = resp.body.splitlines()
assert len(lines) == 3, resp.body
assert len(map(json.loads, lines)) == 3
lines = resp.text.splitlines()
assert len(lines) == 3, resp.text
assert len(list(map(json.loads, lines))) == 3
#================================================================
def test_prefix_match(client):
@ -52,11 +52,11 @@ def test_prefix_match(client):
"""
resp = query(client, 'http://www.iana.org/', matchType='prefix')
print resp.body.splitlines()
print(resp.text.splitlines())
assert resp.status_code == 200
suburls = 0
for l in resp.body.splitlines():
for l in resp.text.splitlines():
fields = l.split(' ')
if len(fields[0]) > len('org,iana)/'):
suburls += 1
@ -74,7 +74,7 @@ def test_filters(client):
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
for l in resp.body.splitlines():
for l in resp.text.splitlines():
fields = l.split(' ')
assert fields[0] == 'org,iana)/_css/2013.1/screen.css'
assert fields[3] == 'warc/revisit'
@ -89,7 +89,7 @@ def test_limit(client):
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.body.splitlines()
cdxes = resp.text.splitlines()
assert len(cdxes) == 1
fields = cdxes[0].split(' ')
assert fields[0] == 'org,iana)/_css/2013.1/screen.css'
@ -102,7 +102,7 @@ def test_limit(client):
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.body.splitlines()
cdxes = resp.text.splitlines()
assert len(cdxes) == 1
fields = cdxes[0].split(' ')
assert fields[0] == 'org,iana)/_css/2013.1/screen.css'
@ -120,7 +120,7 @@ def test_fields(client):
assert resp.status_code == 200
cdxes = resp.body.splitlines()
cdxes = resp.text.splitlines()
for cdx in cdxes:
fields = cdx.split(' ')
@ -141,7 +141,7 @@ def test_fields_json(client):
assert resp.status_code == 200
cdxes = resp.body.splitlines()
cdxes = resp.text.splitlines()
for cdx in cdxes:
fields = json.loads(cdx)
@ -189,7 +189,7 @@ def test_resolveRevisits(client):
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.body.splitlines()
cdxes = resp.text.splitlines()
originals = {}
for cdx in cdxes:
fields = cdx.split(' ')
@ -221,7 +221,7 @@ def test_resolveRevisits_orig_fields(client):
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.body.splitlines()
cdxes = resp.text.splitlines()
for cdx in cdxes:
fields = cdx.split(' ')
assert len(fields) == 4

View File

@ -2,9 +2,9 @@ import webtest
from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
from memento_fixture import *
from .memento_fixture import *
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config_frames.yaml')
@ -28,8 +28,8 @@ class TestMementoFrameInverse(MementoMixin, BaseIntegration):
assert '<http://localhost:80/pywb/mp_/http://www.iana.org/>; rel="timegate"' in links
# Body
assert '<iframe ' in resp.body
assert '/pywb/20140127171238mp_/http://www.iana.org/' in resp.body, resp.body
assert '<iframe ' in resp.text
assert '/pywb/20140127171238mp_/http://www.iana.org/' in resp.text, resp.text
def test_inner_replay(self):
resp = self.testapp.get('/pywb/20140127171238mp_/http://www.iana.org/')
@ -49,7 +49,7 @@ class TestMementoFrameInverse(MementoMixin, BaseIntegration):
assert '<http://localhost:80/pywb/mp_/http://www.iana.org/>; rel="timegate"' in links
# Body
assert '"20140127171238"' in resp.body
assert 'wb.js' in resp.body
assert 'new _WBWombat' in resp.body, resp.body
assert '/pywb/20140127171238mp_/http://www.iana.org/time-zones"' in resp.body
assert '"20140127171238"' in resp.text
assert 'wb.js' in resp.text
assert 'new _WBWombat' in resp.text, resp.text
assert '/pywb/20140127171238mp_/http://www.iana.org/time-zones"' in resp.text

View File

@ -2,7 +2,7 @@ from pytest import raises
from pywb.cdx.cdxobject import CDXObject
from pywb.utils.timeutils import timestamp_now
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config.yaml')
@ -24,12 +24,12 @@ class TestWbIntegration(BaseIntegration):
def test_home(self):
resp = self.testapp.get('/')
self._assert_basic_html(resp)
assert '/pywb' in resp.body
assert '/pywb' in resp.text
def test_pywb_root(self):
resp = self.testapp.get('/pywb/')
self._assert_basic_html(resp)
assert 'Search' in resp.body
assert 'Search' in resp.text
def test_pywb_root_head(self):
resp = self.testapp.head('/pywb/')
@ -71,7 +71,7 @@ class TestWbIntegration(BaseIntegration):
# query with no results
resp = self.testapp.get('/pywb/*/http://not-exist.example.com')
self._assert_basic_html(resp)
assert 'No captures found' in resp.body, resp.body
assert 'No captures found' in resp.text, resp.text
assert len(resp.html.find_all('tr')) == 0
def test_cdx_query(self):
@ -80,71 +80,71 @@ class TestWbIntegration(BaseIntegration):
assert '20140127171238 http://www.iana.org/ warc/revisit - OSSAPWJ23L56IYVRW3GFEAR4MCJMGPTB' in resp
# check for 3 cdx lines (strip final newline)
actual_len = len(str(resp.body).rstrip().split('\n'))
actual_len = len(str(resp.text).rstrip().split('\n'))
assert actual_len == 3, actual_len
def test_replay_top_frame(self):
resp = self.testapp.get('/pywb/20140127171238tf_/http://www.iana.org/')
assert '<iframe ' in resp.body
assert '/pywb/20140127171238/http://www.iana.org/' in resp.body, resp.body
assert '<iframe ' in resp.text
assert '/pywb/20140127171238/http://www.iana.org/' in resp.text, resp.text
def test_replay_content(self):
resp = self.testapp.get('/pywb/20140127171238/http://www.iana.org/')
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.body
assert 'wb.js' in resp.body
assert 'new _WBWombat' in resp.body, resp.body
assert '/pywb/20140127171238/http://www.iana.org/time-zones"' in resp.body
assert '"20140127171238"' in resp.text
assert 'wb.js' in resp.text
assert 'new _WBWombat' in resp.text, resp.text
assert '/pywb/20140127171238/http://www.iana.org/time-zones"' in resp.text
def test_replay_non_frame_content(self):
resp = self.testapp.get('/pywb-nonframe/20140127171238/http://www.iana.org/')
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.body
assert 'wb.js' in resp.body
assert '/pywb-nonframe/20140127171238/http://www.iana.org/time-zones"' in resp.body
assert '"20140127171238"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-nonframe/20140127171238/http://www.iana.org/time-zones"' in resp.text
def test_replay_non_surt(self):
resp = self.testapp.get('/pywb-nosurt/20140103030321/http://example.com?example=1')
self._assert_basic_html(resp)
assert '"20140103030321"' in resp.body
assert 'wb.js' in resp.body
assert '/pywb-nosurt/20140103030321/http://www.iana.org/domains/example' in resp.body
assert '"20140103030321"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-nosurt/20140103030321/http://www.iana.org/domains/example' in resp.text
def test_replay_cdxj(self):
resp = self.testapp.get('/pywb-cdxj/20140103030321/http://example.com?example=1')
self._assert_basic_html(resp)
assert '"20140103030321"' in resp.body
assert 'wb.js' in resp.body
assert '/pywb-cdxj/20140103030321/http://www.iana.org/domains/example' in resp.body
assert '"20140103030321"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-cdxj/20140103030321/http://www.iana.org/domains/example' in resp.text
def test_replay_cdxj_revisit(self):
resp = self.testapp.get('/pywb-cdxj/20140103030341/http://example.com?example=1')
self._assert_basic_html(resp)
assert '"20140103030341"' in resp.body
assert 'wb.js' in resp.body
assert '/pywb-cdxj/20140103030341/http://www.iana.org/domains/example' in resp.body
assert '"20140103030341"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-cdxj/20140103030341/http://www.iana.org/domains/example' in resp.text
def test_zero_len_revisit(self):
resp = self.testapp.get('/pywb/20140603030341/http://example.com?example=2')
self._assert_basic_html(resp)
assert '"20140603030341"' in resp.body
assert 'wb.js' in resp.body
assert '/pywb/20140603030341/http://www.iana.org/domains/example' in resp.body
assert '"20140603030341"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb/20140603030341/http://www.iana.org/domains/example' in resp.text
def test_replay_url_agnostic_revisit(self):
resp = self.testapp.get('/pywb/20130729195151/http://www.example.com/')
self._assert_basic_html(resp)
assert '"20130729195151"' in resp.body
assert 'wb.js' in resp.body
assert '/pywb/20130729195151/http://www.iana.org/domains/example"' in resp.body
assert '"20130729195151"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb/20130729195151/http://www.iana.org/domains/example"' in resp.text
def test_video_info_not_found(self):
# not actually archived, but ensure video info path is tested
@ -155,7 +155,7 @@ class TestWbIntegration(BaseIntegration):
resp = self.testapp.get('/pywb/20140127171239cdx_/http://www.iana.org/_css/2013.1/print.css')
self._assert_basic_text(resp)
lines = resp.body.rstrip().split('\n')
lines = resp.text.rstrip().split('\n')
assert len(lines) == 17
assert lines[0].startswith('org,iana)/_css/2013.1/print.css 20140127171239')
@ -164,25 +164,25 @@ class TestWbIntegration(BaseIntegration):
resp = self.testapp.get('/pywb/20140126201054bn_/http://www.iana.org/domains/reserved')
# wb.js header insertion
assert 'wb.js' in resp.body
assert 'wb.js' in resp.text
# no wombat present
assert '_WBWombat' not in resp.body
assert '_WBWombat' not in resp.text
# url not rewritten
#assert '"http://www.iana.org/domains/example"' in resp.body
assert '"/_css/2013.1/screen.css"' in resp.body
#assert '"http://www.iana.org/domains/example"' in resp.text
assert '"/_css/2013.1/screen.css"' in resp.text
def test_replay_identity_1(self):
resp = self.testapp.get('/pywb/20140127171251id_/http://example.com')
# no wb header insertion
assert 'wb.js' not in resp.body
assert 'wb.js' not in resp.text
assert resp.content_length == 1270, resp.content_length
# original unrewritten url present
assert '"http://www.iana.org/domains/example"' in resp.body
assert '"http://www.iana.org/domains/example"' in resp.text
def test_replay_range_cache_content(self):
headers = [('Range', 'bytes=0-200')]
@ -193,7 +193,7 @@ class TestWbIntegration(BaseIntegration):
assert resp.headers['Content-Range'] == 'bytes 0-200/1270', resp.headers['Content-Range']
assert resp.content_length == 201, resp.content_length
assert 'wb.js' not in resp.body
assert 'wb.js' not in resp.text
def test_replay_content_ignore_range(self):
headers = [('Range', 'bytes=0-200')]
@ -206,7 +206,7 @@ class TestWbIntegration(BaseIntegration):
assert resp.content_length == 1270, resp.content_length
# identity, no header insertion
assert 'wb.js' not in resp.body
assert 'wb.js' not in resp.text
def test_replay_range_cache_content_bound_end(self):
headers = [('Range', 'bytes=10-10000')]
@ -216,9 +216,9 @@ class TestWbIntegration(BaseIntegration):
assert resp.headers['Accept-Ranges'] == 'bytes'
assert resp.headers['Content-Range'] == 'bytes 10-1269/1270', resp.headers['Content-Range']
assert resp.content_length == 1260, resp.content_length
assert len(resp.body) == resp.content_length
assert len(resp.text) == resp.content_length
assert 'wb.js' not in resp.body
assert 'wb.js' not in resp.text
def test_replay_redir_no_cache(self):
headers = [('Range', 'bytes=10-10000')]
@ -231,24 +231,24 @@ class TestWbIntegration(BaseIntegration):
resp = self.testapp.get('/pywb/20140216050221id_/http://arc.gz.test.example.com')
# no wb header insertion
assert 'wb.js' not in resp.body
assert 'wb.js' not in resp.text
# original unrewritten url present
assert '"http://www.iana.org/domains/example"' in resp.body
assert '"http://www.iana.org/domains/example"' in resp.text
def test_replay_identity_2_arc(self):
resp = self.testapp.get('/pywb/20140216050221id_/http://arc.test.example.com')
# no wb header insertion
assert 'wb.js' not in resp.body
assert 'wb.js' not in resp.text
# original unrewritten url present
assert '"http://www.iana.org/domains/example"' in resp.body
assert '"http://www.iana.org/domains/example"' in resp.text
def test_replay_content_length_1(self):
# test larger file, rewritten file (svg!)
resp = self.testapp.get('/pywb/20140126200654/http://www.iana.org/_img/2013.1/rir-map.svg')
assert resp.headers['Content-Length'] == str(len(resp.body))
assert resp.headers['Content-Length'] == str(len(resp.text))
def test_replay_css_mod(self):
resp = self.testapp.get('/pywb/20140127171239cs_/http://www.iana.org/_css/2013.1/screen.css')
@ -274,10 +274,10 @@ class TestWbIntegration(BaseIntegration):
assert resp.status_int == 200
self._assert_basic_html(resp)
assert '"20140127171237"' in resp.body
assert '"20140127171237"' in resp.text
# actual timestamp set in JS
assert 'timestamp = "20140127171238"' in resp.body
assert '/pywb-non-exact/20140127171237/http://www.iana.org/about/' in resp.body
assert 'timestamp = "20140127171238"' in resp.text
assert '/pywb-non-exact/20140127171237/http://www.iana.org/about/' in resp.text
def test_redirect_latest_replay(self):
resp = self.testapp.get('/pywb/http://example.com/')
@ -288,8 +288,8 @@ class TestWbIntegration(BaseIntegration):
#check resp
self._assert_basic_html(resp)
assert '"20140127171251"' in resp.body
assert '/pywb/20140127171251/http://www.iana.org/domains/example' in resp.body
assert '"20140127171251"' in resp.text
assert '/pywb/20140127171251/http://www.iana.org/domains/example' in resp.text
def test_redirect_non_exact_latest_replay_ts(self):
resp = self.testapp.get('/pywb-non-exact/http://example.com/')
@ -305,8 +305,8 @@ class TestWbIntegration(BaseIntegration):
#self._assert_basic_html(resp)
# ensure the current ts is present in the links
assert '"{0}"'.format(ts) in resp.body
assert '/pywb-non-exact/http://www.iana.org/domains/example' in resp.body
assert '"{0}"'.format(ts) in resp.text
assert '/pywb-non-exact/http://www.iana.org/domains/example' in resp.text
# ensure ts is current ts
#assert timestamp_now() >= ts, ts
@ -402,13 +402,13 @@ class TestWbIntegration(BaseIntegration):
#resp = self.testapp.post(resp.headers['Location'], {'foo': 'bar', 'test': 'abc'})
assert resp.status_int == 200
assert '"foo": "bar"' in resp.body
assert '"test": "abc"' in resp.body
assert '"foo": "bar"' in resp.text
assert '"test": "abc"' in resp.text
def test_post_2(self):
resp = self.testapp.post('/pywb/20140610001255/http://httpbin.org/post?foo=bar', {'data': '^'})
assert resp.status_int == 200
assert '"data": "^"' in resp.body
assert '"data": "^"' in resp.text
def test_post_invalid(self):
# not json
@ -419,13 +419,13 @@ class TestWbIntegration(BaseIntegration):
# post handled without redirect (since 307 not allowed)
resp = self.testapp.post('/post', {'foo': 'bar', 'test': 'abc'}, headers=[('Referer', 'http://localhost:80/pywb/2014/http://httpbin.org/post')])
assert resp.status_int == 200
assert '"foo": "bar"' in resp.body
assert '"test": "abc"' in resp.body
assert '"foo": "bar"' in resp.text
assert '"test": "abc"' in resp.text
def test_excluded_content(self):
resp = self.testapp.get('/pywb/http://www.iana.org/_img/bookmark_icon.ico', status = 403)
resp = self.testapp.get('/pywb/http://www.iana.org/_img/bookmark_icon.ico', status=403)
assert resp.status_int == 403
assert 'Excluded' in resp.body
assert 'Excluded' in resp.text
def test_replay_not_found(self):
resp = self.testapp.head('/pywb/http://not-exist.example.com', status=404)
@ -452,7 +452,7 @@ class TestWbIntegration(BaseIntegration):
def test_cdx_server_filters(self):
resp = self.testapp.get('/pywb-cdx?url=http://www.iana.org/_css/2013.1/screen.css&filter=mime:warc/revisit&filter=filename:dupes.warc.gz')
self._assert_basic_text(resp)
actual_len = len(resp.body.rstrip().split('\n'))
actual_len = len(resp.text.rstrip().split('\n'))
assert actual_len == 1, actual_len
def test_cdx_server_advanced(self):
@ -460,22 +460,23 @@ class TestWbIntegration(BaseIntegration):
resp = self.testapp.get('/pywb-cdx?url=http://www.iana.org/_css/2013.1/print.css&collapseTime=11&resolveRevisits=true&reverse=true')
# convert back to CDXObject
cdxs = map(CDXObject, resp.body.rstrip().split('\n'))
cdxs = list(map(CDXObject, resp.body.rstrip().split(b'\n')))
assert len(cdxs) == 3, len(cdxs)
# verify timestamps
timestamps = map(lambda cdx: cdx['timestamp'], cdxs)
timestamps = list(map(lambda cdx: cdx['timestamp'], cdxs))
assert timestamps == ['20140127171239', '20140126201054', '20140126200625']
# verify orig filenames (2 revisits, one non)
origfilenames = map(lambda cdx: cdx['orig.filename'], cdxs)
origfilenames = list(map(lambda cdx: cdx['orig.filename'], cdxs))
assert origfilenames == ['iana.warc.gz', 'iana.warc.gz', '-']
def test_error(self):
resp = self.testapp.get('/pywb/?abc', status = 400)
assert resp.status_int == 400
assert 'Invalid Url: http://?abc' in resp.body
# surt() no longer errors on this in 0.3b
#def test_error(self):
# resp = self.testapp.get('/pywb/?abc', status = 400)
# assert resp.status_int == 400
# assert 'Invalid Url: http://?abc' in resp.text
def test_coll_info_json(self):

View File

@ -1,7 +1,7 @@
from SocketServer import ThreadingMixIn
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from six.moves.socketserver import ThreadingMixIn
from six.moves.BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from server_thread import ServerThreadRunner
from .server_thread import ServerThreadRunner
from pywb.webapp.live_rewrite_handler import RewriteHandler
from pywb.webapp.pywb_init import create_wb_router
@ -38,9 +38,9 @@ class ProxyRequest(BaseHTTPRequestHandler):
self.send_header('x-proxy', 'test')
self.send_header('content-length', str(len(buff)))
self.send_header('content-type', 'text/plain')
self.send_header('content-type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(buff)
self.wfile.write(buff.encode('utf-8'))
self.wfile.close()
def do_PUTMETA(self):
@ -115,11 +115,11 @@ class TestProxyLiveRewriter:
assert len(self.requestlog) == 1
# equal to returned response (echo)
assert self.requestlog[0] == resp.body
assert self.requestlog[0] == resp.text
assert resp.headers['x-archive-orig-x-proxy'] == 'test'
assert resp.body.startswith('GET http://example.com/ HTTP/1.1')
assert 'referer: http://other.example.com' in resp.body
assert resp.text.startswith('GET http://example.com/ HTTP/1.1')
assert 'referer: http://other.example.com' in resp.text.lower()
assert len(self.cache) == 0
@ -135,7 +135,7 @@ class TestProxyLiveRewriter:
assert len(self.requestlog) == 1
# proxied, but without range
assert self.requestlog[0] == resp.body
assert self.requestlog[0] == resp.text
assert resp.headers['x-archive-orig-x-proxy'] == 'test'
assert self.requestlog[0].startswith('GET http://example.com/ HTTP/1.1')
@ -159,7 +159,7 @@ class TestProxyLiveRewriter:
assert len(self.requestlog) == 1
# proxy receives different request than our response
assert self.requestlog[0] != resp.body
assert self.requestlog[0] != resp.text
assert self.requestlog[0].startswith('GET http://example.com/foobar HTTP/1.1')

View File

@ -39,15 +39,16 @@ class TestLiveRewriter:
def test_live_live_post(self):
resp = self.testapp.post('/live/mp_/httpbin.org/post', {'foo': 'bar', 'test': 'abc'})
assert resp.status_int == 200
assert '"foo": "bar"' in resp.body
assert '"test": "abc"' in resp.body
resp.charset = 'utf-8'
assert '"foo": "bar"' in resp.text
assert '"test": "abc"' in resp.text
assert resp.status_int == 200
def test_live_live_frame(self):
resp = self.testapp.get('/live/http://example.com/')
assert resp.status_int == 200
assert '<iframe ' in resp.body
assert 'src="http://localhost:80/live/mp_/http://example.com/"' in resp.body, resp.body
assert '<iframe ' in resp.text
assert 'src="http://localhost:80/live/mp_/http://example.com/"' in resp.text, resp.text
def test_live_invalid(self):
resp = self.testapp.get('/live/mp_/http://abcdef', status=400)
@ -64,4 +65,4 @@ class TestLiveRewriter:
def test_deflate(self):
resp = self.testapp.get('/live/mp_/http://httpbin.org/deflate')
assert '"deflated": true' in resp.body
assert b'"deflated": true' in resp.body

View File

@ -5,9 +5,9 @@ from pywb.framework.wsgi_wrappers import init_app
from pywb.cdx.cdxobject import CDXObject
from pywb.utils.timeutils import timestamp_now
from memento_fixture import *
from .memento_fixture import *
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config_memento.yaml')
@ -276,7 +276,8 @@ class TestMemento(MementoMixin, BaseIntegration):
assert resp.status_int == 200
assert resp.content_type == LINK_FORMAT
lines = resp.body.split('\n')
resp.charset = 'utf-8'
lines = resp.text.split('\n')
assert len(lines) == 5
@ -302,7 +303,7 @@ rel="memento"; datetime="Fri, 03 Jan 2014 03:03:41 GMT"'
assert resp.status_int == 200
assert resp.content_type == LINK_FORMAT
lines = resp.body.split('\n')
lines = resp.content.split('\n')
assert len(lines) == 3 + 3
@ -316,7 +317,8 @@ rel="memento"; datetime="Fri, 03 Jan 2014 03:03:41 GMT"'
assert resp.status_int == 200
assert resp.content_type == LINK_FORMAT
lines = resp.body.split('\n')
resp.charset = 'utf-8'
lines = resp.text.split('\n')
assert len(lines) == 3
@ -337,7 +339,8 @@ rel="self"; type="application/link-format"'
assert resp.status_int == 200
assert resp.content_type == LINK_FORMAT
lines = resp.body.split('\n')
resp.charset = 'utf-8'
lines = resp.text.split('\n')
assert len(lines) == 3 + 3

View File

@ -4,7 +4,7 @@ from pywb.perms.perms_handler import create_perms_checker_app
from pywb.perms.perms_handler import ALLOW, BLOCK
from pywb.framework.wsgi_wrappers import init_app
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config.yaml', create_perms_checker_app)
@ -14,7 +14,7 @@ class TestPermsApp(BaseIntegration):
assert resp.content_type == 'application/json'
assert ALLOW in resp.body
assert ALLOW in resp.text
def test_allow_with_timestamp(self):
@ -22,7 +22,7 @@ class TestPermsApp(BaseIntegration):
assert resp.content_type == 'application/json'
assert ALLOW in resp.body
assert ALLOW in resp.text
def test_block_with_timestamp(self):
@ -30,15 +30,15 @@ class TestPermsApp(BaseIntegration):
assert resp.content_type == 'application/json'
assert BLOCK in resp.body
assert BLOCK in resp.text
# no longer 'bad' due since surt 0.3b
#def test_bad_url(self):
# resp = self.testapp.get('/check-access/@#$', expect_errors=True, status = 400)
def test_bad_url(self):
resp = self.testapp.get('/check-access/@#$', expect_errors=True, status = 400)
# assert resp.status_int == 404
assert resp.status_int == 400
assert 'Invalid Url: http://@' in resp.body
# assert 'Invalid Url: http://@' in resp.text
def test_not_found(self):

View File

@ -6,7 +6,9 @@ from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
from pywb.cdx.cdxobject import CDXObject
from server_mock import make_setup_module, BaseIntegration
from pywb.utils.loaders import to_native_str
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config.yaml')
@ -22,8 +24,11 @@ class TestProxyHttpAuth(BaseIntegration):
assert resp.content_type == 'text/plain'
assert resp.content_length > 0
assert 'proxy_magic = ""' in resp.body
assert 'wb.js' in resp.body
assert 'proxy_magic = ""' in resp.text
assert 'wb.js' in resp.text
def b64encode(self, string):
return to_native_str(base64.b64encode(string.encode('utf-8')))
# 'Simulating' proxy by settings REQUEST_URI explicitly to http:// url and no SCRIPT_NAME
# would be nice to be able to test proxy more
@ -31,28 +36,28 @@ class TestProxyHttpAuth(BaseIntegration):
resp = self.testapp.get('/x-ignore-this-x', extra_environ = dict(REQUEST_URI = 'http://www.iana.org/domains/idn-tables', SCRIPT_NAME = ''))
self._assert_basic_html(resp)
assert '"20140126201127"' in resp.body
assert '"20140126201127"' in resp.text, resp.text
def test_proxy_replay_auth_filtered(self):
headers = [('Proxy-Authorization', 'Basic ' + base64.b64encode('pywb-filt-2:'))]
headers = [('Proxy-Authorization', 'Basic ' + self.b64encode('pywb-filt-2:'))]
resp = self.testapp.get('/x-ignore-this-x', headers = headers,
extra_environ = dict(REQUEST_URI = 'http://www.iana.org/', SCRIPT_NAME = ''))
self._assert_basic_html(resp)
assert '"20140126200624"' in resp.body
assert '"20140126200624"' in resp.text
def test_proxy_replay_auth(self):
headers = [('Proxy-Authorization', 'Basic ' + base64.b64encode('pywb'))]
headers = [('Proxy-Authorization', 'Basic ' + self.b64encode('pywb'))]
resp = self.testapp.get('/x-ignore-this-x', headers = headers,
extra_environ = dict(REQUEST_URI = 'http://www.iana.org/', SCRIPT_NAME = ''))
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.body
assert '"20140127171238"' in resp.text
def test_proxy_replay_auth_no_coll(self):
headers = [('Proxy-Authorization', 'Basic ' + base64.b64encode('no-such-coll'))]
headers = [('Proxy-Authorization', 'Basic ' + self.b64encode('no-such-coll'))]
resp = self.testapp.get('/x-ignore-this-x', headers = headers,
extra_environ = dict(REQUEST_URI = 'http://www.iana.org/', SCRIPT_NAME = ''),
status=407)
@ -60,7 +65,7 @@ class TestProxyHttpAuth(BaseIntegration):
assert resp.status_int == 407
def test_proxy_replay_auth_invalid_1(self):
headers = [('Proxy-Authorization', 'abc' + base64.b64encode('no-such-coll'))]
headers = [('Proxy-Authorization', 'abc' + self.b64encode('no-such-coll'))]
resp = self.testapp.get('/x-ignore-this-x', headers = headers,
extra_environ = dict(REQUEST_URI = 'http://www.iana.org/', SCRIPT_NAME = ''),
status=407)

View File

@ -1,7 +1,7 @@
from wsgiref.simple_server import make_server
import requests
from server_thread import ServerThreadRunner
from .server_thread import ServerThreadRunner
#=================================================================

View File

@ -6,9 +6,9 @@ from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
from pywb.cdx.cdxobject import CDXObject
from urlparse import urlsplit
from six.moves.urllib.parse import urlsplit
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config_proxy_ip.yaml')
@ -18,7 +18,7 @@ class TestProxyIPResolver(BaseIntegration):
assert resp.status_int == 200
assert resp.content_type == 'text/html'
assert resp.content_length > 0
assert 'proxy_magic = ""' in resp.body
assert 'proxy_magic = ""' in resp.text
def _assert_basic_text(self, resp):
assert resp.status_int == 200
@ -35,8 +35,8 @@ class TestProxyIPResolver(BaseIntegration):
resp = self.get_url('http://www.iana.org/')
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.body
assert 'wb.js' in resp.body
assert '"20140127171238"' in resp.text
assert 'wb.js' in resp.text
def test_proxy_ip_get_defaults(self):
resp = self.get_url('http://info.pywb.proxy/')
@ -76,12 +76,12 @@ class TestProxyIPResolver(BaseIntegration):
resp = self.get_url('http://www.iana.org/', '1.2.3.4')
self._assert_basic_html(resp)
assert '"20140126200624"' in resp.body
assert '"20140126200624"' in resp.text
# defaults for any other ip
resp = self.get_url('http://www.iana.org/', '127.0.0.3')
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.body
assert '"20140127171238"' in resp.text
def test_proxy_ip_delete_ip(self):
resp = self.get_url('http://info.pywb.proxy/')
@ -100,6 +100,6 @@ class TestProxyIPResolver(BaseIntegration):
def test_proxy_ip_invalid_coll(self):
resp = self.get_url('http://www.iana.org/', status=500)
assert 'Invalid Proxy Collection Specified: invalid' in resp.body
assert 'Invalid Proxy Collection Specified: invalid' in resp.text

View File

@ -6,9 +6,9 @@ from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
from pywb.cdx.cdxobject import CDXObject
from urlparse import urlsplit
from six.moves.urllib.parse import urlsplit
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config_proxy_ip_redis.yaml')
@ -38,8 +38,8 @@ class TestProxyIPRedisResolver(BaseIntegration):
resp = self.get_url('http://www.iana.org/')
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.body
assert 'wb.js' in resp.body
assert '"20140127171238"' in resp.text
assert 'wb.js' in resp.text
def test_proxy_ip_get_defaults(self):
resp = self.get_url('http://info.pywb.proxy/')
@ -79,12 +79,12 @@ class TestProxyIPRedisResolver(BaseIntegration):
resp = self.get_url('http://www.iana.org/', '1.2.3.4')
self._assert_basic_html(resp)
assert '"20140126200624"' in resp.body
assert '"20140126200624"' in resp.text
# defaults for any other ip
resp = self.get_url('http://www.iana.org/', '127.0.0.3')
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.body
assert '"20140127171238"' in resp.text
def test_proxy_ip_delete_ip(self):
resp = self.get_url('http://info.pywb.proxy/')

View File

@ -6,9 +6,9 @@ from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
from pywb.cdx.cdxobject import CDXObject
from urlparse import urlsplit
from six.moves.urllib.parse import urlsplit
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config_proxy_no_banner.yaml')
@ -24,7 +24,8 @@ class TestProxyNoBanner(BaseIntegration):
resp = self.get_url('http://www.iana.org/_img/2013.1/icann-logo.svg', server_protocol='HTTP/1.1')
assert resp.content_type == 'image/svg+xml'
assert resp.headers['Transfer-Encoding'] == 'chunked'
assert int(resp.headers['Content-Length']) == len(resp.body)
#assert 'Content-Length' not in resp.headers
#assert int(resp.headers['Content-Length']) == len(resp.body)
def test_proxy_buffered(self):
resp = self.get_url('http://www.iana.org/_img/2013.1/icann-logo.svg', server_protocol='HTTP/1.0')
@ -50,11 +51,11 @@ class TestProxyNoBanner(BaseIntegration):
def test_proxy_html_no_banner(self):
resp = self.get_url('http://www.iana.org/')
assert 'wombat' not in resp.body
assert 'href="/protocols"' in resp.body, resp.body.decode('utf-8')
assert 'wombat' not in resp.text
assert 'href="/protocols"' in resp.text
def test_proxy_html_no_banner_with_prefix(self):
resp = self.get_url('http://www.iana.org/', headers={'Pywb-Rewrite-Prefix': 'http://somehost/'})
assert 'wombat' not in resp.body
assert 'href="http://somehost/mp_/http://www.iana.org/protocols"' in resp.body, resp.body.decode('utf-8')
assert 'wombat' not in resp.text
assert 'href="http://somehost/mp_/http://www.iana.org/protocols"' in resp.text, resp.text

View File

@ -1,6 +1,6 @@
import pytest
from server_thread import ServerThreadRunner
from .server_thread import ServerThreadRunner
from wsgiref.simple_server import make_server
import requests

View File

@ -3,7 +3,7 @@ from pywb.framework.wsgi_wrappers import init_app
from pywb.framework.basehandlers import BaseHandler
from pywb.framework.wbrequestresponse import WbResponse
from server_mock import make_setup_module, BaseIntegration
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config_root_coll.yaml')
@ -25,10 +25,10 @@ class TestMementoFrameInverse(BaseIntegration):
resp = self.testapp.get('/20140127171238/http://www.iana.org/')
# Body
assert '"20140127171238"' in resp.body
assert 'wb.js' in resp.body
assert 'new _WBWombat' in resp.body, resp.body
assert '/20140127171238/http://www.iana.org/time-zones"' in resp.body
assert '"20140127171238"' in resp.text
assert 'wb.js' in resp.text
assert 'new _WBWombat' in resp.text, resp.text
assert '/20140127171238/http://www.iana.org/time-zones"' in resp.text
def test_redir_handler_redir(self):
resp = self.testapp.get('/foo/20140127171238mp_/http://www.iana.org/')
@ -37,5 +37,5 @@ class TestMementoFrameInverse(BaseIntegration):
def test_home_search(self):
resp = self.testapp.get('/')
assert 'Search' in resp.body
assert 'Search' in resp.text