Merge pull request #200 from vbanos/pyopenssl-cryptography

Thank you, @vbanos!

Replace PyOpenSSL with cryptography
This commit is contained in:
Barbara Miller 2024-07-27 09:09:29 -07:00 committed by GitHub
commit 701b659510
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 86 additions and 65 deletions

View File

@ -29,7 +29,6 @@ deps = [
'doublethink @ git+https://github.com/internetarchive/doublethink.git@Py311', 'doublethink @ git+https://github.com/internetarchive/doublethink.git@Py311',
'urllib3>=1.23', 'urllib3>=1.23',
'requests>=2.0.1', 'requests>=2.0.1',
'pyopenssl',
'PySocks>=1.6.8', 'PySocks>=1.6.8',
'cryptography>=2.3,<40', 'cryptography>=2.3,<40',
'idna', 'idna',

View File

@ -1,14 +1,16 @@
import logging import logging
import os import os
from OpenSSL import crypto
from OpenSSL.SSL import FILETYPE_PEM
import random import random
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import datetime, timedelta
import threading import threading
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
# ================================================================= # =================================================================
# Valid for 3 years from now # Valid for 3 years from now
# Max validity is 39 months: # Max validity is 39 months:
@ -19,7 +21,7 @@ CERTS_DIR = './ca/certs/'
CERT_NAME = 'certauth sample CA' CERT_NAME = 'certauth sample CA'
DEF_HASH_FUNC = 'sha256' DEF_HASH_FUNC = hashes.SHA256()
# ================================================================= # =================================================================
@ -93,47 +95,54 @@ class CertificateAuthority(object):
return certfile return certfile
def get_root_PKCS12(self): def get_root_PKCS12(self):
p12 = crypto.PKCS12() return serialization.pkcs12.serialize_key_and_certificates(
p12.set_certificate(self.cert) name=b"root",
p12.set_privatekey(self.key) key=self.key,
return p12.export() cert=self.cert,
cas=None,
encryption_algorithm=serialization.NoEncryption()
)
def _make_cert(self, certname): def _make_cert(self, certname):
cert = crypto.X509() subject = issuer = x509.Name([
cert.set_serial_number(random.randint(0, 2 ** 64 - 1)) x509.NameAttribute(NameOID.COMMON_NAME, certname),
cert.get_subject().CN = certname ])
cert = x509.CertificateBuilder().subject_name(
cert.set_version(2) subject
cert.gmtime_adj_notBefore(self.cert_not_before) ).issuer_name(
cert.gmtime_adj_notAfter(self.cert_not_after) issuer
).public_key(
self.key.public_key()
).serial_number(
random.randint(0, 2**64 - 1)
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(seconds=self.cert_not_after)
).add_extension(
x509.BasicConstraints(ca=True, path_length=0), critical=True,
).add_extension(
x509.KeyUsage(key_cert_sign=True, crl_sign=True, digital_signature=False,
content_commitment=False, key_encipherment=False,
data_encipherment=False, key_agreement=False, encipher_only=False,
decipher_only=False), critical=True
).add_extension(
x509.SubjectKeyIdentifier.from_public_key(self.key.public_key()), critical=False
).sign(self.key, DEF_HASH_FUNC, default_backend())
return cert return cert
def generate_ca_root(self, ca_file, ca_name, hash_func=DEF_HASH_FUNC): def generate_ca_root(self, ca_file, ca_name, hash_func=DEF_HASH_FUNC):
# Generate key # Generate key
key = crypto.PKey() key = rsa.generate_private_key(
key.generate_key(crypto.TYPE_RSA, 2048) public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Generate cert # Generate cert
self.key = key
cert = self._make_cert(ca_name) cert = self._make_cert(ca_name)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.add_extensions([
crypto.X509Extension(b"basicConstraints",
True,
b"CA:TRUE, pathlen:0"),
crypto.X509Extension(b"keyUsage",
True,
b"keyCertSign, cRLSign"),
crypto.X509Extension(b"subjectKeyIdentifier",
False,
b"hash",
subject=cert),
])
cert.sign(key, hash_func)
# Write cert + key # Write cert + key
self.write_pem(ca_file, cert, key) self.write_pem(ca_file, cert, key)
return cert, key return cert, key
@ -144,34 +153,44 @@ class CertificateAuthority(object):
host = host.encode('utf-8') host = host.encode('utf-8')
# Generate key # Generate key
key = crypto.PKey() key = rsa.generate_private_key(
key.generate_key(crypto.TYPE_RSA, 2048) public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Generate CSR # Generate CSR
req = crypto.X509Req() csr = x509.CertificateSigningRequestBuilder().subject_name(
req.get_subject().CN = host x509.Name([
req.set_pubkey(key) x509.NameAttribute(NameOID.COMMON_NAME, host.decode('utf-8')),
req.sign(key, hash_func) ])
).sign(key, hash_func, default_backend())
# Generate Cert # Generate Cert
cert = self._make_cert(host) cert_builder = x509.CertificateBuilder().subject_name(
csr.subject
cert.set_issuer(root_cert.get_subject()) ).issuer_name(
cert.set_pubkey(req.get_pubkey()) root_cert.subject
).public_key(
csr.public_key()
).serial_number(
random.randint(0, 2**64 - 1)
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(seconds=self.cert_not_after)
)
if wildcard: if wildcard:
DNS = b'DNS:' cert_builder = cert_builder.add_extension(
alt_hosts = [DNS + host, x509.SubjectAlternativeName([
DNS + b'*.' + host] x509.DNSName(host.decode('utf-8')),
x509.DNSName('*.' + host.decode('utf-8')),
]),
critical=False,
)
alt_hosts = b', '.join(alt_hosts) cert = cert_builder.sign(root_key, hash_func, default_backend())
cert.add_extensions([
crypto.X509Extension(b'subjectAltName',
False,
alt_hosts)])
cert.sign(root_key, hash_func)
# Write cert + key # Write cert + key
self.write_pem(host_filename, cert, key) self.write_pem(host_filename, cert, key)
@ -179,15 +198,18 @@ class CertificateAuthority(object):
def write_pem(self, filename, cert, key): def write_pem(self, filename, cert, key):
with open(filename, 'wb+') as f: with open(filename, 'wb+') as f:
f.write(crypto.dump_privatekey(FILETYPE_PEM, key)) f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
f.write(crypto.dump_certificate(FILETYPE_PEM, cert)) format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
f.write(cert.public_bytes(serialization.Encoding.PEM))
def read_pem(self, filename): def read_pem(self, filename):
with open(filename, 'r') as f: with open(filename, 'rb') as f:
cert = crypto.load_certificate(FILETYPE_PEM, f.read()) cert = x509.load_pem_x509_certificate(f.read(), default_backend())
f.seek(0) f.seek(0)
key = crypto.load_privatekey(FILETYPE_PEM, f.read()) key = serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())
return cert, key return cert, key