Merge pull request #199 from vbanos/add-certauth

Create warcprox.certauth and drop certauth dependency
This commit is contained in:
Barbara Miller 2024-07-24 17:09:19 -07:00 committed by GitHub
commit 6756ba60fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 355 additions and 4 deletions

View File

@ -24,12 +24,12 @@ import sys
import setuptools import setuptools
deps = [ deps = [
'certauth==1.1.6',
'warctools>=4.10.0', 'warctools>=4.10.0',
'urlcanon>=0.3.0', 'urlcanon>=0.3.0',
'doublethink @ git+https://github.com/internetarchive/doublethink.git@Py311', 'doublethink @ git+https://github.com/internetarchive/doublethink.git@Py311',
'urllib3>=1.23', 'urllib3>=1.23',
'requests>=2.0.1', 'requests>=2.0.1',
'pyopenssl',
'PySocks>=1.6.8', 'PySocks>=1.6.8',
'cryptography>=2.3,<40', 'cryptography>=2.3,<40',
'idna', 'idna',

89
tests/test_certauth.py Normal file
View File

@ -0,0 +1,89 @@
import os
import shutil
from warcprox.certauth import main, CertificateAuthority
import tempfile
from OpenSSL import crypto
import datetime
import time
def setup_module():
global TEST_CA_DIR
TEST_CA_DIR = tempfile.mkdtemp()
global TEST_CA_ROOT
TEST_CA_ROOT = os.path.join(TEST_CA_DIR, 'certauth_test_ca.pem')
def teardown_module():
shutil.rmtree(TEST_CA_DIR)
assert not os.path.isdir(TEST_CA_DIR)
assert not os.path.isfile(TEST_CA_ROOT)
def test_create_root():
ret = main([TEST_CA_ROOT, '-c', 'Test Root Cert'])
assert ret == 0
def test_create_host_cert():
ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '-n', 'example.com'])
assert ret == 0
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
def test_create_wildcard_host_cert_force_overwrite():
ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '--hostname', 'example.com', '-w', '-f'])
assert ret == 0
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
def test_explicit_wildcard():
ca = CertificateAuthority(TEST_CA_ROOT, TEST_CA_DIR, 'Test CA')
filename = ca.get_wildcard_cert('test.example.proxy')
certfile = os.path.join(TEST_CA_DIR, 'example.proxy.pem')
assert filename == certfile
assert os.path.isfile(certfile)
os.remove(certfile)
def test_create_already_exists():
ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '-n', 'example.com', '-w'])
assert ret == 1
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
# remove now
os.remove(certfile)
def test_create_root_already_exists():
ret = main([TEST_CA_ROOT])
# not created, already exists
assert ret == 1
# remove now
os.remove(TEST_CA_ROOT)
def test_create_root_subdir():
# create a new cert in a subdirectory
subdir = os.path.join(TEST_CA_DIR, 'subdir')
ca_file = os.path.join(subdir, 'certauth_test_ca.pem')
ca = CertificateAuthority(ca_file, subdir, 'Test CA',
cert_not_before=-60 * 60,
cert_not_after=60 * 60 * 24 * 3)
assert os.path.isdir(subdir)
assert os.path.isfile(ca_file)
buff = ca.get_root_PKCS12()
assert len(buff) > 0
expected_not_before = datetime.datetime.utcnow() - datetime.timedelta(seconds=60 * 60)
expected_not_after = datetime.datetime.utcnow() + datetime.timedelta(seconds=60 * 60 * 24 * 3)
cert = crypto.load_pkcs12(buff).get_certificate()
actual_not_before = datetime.datetime.strptime(
cert.get_notBefore().decode('ascii'), '%Y%m%d%H%M%SZ')
actual_not_after = datetime.datetime.strptime(
cert.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
time.mktime(expected_not_before.utctimetuple())
assert abs((time.mktime(actual_not_before.utctimetuple()) - time.mktime(expected_not_before.utctimetuple()))) < 10
assert abs((time.mktime(actual_not_after.utctimetuple()) - time.mktime(expected_not_after.utctimetuple()))) < 10

263
warcprox/certauth.py Normal file
View File

@ -0,0 +1,263 @@
import logging
import os
from OpenSSL import crypto
from OpenSSL.SSL import FILETYPE_PEM
import random
from argparse import ArgumentParser
import threading
# =================================================================
# Valid for 3 years from now
# Max validity is 39 months:
# https://casecurity.org/2015/02/19/ssl-certificate-validity-periods-limited-to-39-months-starting-in-april/
CERT_NOT_AFTER = 3 * 365 * 24 * 60 * 60
CERTS_DIR = './ca/certs/'
CERT_NAME = 'certauth sample CA'
DEF_HASH_FUNC = 'sha256'
# =================================================================
class CertificateAuthority(object):
"""
Utility class for signing individual certificate
with a root cert.
Static generate_ca_root() method for creating the root cert
All certs saved on filesystem. Individual certs are stored
in specified certs_dir and reused if previously created.
"""
def __init__(self, ca_file, certs_dir, ca_name,
overwrite=False,
cert_not_before=0,
cert_not_after=CERT_NOT_AFTER):
assert(ca_file)
self.ca_file = ca_file
assert(certs_dir)
self.certs_dir = certs_dir
assert(ca_name)
self.ca_name = ca_name
self._file_created = False
self.cert_not_before = cert_not_before
self.cert_not_after = cert_not_after
if not os.path.exists(certs_dir):
os.makedirs(certs_dir)
# if file doesn't exist or overwrite is true
# create new root cert
if (overwrite or not os.path.isfile(ca_file)):
self.cert, self.key = self.generate_ca_root(ca_file, ca_name)
self._file_created = True
# read previously created root cert
else:
self.cert, self.key = self.read_pem(ca_file)
self._lock = threading.Lock()
def cert_for_host(self, host, overwrite=False, wildcard=False):
with self._lock:
host_filename = os.path.join(self.certs_dir, host) + '.pem'
if not overwrite and os.path.exists(host_filename):
self._file_created = False
return host_filename
self.generate_host_cert(host, self.cert, self.key, host_filename,
wildcard)
self._file_created = True
return host_filename
def get_wildcard_cert(self, cert_host):
host_parts = cert_host.split('.', 1)
if len(host_parts) == 2 and '.' in host_parts[1]:
cert_host = host_parts[1]
certfile = self.cert_for_host(cert_host,
wildcard=True)
return certfile
def get_root_PKCS12(self):
p12 = crypto.PKCS12()
p12.set_certificate(self.cert)
p12.set_privatekey(self.key)
return p12.export()
def _make_cert(self, certname):
cert = crypto.X509()
cert.set_serial_number(random.randint(0, 2 ** 64 - 1))
cert.get_subject().CN = certname
cert.set_version(2)
cert.gmtime_adj_notBefore(self.cert_not_before)
cert.gmtime_adj_notAfter(self.cert_not_after)
return cert
def generate_ca_root(self, ca_file, ca_name, hash_func=DEF_HASH_FUNC):
# Generate key
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
# Generate cert
cert = self._make_cert(ca_name)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.add_extensions([
crypto.X509Extension(b"basicConstraints",
True,
b"CA:TRUE, pathlen:0"),
crypto.X509Extension(b"keyUsage",
True,
b"keyCertSign, cRLSign"),
crypto.X509Extension(b"subjectKeyIdentifier",
False,
b"hash",
subject=cert),
])
cert.sign(key, hash_func)
# Write cert + key
self.write_pem(ca_file, cert, key)
return cert, key
def generate_host_cert(self, host, root_cert, root_key, host_filename,
wildcard=False, hash_func=DEF_HASH_FUNC):
host = host.encode('utf-8')
# Generate key
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
# Generate CSR
req = crypto.X509Req()
req.get_subject().CN = host
req.set_pubkey(key)
req.sign(key, hash_func)
# Generate Cert
cert = self._make_cert(host)
cert.set_issuer(root_cert.get_subject())
cert.set_pubkey(req.get_pubkey())
if wildcard:
DNS = b'DNS:'
alt_hosts = [DNS + host,
DNS + b'*.' + host]
alt_hosts = b', '.join(alt_hosts)
cert.add_extensions([
crypto.X509Extension(b'subjectAltName',
False,
alt_hosts)])
cert.sign(root_key, hash_func)
# Write cert + key
self.write_pem(host_filename, cert, key)
return cert, key
def write_pem(self, filename, cert, key):
with open(filename, 'wb+') as f:
f.write(crypto.dump_privatekey(FILETYPE_PEM, key))
f.write(crypto.dump_certificate(FILETYPE_PEM, cert))
def read_pem(self, filename):
with open(filename, 'r') as f:
cert = crypto.load_certificate(FILETYPE_PEM, f.read())
f.seek(0)
key = crypto.load_privatekey(FILETYPE_PEM, f.read())
return cert, key
# =================================================================
def main(args=None):
parser = ArgumentParser(description='Certificate Authority Cert Maker Tools')
parser.add_argument('root_ca_cert',
help='Path to existing or new root CA file')
parser.add_argument('-c', '--certname', action='store', default=CERT_NAME,
help='Name for root certificate')
parser.add_argument('-n', '--hostname',
help='Hostname certificate to create')
parser.add_argument('-d', '--certs-dir', default=CERTS_DIR,
help='Directory for host certificates')
parser.add_argument('-f', '--force', action='store_true',
help='Overwrite certificates if they already exist')
parser.add_argument('-w', '--wildcard_cert', action='store_true',
help='add wildcard SAN to host: *.<host>, <host>')
r = parser.parse_args(args=args)
certs_dir = r.certs_dir
wildcard = r.wildcard_cert
root_cert = r.root_ca_cert
hostname = r.hostname
if not hostname:
overwrite = r.force
else:
overwrite = False
ca = CertificateAuthority(ca_file=root_cert,
certs_dir=r.certs_dir,
ca_name=r.certname,
overwrite=overwrite)
# Just creating the root cert
if not hostname:
if ca._file_created:
print('Created new root cert: "' + root_cert + '"')
return 0
else:
print('Root cert "' + root_cert +
'" already exists,' + ' use -f to overwrite')
return 1
# Sign a certificate for a given host
overwrite = r.force
host_filename = ca.cert_for_host(hostname,
overwrite, wildcard)
if ca._file_created:
print('Created new cert "' + hostname +
'" signed by root cert ' +
root_cert)
return 0
else:
print('Cert for "' + hostname + '" already exists,' +
' use -f to overwrite')
return 1
if __name__ == "__main__": #pragma: no cover
main()

View File

@ -31,7 +31,6 @@ import sys
import gc import gc
import datetime import datetime
import warcprox import warcprox
import certauth
import functools import functools
import doublethink import doublethink
import importlib import importlib

View File

@ -39,7 +39,6 @@ import socket
import traceback import traceback
import signal import signal
import threading import threading
import certauth.certauth
import yaml import yaml
import warcprox import warcprox
import doublethink import doublethink

View File

@ -83,7 +83,8 @@ from urllib3.exceptions import TimeoutError, HTTPError, NewConnectionError
import doublethink import doublethink
from cachetools import TTLCache from cachetools import TTLCache
from threading import RLock from threading import RLock
from certauth.certauth import CertificateAuthority
from .certauth import CertificateAuthority
class ProxyingRecorder(object): class ProxyingRecorder(object):
""" """