mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
test https server, and request handler... next step is to use them for actual tests
This commit is contained in:
parent
bfd1cf432e
commit
c76d9b88d3
@ -1,53 +1,140 @@
|
|||||||
# vim: set sw=4 et:
|
# vim: set sw=4 et:
|
||||||
|
|
||||||
|
from warcprox import warcprox
|
||||||
import unittest
|
import unittest
|
||||||
import BaseHTTPServer
|
import BaseHTTPServer
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from warcprox import warcprox
|
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
import ssl
|
||||||
|
import re
|
||||||
|
import tempfile
|
||||||
|
import OpenSSL
|
||||||
|
import os
|
||||||
|
|
||||||
|
class TestHttpRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||||
|
logger = logging.getLogger('TestHttpRequestHandler')
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
self.logger.info('GET {}'.format(self.path))
|
||||||
|
|
||||||
|
m = re.match(r'^/([^/]+)/([^/]+)$', self.path)
|
||||||
|
if m is not None:
|
||||||
|
special_header = 'warcprox-test-header: {}!'.format(m.group(1))
|
||||||
|
payload = 'I am the warcprox test payload! {}!\n'.format(10*m.group(2))
|
||||||
|
headers = ('HTTP/1.1 200 OK\r\n'
|
||||||
|
+ 'Content-Type: text/plain\r\n'
|
||||||
|
+ '{}\r\n'
|
||||||
|
+ 'Content-Length: {}\r\n'
|
||||||
|
+ '\r\n').format(special_header, len(payload))
|
||||||
|
else:
|
||||||
|
payload = '404 Not Found\n'
|
||||||
|
headers = ('HTTP/1.1 404 Not Found\r\n'
|
||||||
|
+ 'Content-Type: text/plain\r\n'
|
||||||
|
+ 'Content-Length: {}\r\n'
|
||||||
|
+ '\r\n').format(len(payload))
|
||||||
|
|
||||||
|
self.connection.sendall(headers)
|
||||||
|
self.connection.sendall(payload)
|
||||||
|
|
||||||
|
|
||||||
class WarcproxTest(unittest.TestCase):
|
class WarcproxTest(unittest.TestCase):
|
||||||
logger = logging.getLogger('WarcproxTest')
|
logger = logging.getLogger('WarcproxTest')
|
||||||
|
|
||||||
|
def __init__(self, methodName='runTest'):
|
||||||
|
self.__cert = None
|
||||||
|
unittest.TestCase.__init__(self, methodName)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _cert(self):
|
||||||
|
if self.__cert is None:
|
||||||
|
f = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
try:
|
||||||
|
key = OpenSSL.crypto.PKey()
|
||||||
|
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
|
||||||
|
req = OpenSSL.crypto.X509Req()
|
||||||
|
req.get_subject().CN = 'localhost'
|
||||||
|
req.set_pubkey(key)
|
||||||
|
req.sign(key, 'sha1')
|
||||||
|
cert = OpenSSL.crypto.X509()
|
||||||
|
cert.set_subject(req.get_subject())
|
||||||
|
cert.set_serial_number(0)
|
||||||
|
cert.gmtime_adj_notBefore(0)
|
||||||
|
cert.gmtime_adj_notAfter(2*60*60) # valid for 2hrs
|
||||||
|
cert.set_issuer(cert.get_subject())
|
||||||
|
cert.set_pubkey(req.get_pubkey())
|
||||||
|
cert.sign(key, 'sha1')
|
||||||
|
|
||||||
|
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
|
||||||
|
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))
|
||||||
|
|
||||||
|
logging.info('generated self-signed certificate {}'.format(f.name))
|
||||||
|
self.__cert = f.name
|
||||||
|
finally:
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
return self.__cert
|
||||||
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
|
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
|
||||||
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
||||||
|
|
||||||
self.httpd = BaseHTTPServer.HTTPServer(('localhost', 0),
|
# start test http server
|
||||||
RequestHandlerClass=BaseHTTPServer.BaseHTTPRequestHandler)
|
self.http_daemon = BaseHTTPServer.HTTPServer(('localhost', 0),
|
||||||
self.logger.info('starting httpd on {}:{}'.format(self.httpd.server_address[0], self.httpd.server_address[1]))
|
RequestHandlerClass=TestHttpRequestHandler)
|
||||||
self.httpd_thread = threading.Thread(name='HttpdThread',
|
self.logger.info('starting http_daemon on {}:{}'.format(self.http_daemon.server_address[0], self.http_daemon.server_address[1]))
|
||||||
target=self.httpd.serve_forever)
|
self.http_daemon_thread = threading.Thread(name='HttpdThread',
|
||||||
self.httpd_thread.start()
|
target=self.http_daemon.serve_forever)
|
||||||
|
self.http_daemon_thread.start()
|
||||||
|
|
||||||
|
# start test https
|
||||||
|
# http://www.piware.de/2011/01/creating-an-https-server-in-python/
|
||||||
|
self.https_daemon = BaseHTTPServer.HTTPServer(('localhost', 0),
|
||||||
|
RequestHandlerClass=TestHttpRequestHandler)
|
||||||
|
# self.https_daemon.socket = ssl.wrap_socket(httpd.socket, certfile='path/to/localhost.pem', server_side=True)
|
||||||
|
self.https_daemon.socket = ssl.wrap_socket(self.https_daemon.socket, certfile=self._cert, server_side=True)
|
||||||
|
self.logger.info('starting https_daemon on {}:{}'.format(self.https_daemon.server_address[0], self.https_daemon.server_address[1]))
|
||||||
|
self.https_daemon_thread = threading.Thread(name='HttpdThread',
|
||||||
|
target=self.https_daemon.serve_forever)
|
||||||
|
self.https_daemon_thread.start()
|
||||||
|
|
||||||
|
# start warcprox
|
||||||
self.warcprox = warcprox.WarcproxController()
|
self.warcprox = warcprox.WarcproxController()
|
||||||
self.logger.info('starting warcprox')
|
self.logger.info('starting warcprox')
|
||||||
self.warcprox_thread = threading.Thread(name='WarcproxThread',
|
self.warcprox_thread = threading.Thread(name='WarcproxThread',
|
||||||
target=self.warcprox.run_until_shutdown)
|
target=self.warcprox.run_until_shutdown)
|
||||||
self.warcprox_thread.start()
|
self.warcprox_thread.start()
|
||||||
|
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.logger.info('stopping warcprox')
|
self.logger.info('stopping warcprox')
|
||||||
self.warcprox.stop.set()
|
self.warcprox.stop.set()
|
||||||
|
|
||||||
self.logger.info('stopping httpd')
|
self.logger.info('stopping http and https daemons')
|
||||||
self.httpd.shutdown()
|
self.http_daemon.shutdown()
|
||||||
self.httpd.server_close()
|
self.https_daemon.shutdown()
|
||||||
|
self.http_daemon.server_close()
|
||||||
|
self.https_daemon.server_close()
|
||||||
|
|
||||||
# Have to wait for threads to finish or the threads will try to use
|
# Have to wait for threads to finish or the threads will try to use
|
||||||
# variables that have been deleted, resulting in errors like this:
|
# variables that no longer exist, resulting in errors like this:
|
||||||
# File "/usr/lib/python2.7/SocketServer.py", line 235, in serve_forever
|
# File "/usr/lib/python2.7/SocketServer.py", line 235, in serve_forever
|
||||||
# r, w, e = _eintr_retry(select.select, [self], [], [],
|
# r, w, e = _eintr_retry(select.select, [self], [], [],
|
||||||
# AttributeError: 'NoneType' object has no attribute 'select'
|
# AttributeError: 'NoneType' object has no attribute 'select'
|
||||||
self.httpd_thread.join()
|
self.http_daemon_thread.join()
|
||||||
|
self.https_daemon_thread.join()
|
||||||
self.warcprox_thread.join()
|
self.warcprox_thread.join()
|
||||||
|
|
||||||
|
os.unlink(self._cert)
|
||||||
|
self.__cert = None
|
||||||
|
|
||||||
|
|
||||||
def test_something(self):
|
def test_something(self):
|
||||||
self.logger.info('sleeping for 5 seconds...')
|
self.logger.info('sleeping for 100 seconds...')
|
||||||
try:
|
try:
|
||||||
time.sleep(5)
|
time.sleep(100)
|
||||||
except:
|
except:
|
||||||
self.logger.info('interrupted')
|
self.logger.info('interrupted')
|
||||||
self.logger.info('finished sleeping')
|
self.logger.info('finished sleeping')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user