i don't think we really need the CA, just use same cert for everything, cert won't verify anyway so host doesn't have to match

This commit is contained in:
Noah Levitt 2013-10-15 11:43:45 -07:00
parent 5b26a02549
commit 92e5d79ea7

121
src/warcprox/proxy.py Normal file → Executable file
View File

@ -18,9 +18,8 @@ from ssl import wrap_socket
from socket import socket
from sys import argv
from OpenSSL.crypto import (X509Extension, X509, dump_privatekey, dump_certificate, load_certificate, load_privatekey,
PKey, TYPE_RSA, X509Req)
from OpenSSL.SSL import FILETYPE_PEM
import OpenSSL.crypto
import OpenSSL.SSL
import logging
import sys
import ssl
@ -32,6 +31,7 @@ from datetime import datetime
import time
import Queue
import threading
import os.path
__author__ = 'Nadeem Douba'
__copyright__ = 'Copyright 2012, PyMiProxy Project'
@ -44,7 +44,6 @@ __email__ = 'ndouba@gmail.com'
__status__ = 'Development'
__all__ = [
'CertificateAuthority',
'ProxyHandler',
'RequestInterceptorPlugin',
'ResponseInterceptorPlugin',
@ -54,91 +53,6 @@ __all__ = [
]
class CertificateAuthority(object):
def __init__(self, ca_file='ca.pem', cache_dir=gettempdir()):
self.ca_file = ca_file
self.cache_dir = cache_dir
self._serial = self._get_serial()
if not path.exists(ca_file):
self._generate_ca()
else:
self._read_ca(ca_file)
def _get_serial(self):
s = 1
for c in filter(lambda x: x.startswith('.pymp_'), listdir(self.cache_dir)):
c = load_certificate(FILETYPE_PEM, open(path.sep.join([self.cache_dir, c])).read())
sc = c.get_serial_number()
if sc > s:
s = sc
del c
return s
def _generate_ca(self):
# Generate key
self.key = PKey()
self.key.generate_key(TYPE_RSA, 2048)
# Generate certificate
self.cert = X509()
self.cert.set_version(3)
self.cert.set_serial_number(1)
self.cert.get_subject().CN = 'ca.mitm.com'
self.cert.gmtime_adj_notBefore(0)
self.cert.gmtime_adj_notAfter(315360000)
self.cert.set_issuer(self.cert.get_subject())
self.cert.set_pubkey(self.key)
self.cert.add_extensions([
X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"),
X509Extension("keyUsage", True, "keyCertSign, cRLSign"),
X509Extension("subjectKeyIdentifier", False, "hash", subject=self.cert),
])
self.cert.sign(self.key, "sha1")
with open(self.ca_file, 'wb+') as f:
f.write(dump_privatekey(FILETYPE_PEM, self.key))
f.write(dump_certificate(FILETYPE_PEM, self.cert))
def _read_ca(self, file):
self.cert = load_certificate(FILETYPE_PEM, open(file).read())
self.key = load_privatekey(FILETYPE_PEM, open(file).read())
def __getitem__(self, cn):
cnp = path.sep.join([self.cache_dir, '.pymp_%s.pem' % cn])
if not path.exists(cnp):
# create certificate
key = PKey()
key.generate_key(TYPE_RSA, 2048)
# Generate CSR
req = X509Req()
req.get_subject().CN = cn
req.set_pubkey(key)
req.sign(key, 'sha1')
# Sign CSR
cert = X509()
cert.set_subject(req.get_subject())
cert.set_serial_number(self.serial)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(31536000)
cert.set_issuer(self.cert.get_subject())
cert.set_pubkey(req.get_pubkey())
cert.sign(self.key, 'sha1')
with open(cnp, 'wb+') as f:
f.write(dump_privatekey(FILETYPE_PEM, key))
f.write(dump_certificate(FILETYPE_PEM, cert))
return cnp
@property
def serial(self):
self._serial += 1
return self._serial
class UnsupportedSchemeException(Exception):
pass
@ -182,7 +96,7 @@ class ProxyHandler(BaseHTTPRequestHandler):
def _transition_to_ssl(self):
self.request = wrap_socket(self.request, server_side=True, certfile=self.server.ca[self.path.split(':')[0]])
self.request = wrap_socket(self.request, server_side=True, certfile=self.server.certfile)
def do_CONNECT(self):
@ -293,11 +207,34 @@ class InvalidInterceptorPluginException(Exception):
class MitmProxy(HTTPServer):
def __init__(self, server_address=('', 8080), RequestHandlerClass=ProxyHandler, bind_and_activate=True, ca_file='ca.pem'):
def __init__(self, server_address=('', 8080), RequestHandlerClass=ProxyHandler, bind_and_activate=True, certfile='warcprox.pem'):
HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)
self.ca = CertificateAuthority(ca_file)
self._res_plugins = []
self._req_plugins = []
self.certfile = certfile
if not os.path.exists(certfile):
self._generate_cert(certfile)
def _generate_cert(self, certfile):
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
cert = OpenSSL.crypto.X509()
cert.set_version(3)
cert.set_serial_number(1)
cert.get_subject().CN = 'warcprox man-in-the-middle archiving http/s proxy'
cert.gmtime_adj_notBefore(0) # now
cert.gmtime_adj_notAfter(100*365*24*60*60) # 100 yrs in future
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.sign(key, "sha1")
with open(certfile, 'wb+') as f:
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))
def register_interceptor(self, interceptor_class):
if not issubclass(interceptor_class, InterceptorPlugin):