1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

test coverage: proxy certauth: add unit tests for certauth cert creation (though not verifying validity yet)

add https proxy post test
This commit is contained in:
Ilya Kreymer 2014-09-06 13:31:10 -07:00
parent eaaefbfd24
commit c7228bf887
5 changed files with 93 additions and 24 deletions

View File

@ -84,7 +84,7 @@ class CertificateAuthority(object):
return cert
@staticmethod
def generate_ca_root(ca_file, certname=None, overwrite=False):
def generate_ca_root(ca_file, certname, overwrite=False):
if not certname:
certname = CERT_NAME
@ -179,7 +179,7 @@ class CertificateAuthority(object):
#=================================================================
def main():
def main(args=None):
parser = ArgumentParser(description='Cert Auth Cert Maker')
parser.add_argument('output_pem_file', help='path to cert .pem file')
@ -198,17 +198,16 @@ def main():
parser.add_argument('-w', '--wildcard_cert', action='store_true',
help='add wildcard SAN to host: *.<host>, <host>')
result = parser.parse_args()
result = parser.parse_args(args=args)
overwrite = result.force
# Create a new signed certificate using specified root
if result.use_root:
certs_dir = result.certs_dir
wildcard = result.wildcard
wildcard = result.wildcard_cert
ca = CertificateAuthority(ca_file=result.use_root,
certs_dir=result.certs_dir,
certname=result.name)
certs_dir=result.certs_dir)
created, host_filename = ca.get_cert_for_host(result.output_pem_file,
overwrite, wildcard)
@ -217,9 +216,12 @@ def main():
print ('Created new cert "' + host_filename +
'" signed by root cert ' +
result.use_root)
return 0
else:
print ('Cert "' + host_filename + '" already exists,' +
' use -f to overwrite')
return 1
# Create new root certificate
else:
@ -230,9 +232,11 @@ def main():
if created:
print 'Created new root cert: "' + result.output_pem_file + '"'
return 0
else:
print ('Root cert "' + result.output_pem_file +
'" already exists,' + ' use -f to overwrite')
return 1
if __name__ == "__main__":
main()

View File

@ -326,9 +326,6 @@ class ProxyRouter(object):
return None
elif env['pywb.proxy_req_uri'] == self.CERT_DL_PEM:
if not self.ca:
return None
buff = ''
with open(self.ca.ca_file) as fh:
buff = fh.read()
@ -339,14 +336,9 @@ class ProxyRouter(object):
content_type=content_type)
elif env['pywb.proxy_req_uri'] == self.CERT_DL_P12:
if not self.ca:
return None
buff = self.ca.get_root_PKCS12()
content_type = 'application/x-pkcs12'
return WbResponse.text_response(buff,
content_type=content_type)
else:
return None

View File

@ -0,0 +1,52 @@
import os
import shutil
from pywb.framework.certauth import main, CertificateAuthority
TEST_CA_DIR = './pywb/framework/test/pywb_test_ca_certs'
TEST_CA_ROOT = './pywb/framework/test/pywb_test_ca.pem'
def test_create_root():
ret = main([TEST_CA_ROOT, '-n', 'Test Root Cert'])
assert ret == 0
def test_create_host_cert():
ret = main(['example.com', '-r', TEST_CA_ROOT, '-d', TEST_CA_DIR])
assert ret == 0
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
#os.remove(certfile)
def test_create_wildcard_host_cert_force_overwrite():
ret = main(['example.com', '-r', TEST_CA_ROOT, '-d', TEST_CA_DIR, '-w', '-f'])
assert ret == 0
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
def test_explicit_wildcard():
ca = CertificateAuthority(TEST_CA_ROOT, TEST_CA_DIR)
filename = ca.get_wildcard_cert('test.example.proxy')
certfile = os.path.join(TEST_CA_DIR, 'example.proxy.pem')
assert filename == certfile
assert os.path.isfile(certfile)
os.remove(certfile)
def test_create_already_exists():
ret = main(['example.com', '-r', TEST_CA_ROOT, '-d', TEST_CA_DIR, '-w'])
assert ret == 1
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
# remove now
os.remove(certfile)
def test_create_root_already_exists():
ret = main([TEST_CA_ROOT])
# not created, already exists
assert ret == 1
# remove now
os.remove(TEST_CA_ROOT)
def test_delete_files():
shutil.rmtree(TEST_CA_DIR)
assert not os.path.isdir(TEST_CA_DIR)
assert not os.path.isfile(TEST_CA_ROOT)

View File

@ -2,7 +2,7 @@ collections:
all:
- ./sample_archive/cdx/iana.cdx
- ./sample_archive/cdx/dupes.cdx
- ./sample_archive/cdx/post-test.cdx
older:
- ./sample_archive/cdx/iana.cdx

View File

@ -23,10 +23,10 @@ def setup_module():
server = ServeThread()
server.daemon = True
server.start()
global session
global session
session = requests.Session()
def teardown_module():
try:
@ -46,7 +46,7 @@ class ServeThread(threading.Thread):
self.app = init_app(create_wb_router,
load_yaml=True,
config_file=TEST_CONFIG)
# init with port 0 to allow os to pick a port
self.httpd = make_server('', 0, self.app)
port = self.httpd.socket.getsockname()[1]
@ -72,7 +72,19 @@ class TestHttpsProxy:
return self.session.get(url,
proxies=server.proxy_dict,
verify=TEST_CA_ROOT)
def post_url(self, url, data):
global sesh_key
if sesh_key:
self.session.headers.update({'Cookie': '__pywb_proxy_sesh=' + sesh_key})
self.session.cookies.set('__pywb_proxy_sesh', sesh_key, domain='.pywb.proxy')
#self.session.cookies.set('__pywb_proxy_sesh', sesh_key, domain='.iana.org')
return self.session.post(url,
data=data,
proxies=server.proxy_dict,
verify=TEST_CA_ROOT)
def test_replay_no_coll(self):
resp = self.get_url('https://iana.org/')
assert resp.url == 'https://select.pywb.proxy/https://iana.org/'
@ -83,11 +95,11 @@ class TestHttpsProxy:
assert resp.url == 'https://iana.org/'
assert resp.status_code == 200
assert '20140126200624' in resp.text
sesh1 = self.session.cookies.get('__pywb_proxy_sesh', domain='.pywb.proxy')
sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org')
assert sesh1 and sesh1 == sesh2, self.session.cookies
# store session cookie
global sesh_key
sesh_key = sesh1
@ -96,7 +108,7 @@ class TestHttpsProxy:
sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org')
assert sesh_key == sesh2
def test_replay_same_coll(self):
def test_replay_same_coll(self):
resp = self.get_url('https://iana.org/')
assert resp.url == 'https://iana.org/'
assert resp.status_code == 200
@ -108,7 +120,7 @@ class TestHttpsProxy:
assert resp.url == 'https://iana.org/'
assert resp.status_code == 200
assert '20140127171238' in resp.text
# verify still same session cookie
sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org')
global sesh_key
@ -148,6 +160,15 @@ class TestHttpsProxy:
assert resp.url == 'https://example.com/'
assert '20140127171251' in resp.text
def test_post_replay_all_coll(self):
resp = self.post_url('https://httpbin.org/post', data={'foo': 'bar', 'test': 'abc'})
assert resp.url == 'https://httpbin.org/post'
assert 'application/json' in resp.headers['content-type']
assert resp.status_code == 200
#assert 'wbinfo.proxy_magic = "pywb.proxy";' in resp.text
#assert '20140126200624' in resp.text
# Bounce back to select.pywb.proxy due to missing session
def test_clear_key(self):
# clear session key