diff --git a/setup.py b/setup.py index 4573e2f..c08d67b 100755 --- a/setup.py +++ b/setup.py @@ -24,12 +24,12 @@ import sys import setuptools deps = [ - 'certauth==1.1.6', 'warctools>=4.10.0', 'urlcanon>=0.3.0', 'doublethink @ git+https://github.com/internetarchive/doublethink.git@Py311', 'urllib3>=1.23', 'requests>=2.0.1', + 'pyopenssl', 'PySocks>=1.6.8', 'cryptography>=2.3,<40', 'idna', diff --git a/tests/test_certauth.py b/tests/test_certauth.py new file mode 100644 index 0000000..61cf191 --- /dev/null +++ b/tests/test_certauth.py @@ -0,0 +1,89 @@ +import os +import shutil + +from warcprox.certauth import main, CertificateAuthority +import tempfile +from OpenSSL import crypto +import datetime +import time + +def setup_module(): + global TEST_CA_DIR + TEST_CA_DIR = tempfile.mkdtemp() + + global TEST_CA_ROOT + TEST_CA_ROOT = os.path.join(TEST_CA_DIR, 'certauth_test_ca.pem') + +def teardown_module(): + shutil.rmtree(TEST_CA_DIR) + assert not os.path.isdir(TEST_CA_DIR) + assert not os.path.isfile(TEST_CA_ROOT) + +def test_create_root(): + ret = main([TEST_CA_ROOT, '-c', 'Test Root Cert']) + assert ret == 0 + +def test_create_host_cert(): + ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '-n', 'example.com']) + assert ret == 0 + certfile = os.path.join(TEST_CA_DIR, 'example.com.pem') + assert os.path.isfile(certfile) + +def test_create_wildcard_host_cert_force_overwrite(): + ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '--hostname', 'example.com', '-w', '-f']) + assert ret == 0 + certfile = os.path.join(TEST_CA_DIR, 'example.com.pem') + assert os.path.isfile(certfile) + +def test_explicit_wildcard(): + ca = CertificateAuthority(TEST_CA_ROOT, TEST_CA_DIR, 'Test CA') + filename = ca.get_wildcard_cert('test.example.proxy') + certfile = os.path.join(TEST_CA_DIR, 'example.proxy.pem') + assert filename == certfile + assert os.path.isfile(certfile) + os.remove(certfile) + +def test_create_already_exists(): + ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '-n', 'example.com', '-w']) + assert ret == 1 + certfile = os.path.join(TEST_CA_DIR, 'example.com.pem') + assert os.path.isfile(certfile) + # remove now + os.remove(certfile) + +def test_create_root_already_exists(): + ret = main([TEST_CA_ROOT]) + # not created, already exists + assert ret == 1 + # remove now + os.remove(TEST_CA_ROOT) + +def test_create_root_subdir(): + # create a new cert in a subdirectory + subdir = os.path.join(TEST_CA_DIR, 'subdir') + + ca_file = os.path.join(subdir, 'certauth_test_ca.pem') + + ca = CertificateAuthority(ca_file, subdir, 'Test CA', + cert_not_before=-60 * 60, + cert_not_after=60 * 60 * 24 * 3) + + assert os.path.isdir(subdir) + assert os.path.isfile(ca_file) + + buff = ca.get_root_PKCS12() + assert len(buff) > 0 + + expected_not_before = datetime.datetime.utcnow() - datetime.timedelta(seconds=60 * 60) + expected_not_after = datetime.datetime.utcnow() + datetime.timedelta(seconds=60 * 60 * 24 * 3) + + cert = crypto.load_pkcs12(buff).get_certificate() + + actual_not_before = datetime.datetime.strptime( + cert.get_notBefore().decode('ascii'), '%Y%m%d%H%M%SZ') + actual_not_after = datetime.datetime.strptime( + cert.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ') + + time.mktime(expected_not_before.utctimetuple()) + assert abs((time.mktime(actual_not_before.utctimetuple()) - time.mktime(expected_not_before.utctimetuple()))) < 10 + assert abs((time.mktime(actual_not_after.utctimetuple()) - time.mktime(expected_not_after.utctimetuple()))) < 10 diff --git a/warcprox/certauth.py b/warcprox/certauth.py new file mode 100644 index 0000000..ca433e3 --- /dev/null +++ b/warcprox/certauth.py @@ -0,0 +1,263 @@ +import logging +import os + +from OpenSSL import crypto +from OpenSSL.SSL import FILETYPE_PEM + +import random +from argparse import ArgumentParser + +import threading + +# ================================================================= +# Valid for 3 years from now +# Max validity is 39 months: +# https://casecurity.org/2015/02/19/ssl-certificate-validity-periods-limited-to-39-months-starting-in-april/ +CERT_NOT_AFTER = 3 * 365 * 24 * 60 * 60 + +CERTS_DIR = './ca/certs/' + +CERT_NAME = 'certauth sample CA' + +DEF_HASH_FUNC = 'sha256' + + +# ================================================================= +class CertificateAuthority(object): + """ + Utility class for signing individual certificate + with a root cert. + + Static generate_ca_root() method for creating the root cert + + All certs saved on filesystem. Individual certs are stored + in specified certs_dir and reused if previously created. + """ + + def __init__(self, ca_file, certs_dir, ca_name, + overwrite=False, + cert_not_before=0, + cert_not_after=CERT_NOT_AFTER): + + assert(ca_file) + self.ca_file = ca_file + + assert(certs_dir) + self.certs_dir = certs_dir + + assert(ca_name) + self.ca_name = ca_name + + self._file_created = False + + self.cert_not_before = cert_not_before + self.cert_not_after = cert_not_after + + if not os.path.exists(certs_dir): + os.makedirs(certs_dir) + + # if file doesn't exist or overwrite is true + # create new root cert + if (overwrite or not os.path.isfile(ca_file)): + self.cert, self.key = self.generate_ca_root(ca_file, ca_name) + self._file_created = True + + # read previously created root cert + else: + self.cert, self.key = self.read_pem(ca_file) + + self._lock = threading.Lock() + + def cert_for_host(self, host, overwrite=False, wildcard=False): + with self._lock: + host_filename = os.path.join(self.certs_dir, host) + '.pem' + + if not overwrite and os.path.exists(host_filename): + self._file_created = False + return host_filename + + self.generate_host_cert(host, self.cert, self.key, host_filename, + wildcard) + + self._file_created = True + return host_filename + + def get_wildcard_cert(self, cert_host): + host_parts = cert_host.split('.', 1) + if len(host_parts) == 2 and '.' in host_parts[1]: + cert_host = host_parts[1] + + certfile = self.cert_for_host(cert_host, + wildcard=True) + + return certfile + + def get_root_PKCS12(self): + p12 = crypto.PKCS12() + p12.set_certificate(self.cert) + p12.set_privatekey(self.key) + return p12.export() + + def _make_cert(self, certname): + cert = crypto.X509() + cert.set_serial_number(random.randint(0, 2 ** 64 - 1)) + cert.get_subject().CN = certname + + cert.set_version(2) + cert.gmtime_adj_notBefore(self.cert_not_before) + cert.gmtime_adj_notAfter(self.cert_not_after) + return cert + + def generate_ca_root(self, ca_file, ca_name, hash_func=DEF_HASH_FUNC): + # Generate key + key = crypto.PKey() + key.generate_key(crypto.TYPE_RSA, 2048) + + # Generate cert + cert = self._make_cert(ca_name) + + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(key) + cert.add_extensions([ + crypto.X509Extension(b"basicConstraints", + True, + b"CA:TRUE, pathlen:0"), + + crypto.X509Extension(b"keyUsage", + True, + b"keyCertSign, cRLSign"), + + crypto.X509Extension(b"subjectKeyIdentifier", + False, + b"hash", + subject=cert), + ]) + cert.sign(key, hash_func) + + # Write cert + key + self.write_pem(ca_file, cert, key) + return cert, key + + def generate_host_cert(self, host, root_cert, root_key, host_filename, + wildcard=False, hash_func=DEF_HASH_FUNC): + + host = host.encode('utf-8') + + # Generate key + key = crypto.PKey() + key.generate_key(crypto.TYPE_RSA, 2048) + + # Generate CSR + req = crypto.X509Req() + req.get_subject().CN = host + req.set_pubkey(key) + req.sign(key, hash_func) + + # Generate Cert + cert = self._make_cert(host) + + cert.set_issuer(root_cert.get_subject()) + cert.set_pubkey(req.get_pubkey()) + + if wildcard: + DNS = b'DNS:' + alt_hosts = [DNS + host, + DNS + b'*.' + host] + + alt_hosts = b', '.join(alt_hosts) + + cert.add_extensions([ + crypto.X509Extension(b'subjectAltName', + False, + alt_hosts)]) + + cert.sign(root_key, hash_func) + + # Write cert + key + self.write_pem(host_filename, cert, key) + return cert, key + + def write_pem(self, filename, cert, key): + with open(filename, 'wb+') as f: + f.write(crypto.dump_privatekey(FILETYPE_PEM, key)) + + f.write(crypto.dump_certificate(FILETYPE_PEM, cert)) + + def read_pem(self, filename): + with open(filename, 'r') as f: + cert = crypto.load_certificate(FILETYPE_PEM, f.read()) + f.seek(0) + key = crypto.load_privatekey(FILETYPE_PEM, f.read()) + + return cert, key + + +# ================================================================= +def main(args=None): + parser = ArgumentParser(description='Certificate Authority Cert Maker Tools') + + parser.add_argument('root_ca_cert', + help='Path to existing or new root CA file') + + parser.add_argument('-c', '--certname', action='store', default=CERT_NAME, + help='Name for root certificate') + + parser.add_argument('-n', '--hostname', + help='Hostname certificate to create') + + parser.add_argument('-d', '--certs-dir', default=CERTS_DIR, + help='Directory for host certificates') + + parser.add_argument('-f', '--force', action='store_true', + help='Overwrite certificates if they already exist') + + parser.add_argument('-w', '--wildcard_cert', action='store_true', + help='add wildcard SAN to host: *., ') + + r = parser.parse_args(args=args) + + certs_dir = r.certs_dir + wildcard = r.wildcard_cert + + root_cert = r.root_ca_cert + hostname = r.hostname + + if not hostname: + overwrite = r.force + else: + overwrite = False + + ca = CertificateAuthority(ca_file=root_cert, + certs_dir=r.certs_dir, + ca_name=r.certname, + overwrite=overwrite) + + # Just creating the root cert + if not hostname: + if ca._file_created: + print('Created new root cert: "' + root_cert + '"') + return 0 + else: + print('Root cert "' + root_cert + + '" already exists,' + ' use -f to overwrite') + return 1 + + # Sign a certificate for a given host + overwrite = r.force + host_filename = ca.cert_for_host(hostname, + overwrite, wildcard) + + if ca._file_created: + print('Created new cert "' + hostname + + '" signed by root cert ' + + root_cert) + return 0 + + else: + print('Cert for "' + hostname + '" already exists,' + + ' use -f to overwrite') + return 1 + + +if __name__ == "__main__": #pragma: no cover + main() diff --git a/warcprox/controller.py b/warcprox/controller.py index 8d670cb..fcd4ab7 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -31,7 +31,6 @@ import sys import gc import datetime import warcprox -import certauth import functools import doublethink import importlib diff --git a/warcprox/main.py b/warcprox/main.py index 01bd189..d0bb1a6 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -39,7 +39,6 @@ import socket import traceback import signal import threading -import certauth.certauth import yaml import warcprox import doublethink diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index e6af7ac..bc1c4b9 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -83,7 +83,8 @@ from urllib3.exceptions import TimeoutError, HTTPError, NewConnectionError import doublethink from cachetools import TTLCache from threading import RLock -from certauth.certauth import CertificateAuthority + +from .certauth import CertificateAuthority class ProxyingRecorder(object): """