mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 00:03:28 +01:00
proxy: extensive https and cookie resolver testing
move extract_cookie utility to wbrequest; fix head_insert 'wbinfo.proxy_magic' entry
This commit is contained in:
parent
5381c00c2a
commit
6b476d83de
@ -6,7 +6,7 @@ import urlparse
|
||||
import base64
|
||||
import os
|
||||
|
||||
try:
|
||||
try: # pragma: no coverage
|
||||
import uwsgi
|
||||
uwsgi_cache = True
|
||||
except ImportError:
|
||||
@ -14,7 +14,7 @@ except ImportError:
|
||||
|
||||
|
||||
#=================================================================
|
||||
class UwsgiCache(object):
|
||||
class UwsgiCache(object): # pragma: no coverage
|
||||
def __setitem__(self, item, value):
|
||||
uwsgi.cache_update(item, value)
|
||||
|
||||
@ -120,8 +120,7 @@ class ProxyAuthResolver(BaseCollResolver):
|
||||
|
||||
|
||||
#=================================================================
|
||||
# Experimental CookieResolver
|
||||
class CookieResolver(BaseCollResolver): # pragma: no cover
|
||||
class CookieResolver(BaseCollResolver):
|
||||
|
||||
SESH_COOKIE_NAME = '__pywb_proxy_sesh'
|
||||
|
||||
@ -137,7 +136,7 @@ class CookieResolver(BaseCollResolver): # pragma: no cover
|
||||
|
||||
self.extra_headers = config.get('extra_headers')
|
||||
|
||||
if uwsgi_cache:
|
||||
if uwsgi_cache: # pragma: no cover
|
||||
self.cache = UwsgiCache()
|
||||
else:
|
||||
self.cache = {}
|
||||
@ -193,7 +192,7 @@ class CookieResolver(BaseCollResolver): # pragma: no cover
|
||||
return self.make_redir_response(wb_url.url)
|
||||
|
||||
elif server_name.endswith(self.set_prefix):
|
||||
old_sesh_id = self.extract_client_cookie(env, self.cookie_name)
|
||||
old_sesh_id = WbRequest.extract_client_cookie(env, self.cookie_name)
|
||||
sesh_id = self.create_renew_sesh_id(old_sesh_id)
|
||||
|
||||
if sesh_id != old_sesh_id:
|
||||
@ -222,12 +221,8 @@ class CookieResolver(BaseCollResolver): # pragma: no cover
|
||||
return self.make_redir_response(full_url, headers=headers)
|
||||
|
||||
elif 'select.' in server_name:
|
||||
if not self.proxy_select_view:
|
||||
return WbResponse.text_response('select text for ' + path_url)
|
||||
|
||||
coll, ts, sesh_id = self.get_coll(env)
|
||||
|
||||
#scheme = env['pywb.proxy_scheme'] + '://'
|
||||
route_temp = '-set.' + self.magic_name + '/' + path_url
|
||||
|
||||
try:
|
||||
@ -287,7 +282,7 @@ class CookieResolver(BaseCollResolver): # pragma: no cover
|
||||
del self.cache[sesh_id + ':t']
|
||||
|
||||
def get_coll(self, env):
|
||||
sesh_id = self.extract_client_cookie(env, self.cookie_name)
|
||||
sesh_id = WbRequest.extract_client_cookie(env, self.cookie_name)
|
||||
|
||||
coll = None
|
||||
ts = None
|
||||
@ -318,26 +313,4 @@ class CookieResolver(BaseCollResolver): # pragma: no cover
|
||||
|
||||
return WbResponse.redir_response(url, headers=headers)
|
||||
|
||||
@staticmethod
|
||||
def extract_client_cookie(env, cookie_name):
|
||||
cookie_header = env.get('HTTP_COOKIE')
|
||||
if not cookie_header:
|
||||
return None
|
||||
|
||||
# attempt to extract cookie_name only
|
||||
inx = cookie_header.find(cookie_name)
|
||||
if inx < 0:
|
||||
return None
|
||||
|
||||
end_inx = cookie_header.find(';', inx)
|
||||
if end_inx > 0:
|
||||
value = cookie_header[inx:end_inx]
|
||||
else:
|
||||
value = cookie_header[inx:]
|
||||
|
||||
value = value.split('=')
|
||||
if len(value) < 2:
|
||||
return None
|
||||
|
||||
value = value[1].strip()
|
||||
return value
|
||||
|
@ -37,6 +37,18 @@
|
||||
>>> req_from_uri('/web/2010/example.com', {'wsgi.url_scheme': 'http', 'HTTP_HOST': 'localhost:8080'}).extract_referrer_wburl_str()
|
||||
|
||||
|
||||
# cookie extract tests
|
||||
>>> WbRequest.extract_client_cookie(dict(HTTP_COOKIE='a=b; c=d'), 'a')
|
||||
'b'
|
||||
|
||||
>>> WbRequest.extract_client_cookie(dict(HTTP_COOKIE='a=b; c=d'), 'c')
|
||||
'd'
|
||||
|
||||
>>> WbRequest.extract_client_cookie(dict(HTTP_COOKIE='a=b; c=d'), 'x')
|
||||
|
||||
>>> WbRequest.extract_client_cookie({}, 'y')
|
||||
|
||||
|
||||
# WbResponse Tests
|
||||
# =================
|
||||
>>> WbResponse.text_response('Test')
|
||||
|
@ -134,6 +134,30 @@ class WbRequest(object):
|
||||
if post_query:
|
||||
self.wb_url.url = append_post_query(self.wb_url.url, post_query)
|
||||
|
||||
@staticmethod
|
||||
def extract_client_cookie(env, cookie_name):
|
||||
cookie_header = env.get('HTTP_COOKIE')
|
||||
if not cookie_header:
|
||||
return None
|
||||
|
||||
# attempt to extract cookie_name only
|
||||
inx = cookie_header.find(cookie_name)
|
||||
if inx < 0:
|
||||
return None
|
||||
|
||||
end_inx = cookie_header.find(';', inx)
|
||||
if end_inx > 0:
|
||||
value = cookie_header[inx:end_inx]
|
||||
else:
|
||||
value = cookie_header[inx:]
|
||||
|
||||
value = value.split('=')
|
||||
if len(value) < 2:
|
||||
return None
|
||||
|
||||
value = value[1].strip()
|
||||
return value
|
||||
|
||||
|
||||
#=================================================================
|
||||
class WbResponse(object):
|
||||
|
@ -18,7 +18,8 @@
|
||||
wbinfo.mod = "{{ wbrequest.wb_url.mod }}";
|
||||
wbinfo.canon_url = "{{ canon_url }}";
|
||||
wbinfo.is_live = {{ "true" if cdx.is_live else "false" }};
|
||||
wbinfo.is_proxy_mode = {{ "true" if wbrequest.options.is_proxy else "false" }};
|
||||
wbinfo.coll = "{{ wbrequest.coll }}";
|
||||
wbinfo.proxy_magic = "{{ wbrequest.env.pywb_proxy_magic }}";
|
||||
</script>
|
||||
<script src='{{ wbrequest.host_prefix }}/{{ static_path }}/wb.js'> </script>
|
||||
<link rel='stylesheet' href='{{ wbrequest.host_prefix }}/{{ static_path }}/wb.css'/>
|
||||
|
@ -77,38 +77,3 @@ class TestProxyWb:
|
||||
resp = self.testapp.get('/x-ignore-this-x', headers = headers,
|
||||
extra_environ = dict(REQUEST_URI = 'http://www.iana.org/', SCRIPT_NAME = ''),
|
||||
status=407)
|
||||
|
||||
|
||||
class TestProxyCookieWb:
|
||||
TEST_CONFIG = 'tests/test_config_proxy.yaml'
|
||||
|
||||
def setup(self):
|
||||
self.app = init_app(create_wb_router,
|
||||
load_yaml=True,
|
||||
config_file=self.TEST_CONFIG)
|
||||
|
||||
self.testapp = webtest.TestApp(self.app)
|
||||
|
||||
def _assert_basic_html(self, resp):
|
||||
assert resp.status_int == 200
|
||||
assert resp.content_type == 'text/html'
|
||||
assert resp.content_length > 0
|
||||
|
||||
def _assert_basic_text(self, resp):
|
||||
assert resp.status_int == 200
|
||||
assert resp.content_type == 'text/plain'
|
||||
assert resp.content_length > 0
|
||||
|
||||
def test_proxy_cookie_first_select(self):
|
||||
resp = self.testapp.get('/x-ignore-this-x', extra_environ = dict(REQUEST_URI = 'http://www.iana.org/', SCRIPT_NAME = ''))
|
||||
assert resp.headers['Location'] == 'http://auto.pywb.proxy/http://www.iana.org/'
|
||||
assert resp.status_int == 302
|
||||
assert 'Set-Cookie' not in resp.headers
|
||||
|
||||
resp = self.testapp.get('/x-ignore-this-x', extra_environ = dict(REQUEST_URI = 'http://auto.pywb.proxy/http://www.iana.org/', SCRIPT_NAME = ''))
|
||||
assert resp.headers['Location'] == 'http://select.pywb.proxy/http://www.iana.org/'
|
||||
assert resp.status_int == 302
|
||||
assert 'Set-Cookie' not in resp.headers
|
||||
|
||||
#resp = resp.follow()
|
||||
#assert resp.status == 200
|
||||
|
@ -16,13 +16,16 @@ TEST_CA_DIR = './tests/pywb_test_certs'
|
||||
TEST_CA_ROOT = './tests/pywb_test_ca.pem'
|
||||
|
||||
server = None
|
||||
proxy_str = None
|
||||
sesh_key = None
|
||||
|
||||
def setup_module():
|
||||
global server
|
||||
server = ServeThread()
|
||||
server.daemon = True
|
||||
server.start()
|
||||
|
||||
global session
|
||||
session = requests.Session()
|
||||
|
||||
|
||||
def teardown_module():
|
||||
@ -55,43 +58,140 @@ class ServeThread(threading.Thread):
|
||||
self.httpd.serve_forever()
|
||||
|
||||
|
||||
def test_replay():
|
||||
#cookie_val = CookieResolver.SESH_COOKIE_NAME + '=
|
||||
resp = requests.get('https://iana.org/',
|
||||
proxies=server.proxy_dict,
|
||||
# headers={'Cookie': cookie_val},
|
||||
verify=TEST_CA_ROOT)
|
||||
assert resp.status_code == 200
|
||||
class TestHttpsProxy:
|
||||
def setup(self):
|
||||
self.session = requests.Session()
|
||||
|
||||
def get_url(self, url, headers=None):
|
||||
global sesh_key
|
||||
if sesh_key:
|
||||
self.session.headers.update({'Cookie': '__pywb_proxy_sesh=' + sesh_key})
|
||||
self.session.cookies.set('__pywb_proxy_sesh', sesh_key, domain='.pywb.proxy')
|
||||
#self.session.cookies.set('__pywb_proxy_sesh', sesh_key, domain='.iana.org')
|
||||
|
||||
def test_replay_static():
|
||||
resp = requests.get('https://pywb.proxy/static/default/wb.js',
|
||||
proxies=server.proxy_dict,
|
||||
verify=TEST_CA_ROOT)
|
||||
assert resp.status_code == 200
|
||||
found = u'function init_banner' in resp.text
|
||||
assert found, resp.text
|
||||
return self.session.get(url,
|
||||
proxies=server.proxy_dict,
|
||||
verify=TEST_CA_ROOT)
|
||||
|
||||
def test_replay_no_coll(self):
|
||||
resp = self.get_url('https://iana.org/')
|
||||
assert resp.url == 'https://select.pywb.proxy/https://iana.org/'
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_replay_dl_page():
|
||||
resp = requests.get('https://pywb.proxy/',
|
||||
proxies=server.proxy_dict,
|
||||
verify=TEST_CA_ROOT)
|
||||
assert resp.status_code == 200
|
||||
assert 'text/html' in resp.headers['content-type']
|
||||
found = u'Download' in resp.text
|
||||
assert found, resp.text
|
||||
def test_replay_set_older_coll(self):
|
||||
resp = self.get_url('https://older-set.pywb.proxy/https://iana.org/')
|
||||
assert resp.url == 'https://iana.org/'
|
||||
assert resp.status_code == 200
|
||||
assert '20140126200624' in resp.text
|
||||
|
||||
sesh1 = self.session.cookies.get('__pywb_proxy_sesh', domain='.pywb.proxy')
|
||||
sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org')
|
||||
assert sesh1 and sesh1 == sesh2, self.session.cookies
|
||||
|
||||
# store session cookie
|
||||
global sesh_key
|
||||
sesh_key = sesh1
|
||||
|
||||
def test_dl_pem():
|
||||
resp = requests.get('https://pywb.proxy/pywb-ca.pem',
|
||||
proxies=server.proxy_dict,
|
||||
verify=TEST_CA_ROOT)
|
||||
global sesh_key
|
||||
sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org')
|
||||
assert sesh_key == sesh2
|
||||
|
||||
assert resp.headers['content-type'] == 'application/x-x509-ca-cert'
|
||||
def test_replay_same_coll(self):
|
||||
resp = self.get_url('https://iana.org/')
|
||||
assert resp.url == 'https://iana.org/'
|
||||
assert resp.status_code == 200
|
||||
assert 'wbinfo.proxy_magic = "pywb.proxy";' in resp.text
|
||||
assert '20140126200624' in resp.text
|
||||
|
||||
def test_dl_p12():
|
||||
resp = requests.get('https://pywb.proxy/pywb-ca.p12',
|
||||
proxies=server.proxy_dict,
|
||||
verify=TEST_CA_ROOT)
|
||||
def test_replay_set_change_coll(self):
|
||||
resp = self.get_url('https://all-set.pywb.proxy/https://iana.org/')
|
||||
assert resp.url == 'https://iana.org/'
|
||||
assert resp.status_code == 200
|
||||
assert '20140127171238' in resp.text
|
||||
|
||||
# verify still same session cookie
|
||||
sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org')
|
||||
global sesh_key
|
||||
assert sesh_key == sesh2
|
||||
|
||||
assert resp.headers['content-type'] == 'application/x-pkcs12'
|
||||
def test_query(self):
|
||||
resp = self.get_url('https://query.pywb.proxy/*/https://iana.org/')
|
||||
assert resp.url == 'https://query.pywb.proxy/*/https://iana.org/'
|
||||
assert resp.status_code == 200
|
||||
assert 'text/html' in resp.headers['content-type']
|
||||
assert '20140126200624' in resp.text
|
||||
assert '20140127171238' in resp.text
|
||||
assert '<b>3</b> captures' in resp.text
|
||||
|
||||
# testing via http here
|
||||
def test_change_timestamp(self):
|
||||
resp = self.get_url('http://query.pywb.proxy/20140126200624/http://iana.org/')
|
||||
assert resp.url == 'http://iana.org/'
|
||||
assert resp.status_code == 200
|
||||
assert '20140126200624' in resp.text
|
||||
|
||||
def test_change_coll_same_ts(self):
|
||||
resp = self.get_url('https://all-set.pywb.proxy/iana.org/')
|
||||
assert resp.url == 'https://iana.org/'
|
||||
assert resp.status_code == 200
|
||||
assert '20140126200624' in resp.text
|
||||
|
||||
# testing via http here
|
||||
def test_change_latest_ts(self):
|
||||
resp = self.get_url('http://query.pywb.proxy/http://iana.org/?_=1234')
|
||||
assert resp.url == 'http://iana.org/?_=1234'
|
||||
assert resp.status_code == 200
|
||||
assert '20140127171238' in resp.text
|
||||
|
||||
def test_diff_url(self):
|
||||
resp = self.get_url('https://example.com/')
|
||||
assert resp.url == 'https://example.com/'
|
||||
assert '20140127171251' in resp.text
|
||||
|
||||
# Bounce back to select.pywb.proxy due to missing session
|
||||
def test_clear_key(self):
|
||||
# clear session key
|
||||
global sesh_key
|
||||
sesh_key = None
|
||||
|
||||
def test_no_sesh_latest_bounce(self):
|
||||
resp = self.get_url('https://query.pywb.proxy/https://iana.org/')
|
||||
assert resp.url == 'https://select.pywb.proxy/https://iana.org/'
|
||||
|
||||
def test_no_sesh_coll_change_bounce(self):
|
||||
resp = self.get_url('https://auto.pywb.proxy/https://iana.org/')
|
||||
assert resp.url == 'https://select.pywb.proxy/https://iana.org/'
|
||||
|
||||
def test_no_sesh_ts_bounce(self):
|
||||
resp = self.get_url('https://query.pywb.proxy/20140126200624/https://iana.org/')
|
||||
assert resp.url == 'https://select.pywb.proxy/20140126200624/https://iana.org/'
|
||||
|
||||
def test_no_sesh_query_bounce(self):
|
||||
resp = self.get_url('https://query.pywb.proxy/*/https://iana.org/')
|
||||
assert resp.url == 'https://select.pywb.proxy/https://query.pywb.proxy/*/https://iana.org/'
|
||||
|
||||
# static replay
|
||||
def test_replay_static(self):
|
||||
resp = self.get_url('https://pywb.proxy/static/default/wb.js')
|
||||
assert resp.status_code == 200
|
||||
found = u'function init_banner' in resp.text
|
||||
assert found, resp.text
|
||||
|
||||
# download index page and cert downloads
|
||||
def test_replay_dl_page(self):
|
||||
resp = self.get_url('https://pywb.proxy/')
|
||||
assert resp.status_code == 200
|
||||
assert 'text/html' in resp.headers['content-type']
|
||||
found = u'Download' in resp.text
|
||||
assert found, resp.text
|
||||
|
||||
def test_dl_pem(self):
|
||||
resp = self.get_url('https://pywb.proxy/pywb-ca.pem')
|
||||
|
||||
assert resp.headers['content-type'] == 'application/x-x509-ca-cert'
|
||||
|
||||
def test_dl_p12(self):
|
||||
resp = self.get_url('https://pywb.proxy/pywb-ca.p12')
|
||||
|
||||
assert resp.headers['content-type'] == 'application/x-pkcs12'
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user