diff --git a/pywb/framework/certauth.py b/pywb/framework/certauth.py index 3037e2d2..35c0c000 100644 --- a/pywb/framework/certauth.py +++ b/pywb/framework/certauth.py @@ -84,7 +84,7 @@ class CertificateAuthority(object): return cert @staticmethod - def generate_ca_root(ca_file, certname=None, overwrite=False): + def generate_ca_root(ca_file, certname, overwrite=False): if not certname: certname = CERT_NAME @@ -179,7 +179,7 @@ class CertificateAuthority(object): #================================================================= -def main(): +def main(args=None): parser = ArgumentParser(description='Cert Auth Cert Maker') parser.add_argument('output_pem_file', help='path to cert .pem file') @@ -198,17 +198,16 @@ def main(): parser.add_argument('-w', '--wildcard_cert', action='store_true', help='add wildcard SAN to host: *., ') - result = parser.parse_args() + result = parser.parse_args(args=args) overwrite = result.force # Create a new signed certificate using specified root if result.use_root: certs_dir = result.certs_dir - wildcard = result.wildcard + wildcard = result.wildcard_cert ca = CertificateAuthority(ca_file=result.use_root, - certs_dir=result.certs_dir, - certname=result.name) + certs_dir=result.certs_dir) created, host_filename = ca.get_cert_for_host(result.output_pem_file, overwrite, wildcard) @@ -217,9 +216,12 @@ def main(): print ('Created new cert "' + host_filename + '" signed by root cert ' + result.use_root) + return 0 + else: print ('Cert "' + host_filename + '" already exists,' + ' use -f to overwrite') + return 1 # Create new root certificate else: @@ -230,9 +232,11 @@ def main(): if created: print 'Created new root cert: "' + result.output_pem_file + '"' + return 0 else: print ('Root cert "' + result.output_pem_file + '" already exists,' + ' use -f to overwrite') + return 1 if __name__ == "__main__": main() diff --git a/pywb/framework/proxy.py b/pywb/framework/proxy.py index e07a531f..2352ed9e 100644 --- a/pywb/framework/proxy.py 
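Review bot: In `generate_ca_root` (pywb/framework/certauth.py, first hunk) `certname` loses its `None` default while the first statement of the body still falls back to `CERT_NAME` when the value is empty, so the only visible effect is that callers passing just `ca_file` now fail with a `TypeError`. If the name is meant to be optional, keep `def generate_ca_root(ca_file, certname=None, overwrite=False):`. If it is meant to be required, drop the fallback and say so in a docstring. The rest of the method body is outside this hunk.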
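Review bot: The `return 0` / `return 1` values added to `main()` in the last two hunks of certauth.py never reach the shell, because the entry point at the bottom of the file still calls `main()` and discards the result. A script that drives certificate generation will therefore see exit status 0 even when the certificate already existed and nothing was written. Propagate the value with `sys.exit(main())` under `if __name__ == "__main__":`; this needs `sys` to be imported, and the import block is not visible in this diff.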
+++ b/pywb/framework/proxy.py @@ -326,9 +326,6 @@ class ProxyRouter(object): return None elif env['pywb.proxy_req_uri'] == self.CERT_DL_PEM: - if not self.ca: - return None - buff = '' with open(self.ca.ca_file) as fh: buff = fh.read() @@ -339,14 +336,9 @@ class ProxyRouter(object): content_type=content_type) elif env['pywb.proxy_req_uri'] == self.CERT_DL_P12: - if not self.ca: - return None - buff = self.ca.get_root_PKCS12() content_type = 'application/x-pkcs12' return WbResponse.text_response(buff, content_type=content_type) - else: - return None diff --git a/pywb/framework/test/test_certauth.py b/pywb/framework/test/test_certauth.py new file mode 100644 index 00000000..5ca89070 --- /dev/null +++ b/pywb/framework/test/test_certauth.py 
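Review bot: With the `if not self.ca: return None` guards removed in both proxy.py hunks, the `CERT_DL_PEM` branch dereferences `self.ca.ca_file` and the `CERT_DL_P12` branch calls `self.ca.get_root_PKCS12()` unconditionally, so a proxy running without a certificate authority would raise `AttributeError` on these download URLs unless an earlier check outside the hunks already rules that out. If no such check exists, restore the guard, `if not self.ca: return None`, before the CA is touched. Separately, the PEM branch returns the whole contents of `self.ca.ca_file`; should that file also hold the root private key (not visible in this diff), the download would disclose it to any proxy client, so it is worth confirming that only the certificate is served. The enclosing method starts and ends outside both hunks.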
@@ -0,0 +1,52 @@
+import os
+import shutil
+
+from pywb.framework.certauth import main, CertificateAuthority
+
+TEST_CA_DIR = './pywb/framework/test/pywb_test_ca_certs'
+TEST_CA_ROOT = './pywb/framework/test/pywb_test_ca.pem'
+
+def test_create_root():
+ ret = main([TEST_CA_ROOT, '-n', 'Test Root Cert'])
+ assert ret == 0
+
+def test_create_host_cert():
+ ret = main(['example.com', '-r', TEST_CA_ROOT, '-d', TEST_CA_DIR])
+ assert ret == 0
+ certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
+ assert os.path.isfile(certfile)
+ #os.remove(certfile)
+
+def test_create_wildcard_host_cert_force_overwrite():
+ ret = main(['example.com', '-r', TEST_CA_ROOT, '-d', TEST_CA_DIR, '-w', '-f'])
+ assert ret == 0
+ certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
+ assert os.path.isfile(certfile)
+
+def test_explicit_wildcard():
+ ca = CertificateAuthority(TEST_CA_ROOT, TEST_CA_DIR)
+ filename = ca.get_wildcard_cert('test.example.proxy')
+ certfile = os.path.join(TEST_CA_DIR, 'example.proxy.pem')
+ assert filename == certfile
+ assert os.path.isfile(certfile)
+ os.remove(certfile)
+
+def test_create_already_exists():
+ ret = main(['example.com', '-r', TEST_CA_ROOT, '-d', TEST_CA_DIR, '-w'])
+ assert ret == 1
+ certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
+ assert os.path.isfile(certfile)
+ # remove now
+ os.remove(certfile)
+
+def test_create_root_already_exists():
+ ret = main([TEST_CA_ROOT])
+ # not created, already exists
+ assert ret == 1
+ # remove now
+ os.remove(TEST_CA_ROOT)
+
+def test_delete_files():
+ shutil.rmtree(TEST_CA_DIR)
+ assert not os.path.isdir(TEST_CA_DIR)
+ assert not os.path.isfile(TEST_CA_ROOT)
diff --git a/tests/test_config_proxy.yaml b/tests/test_config_proxy.yaml
index 0ebcec4a..246af87c 100644
--- a/tests/test_config_proxy.yaml
+++ b/tests/test_config_proxy.yaml
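Review bot: The tests in pywb/framework/test/test_certauth.py write the generated root and host certificate files to fixed relative paths inside the source tree and rely on running in file order, with cleanup spread over `test_create_already_exists`, `test_create_root_already_exists` and `test_delete_files`. If an earlier assertion fails, or the tests are run individually or from another working directory, generated CA files are left behind, where they can make the next run fail with "already exists" or be committed by mistake. Creating the paths under `tempfile.mkdtemp()` in `setup_module` and removing the directory in `teardown_module` keeps the output out of the repository; this needs `import tempfile` next to the existing imports. The commented-out `#os.remove(certfile)` in `test_create_host_cert` can go, since the following tests depend on that file staying in place.

```python
# … unchanged: imports …
TEST_TMP_DIR = None
TEST_CA_DIR = None
TEST_CA_ROOT = None

def setup_module():
    global TEST_TMP_DIR, TEST_CA_DIR, TEST_CA_ROOT
    TEST_TMP_DIR = tempfile.mkdtemp()
    TEST_CA_DIR = os.path.join(TEST_TMP_DIR, 'pywb_test_ca_certs')
    TEST_CA_ROOT = os.path.join(TEST_TMP_DIR, 'pywb_test_ca.pem')

def teardown_module():
    # Runs even when a test above fails, so no generated files are left behind.
    shutil.rmtree(TEST_TMP_DIR, ignore_errors=True)

# … unchanged: test_create_root through test_delete_files …
```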
@@ -2,7 +2,7 @@ collections: all: - ./sample_archive/cdx/iana.cdx - ./sample_archive/cdx/dupes.cdx - + - ./sample_archive/cdx/post-test.cdx older: - ./sample_archive/cdx/iana.cdx diff --git a/tests/test_proxy_https.py b/tests/test_proxy_https.py index 940f6e06..58bc9d6c 100644 --- a/tests/test_proxy_https.py +++ b/tests/test_proxy_https.py @@ -23,10 +23,10 @@ def setup_module(): server = ServeThread() server.daemon = True server.start() - - global session + + global session session = requests.Session() - + def teardown_module(): try: @@ -46,7 +46,7 @@ class ServeThread(threading.Thread): self.app = init_app(create_wb_router, load_yaml=True, config_file=TEST_CONFIG) - + # init with port 0 to allow os to pick a port self.httpd = make_server('', 0, self.app) port = self.httpd.socket.getsockname()[1] @@ -72,7 +72,19 @@ class TestHttpsProxy: return self.session.get(url, proxies=server.proxy_dict, verify=TEST_CA_ROOT) - + + def post_url(self, url, data): + global sesh_key + if sesh_key: + self.session.headers.update({'Cookie': '__pywb_proxy_sesh=' + sesh_key}) + self.session.cookies.set('__pywb_proxy_sesh', sesh_key, domain='.pywb.proxy') + #self.session.cookies.set('__pywb_proxy_sesh', sesh_key, domain='.iana.org') + + return self.session.post(url, + data=data, + proxies=server.proxy_dict, + verify=TEST_CA_ROOT) + def test_replay_no_coll(self): resp = self.get_url('https://iana.org/') assert resp.url == 'https://select.pywb.proxy/https://iana.org/' @@ -83,11 +95,11 @@ class TestHttpsProxy: assert resp.url == 'https://iana.org/' assert resp.status_code == 200 assert '20140126200624' in resp.text - + sesh1 = self.session.cookies.get('__pywb_proxy_sesh', domain='.pywb.proxy') sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org') assert sesh1 and sesh1 == sesh2, self.session.cookies - + # store session cookie global sesh_key sesh_key = sesh1 
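Review bot: `post_url` in tests/test_proxy_https.py writes the session key into `self.session.headers` as a literal `Cookie` header, and that header stays on the shared `requests.Session` for every later request, including the `get_url` calls in the tests that follow. That can mask a missing or changed cookie in later cases such as `test_clear_key`, whose body is outside this diff, so whether it is affected needs checking. Passing the cookie per request, `cookies={'__pywb_proxy_sesh': sesh_key}` in the `self.session.post(...)` call, would keep the state local to this helper. `global sesh_key` is not needed for a read-only use, and the commented-out `cookies.set` line for `.iana.org` should be removed or explained.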
@@ -96,7 +108,7 @@ class TestHttpsProxy: sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org') assert sesh_key == sesh2 - def test_replay_same_coll(self): + def test_replay_same_coll(self): resp = self.get_url('https://iana.org/') assert resp.url == 'https://iana.org/' assert resp.status_code == 200 @@ -108,7 +120,7 @@ class TestHttpsProxy: assert resp.url == 'https://iana.org/' assert resp.status_code == 200 assert '20140127171238' in resp.text - + # verify still same session cookie sesh2 = self.session.cookies.get('__pywb_proxy_sesh', domain='.iana.org') global sesh_key @@ -148,6 +160,15 @@ class TestHttpsProxy: assert resp.url == 'https://example.com/' assert '20140127171251' in resp.text + def test_post_replay_all_coll(self): + resp = self.post_url('https://httpbin.org/post', data={'foo': 'bar', 'test': 'abc'}) + assert resp.url == 'https://httpbin.org/post' + assert 'application/json' in resp.headers['content-type'] + assert resp.status_code == 200 + + #assert 'wbinfo.proxy_magic = "pywb.proxy";' in resp.text + #assert '20140126200624' in resp.text + # Bounce back to select.pywb.proxy due to missing session def test_clear_key(self): # clear session key
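Review bot: `test_post_replay_all_coll` checks only the final URL, the content type and the status code, so it would still pass if the replay returned some other JSON capture for that URL rather than the one recorded for this form data. Adding an assertion on the response body would tie the test to the POST matching, for instance on one of the posted fields, if the recorded response echoes them, which this diff does not show. The two commented-out assertions should be deleted or replaced by the ones that are meant to hold.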