Replace PyOpenSSL with cryptography

PyOpenSSL is deprecated. We replace it with `cryptography` following
their recommendation at: https://pypi.org/project/pyOpenSSL/

We drop the `pyopenssl` dependency.
This commit is contained in:
Vangelis Banos 2024-07-26 13:04:15 +00:00
parent a65b8b82b9
commit 10d36cc943
2 changed files with 86 additions and 65 deletions

View File

@ -29,7 +29,6 @@ deps = [
'doublethink @ git+https://github.com/internetarchive/doublethink.git@Py311',
'urllib3>=1.23',
'requests>=2.0.1',
'pyopenssl',
'PySocks>=1.6.8',
'cryptography>=2.3,<40',
'idna',

View File

@ -1,14 +1,16 @@
import logging
import os
from OpenSSL import crypto
from OpenSSL.SSL import FILETYPE_PEM
import random
from argparse import ArgumentParser
from datetime import datetime, timedelta
import threading
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
# =================================================================
# Valid for 3 years from now
# Max validity is 39 months:
@ -19,7 +21,7 @@ CERTS_DIR = './ca/certs/'
CERT_NAME = 'certauth sample CA'
DEF_HASH_FUNC = 'sha256'
DEF_HASH_FUNC = hashes.SHA256()
# =================================================================
@ -93,47 +95,54 @@ class CertificateAuthority(object):
return certfile
def get_root_PKCS12(self):
p12 = crypto.PKCS12()
p12.set_certificate(self.cert)
p12.set_privatekey(self.key)
return p12.export()
return serialization.pkcs12.serialize_key_and_certificates(
name=b"root",
key=self.key,
cert=self.cert,
cas=None,
encryption_algorithm=serialization.NoEncryption()
)
def _make_cert(self, certname):
cert = crypto.X509()
cert.set_serial_number(random.randint(0, 2 ** 64 - 1))
cert.get_subject().CN = certname
cert.set_version(2)
cert.gmtime_adj_notBefore(self.cert_not_before)
cert.gmtime_adj_notAfter(self.cert_not_after)
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, certname),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
self.key.public_key()
).serial_number(
random.randint(0, 2**64 - 1)
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(seconds=self.cert_not_after)
).add_extension(
x509.BasicConstraints(ca=True, path_length=0), critical=True,
).add_extension(
x509.KeyUsage(key_cert_sign=True, crl_sign=True, digital_signature=False,
content_commitment=False, key_encipherment=False,
data_encipherment=False, key_agreement=False, encipher_only=False,
decipher_only=False), critical=True
).add_extension(
x509.SubjectKeyIdentifier.from_public_key(self.key.public_key()), critical=False
).sign(self.key, DEF_HASH_FUNC, default_backend())
return cert
def generate_ca_root(self, ca_file, ca_name, hash_func=DEF_HASH_FUNC):
# Generate key
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Generate cert
self.key = key
cert = self._make_cert(ca_name)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.add_extensions([
crypto.X509Extension(b"basicConstraints",
True,
b"CA:TRUE, pathlen:0"),
crypto.X509Extension(b"keyUsage",
True,
b"keyCertSign, cRLSign"),
crypto.X509Extension(b"subjectKeyIdentifier",
False,
b"hash",
subject=cert),
])
cert.sign(key, hash_func)
# Write cert + key
self.write_pem(ca_file, cert, key)
return cert, key
@ -144,34 +153,44 @@ class CertificateAuthority(object):
host = host.encode('utf-8')
# Generate key
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Generate CSR
req = crypto.X509Req()
req.get_subject().CN = host
req.set_pubkey(key)
req.sign(key, hash_func)
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, host.decode('utf-8')),
])
).sign(key, hash_func, default_backend())
# Generate Cert
cert = self._make_cert(host)
cert.set_issuer(root_cert.get_subject())
cert.set_pubkey(req.get_pubkey())
cert_builder = x509.CertificateBuilder().subject_name(
csr.subject
).issuer_name(
root_cert.subject
).public_key(
csr.public_key()
).serial_number(
random.randint(0, 2**64 - 1)
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(seconds=self.cert_not_after)
)
if wildcard:
DNS = b'DNS:'
alt_hosts = [DNS + host,
DNS + b'*.' + host]
cert_builder = cert_builder.add_extension(
x509.SubjectAlternativeName([
x509.DNSName(host.decode('utf-8')),
x509.DNSName('*.' + host.decode('utf-8')),
]),
critical=False,
)
alt_hosts = b', '.join(alt_hosts)
cert.add_extensions([
crypto.X509Extension(b'subjectAltName',
False,
alt_hosts)])
cert.sign(root_key, hash_func)
cert = cert_builder.sign(root_key, hash_func, default_backend())
# Write cert + key
self.write_pem(host_filename, cert, key)
@ -179,15 +198,18 @@ class CertificateAuthority(object):
def write_pem(self, filename, cert, key):
with open(filename, 'wb+') as f:
f.write(crypto.dump_privatekey(FILETYPE_PEM, key))
f.write(crypto.dump_certificate(FILETYPE_PEM, cert))
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
f.write(cert.public_bytes(serialization.Encoding.PEM))
def read_pem(self, filename):
with open(filename, 'r') as f:
cert = crypto.load_certificate(FILETYPE_PEM, f.read())
with open(filename, 'rb') as f:
cert = x509.load_pem_x509_certificate(f.read(), default_backend())
f.seek(0)
key = crypto.load_privatekey(FILETYPE_PEM, f.read())
key = serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())
return cert, key