From 10d36cc9434966e7e5e00cd506835946c6994da7 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 26 Jul 2024 13:04:15 +0000 Subject: [PATCH] Replace PyOpenSSL with cryptography PyOpenSSL is deprecated. We replace it with `cryptography` following their recommendation at: https://pypi.org/project/pyOpenSSL/ We drop the `pyopenssl` dependency. --- setup.py | 1 - warcprox/certauth.py | 150 +++++++++++++++++++++++++------------------ 2 files changed, 86 insertions(+), 65 deletions(-) diff --git a/setup.py b/setup.py index 2c2272e..d21f984 100755 --- a/setup.py +++ b/setup.py @@ -29,7 +29,6 @@ deps = [ 'doublethink @ git+https://github.com/internetarchive/doublethink.git@Py311', 'urllib3>=1.23', 'requests>=2.0.1', - 'pyopenssl', 'PySocks>=1.6.8', 'cryptography>=2.3,<40', 'idna', diff --git a/warcprox/certauth.py b/warcprox/certauth.py index ca433e3..6a90100 100644 --- a/warcprox/certauth.py +++ b/warcprox/certauth.py @@ -1,14 +1,16 @@ import logging import os - -from OpenSSL import crypto -from OpenSSL.SSL import FILETYPE_PEM - import random from argparse import ArgumentParser - +from datetime import datetime, timedelta import threading +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + # ================================================================= # Valid for 3 years from now # Max validity is 39 months: @@ -19,7 +21,7 @@ CERTS_DIR = './ca/certs/' CERT_NAME = 'certauth sample CA' -DEF_HASH_FUNC = 'sha256' +DEF_HASH_FUNC = hashes.SHA256() # ================================================================= @@ -93,47 +95,54 @@ class CertificateAuthority(object): return certfile def get_root_PKCS12(self): - p12 = crypto.PKCS12() - p12.set_certificate(self.cert) - p12.set_privatekey(self.key) - return p12.export() + return serialization.pkcs12.serialize_key_and_certificates( + name=b"root", + key=self.key, + cert=self.cert, + cas=None, + encryption_algorithm=serialization.NoEncryption() + ) def _make_cert(self, certname): - cert = crypto.X509() - cert.set_serial_number(random.randint(0, 2 ** 64 - 1)) - cert.get_subject().CN = certname - - cert.set_version(2) - cert.gmtime_adj_notBefore(self.cert_not_before) - cert.gmtime_adj_notAfter(self.cert_not_after) + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, certname), + ]) + cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + self.key.public_key() + ).serial_number( + random.randint(0, 2**64 - 1) + ).not_valid_before( + datetime.utcnow() + ).not_valid_after( + datetime.utcnow() + timedelta(seconds=self.cert_not_after) + ).add_extension( + x509.BasicConstraints(ca=True, path_length=0), critical=True, + ).add_extension( + x509.KeyUsage(key_cert_sign=True, crl_sign=True, digital_signature=False, + content_commitment=False, key_encipherment=False, + data_encipherment=False, key_agreement=False, encipher_only=False, + decipher_only=False), critical=True + ).add_extension( + x509.SubjectKeyIdentifier.from_public_key(self.key.public_key()), critical=False + ).sign(self.key, DEF_HASH_FUNC, default_backend()) return cert def generate_ca_root(self, ca_file, ca_name, hash_func=DEF_HASH_FUNC): # Generate key - key = crypto.PKey() - key.generate_key(crypto.TYPE_RSA, 2048) + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) # Generate cert + self.key = key cert = self._make_cert(ca_name) - cert.set_issuer(cert.get_subject()) - cert.set_pubkey(key) - cert.add_extensions([ - crypto.X509Extension(b"basicConstraints", - True, - b"CA:TRUE, pathlen:0"), - - crypto.X509Extension(b"keyUsage", - True, - b"keyCertSign, cRLSign"), - - crypto.X509Extension(b"subjectKeyIdentifier", - False, - b"hash", - subject=cert), - ]) - cert.sign(key, hash_func) - # Write cert + key self.write_pem(ca_file, cert, key) return cert, key @@ -144,34 +153,44 @@ class CertificateAuthority(object): host = host.encode('utf-8') # Generate key - key = crypto.PKey() - key.generate_key(crypto.TYPE_RSA, 2048) + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) # Generate CSR - req = crypto.X509Req() - req.get_subject().CN = host - req.set_pubkey(key) - req.sign(key, hash_func) + csr = x509.CertificateSigningRequestBuilder().subject_name( + x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, host.decode('utf-8')), + ]) + ).sign(key, hash_func, default_backend()) # Generate Cert - cert = self._make_cert(host) - - cert.set_issuer(root_cert.get_subject()) - cert.set_pubkey(req.get_pubkey()) + cert_builder = x509.CertificateBuilder().subject_name( + csr.subject + ).issuer_name( + root_cert.subject + ).public_key( + csr.public_key() + ).serial_number( + random.randint(0, 2**64 - 1) + ).not_valid_before( + datetime.utcnow() + ).not_valid_after( + datetime.utcnow() + timedelta(seconds=self.cert_not_after) + ) if wildcard: - DNS = b'DNS:' - alt_hosts = [DNS + host, - DNS + b'*.' + host] + cert_builder = cert_builder.add_extension( + x509.SubjectAlternativeName([ + x509.DNSName(host.decode('utf-8')), + x509.DNSName('*.' + host.decode('utf-8')), + ]), + critical=False, + ) - alt_hosts = b', '.join(alt_hosts) - - cert.add_extensions([ - crypto.X509Extension(b'subjectAltName', - False, - alt_hosts)]) - - cert.sign(root_key, hash_func) + cert = cert_builder.sign(root_key, hash_func, default_backend()) # Write cert + key self.write_pem(host_filename, cert, key) @@ -179,15 +198,18 @@ class CertificateAuthority(object): def write_pem(self, filename, cert, key): with open(filename, 'wb+') as f: - f.write(crypto.dump_privatekey(FILETYPE_PEM, key)) - - f.write(crypto.dump_certificate(FILETYPE_PEM, cert)) + f.write(key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + )) + f.write(cert.public_bytes(serialization.Encoding.PEM)) def read_pem(self, filename): - with open(filename, 'r') as f: - cert = crypto.load_certificate(FILETYPE_PEM, f.read()) + with open(filename, 'rb') as f: + cert = x509.load_pem_x509_certificate(f.read(), default_backend()) f.seek(0) - key = crypto.load_privatekey(FILETYPE_PEM, f.read()) + key = serialization.load_pem_private_key(f.read(), password=None, backend=default_backend()) return cert, key