test_archive_and_playback_http_url

This commit is contained in:
Noah Levitt 2013-11-20 12:06:29 -08:00
parent b2e45568f6
commit 25464dee80
2 changed files with 120 additions and 20 deletions

View File

@ -13,6 +13,7 @@ setuptools.setup(name='warcprox',
license='GPL',
packages=['warcprox'],
install_requires=['pyopenssl', 'warctools'], # gdbm/dbhash?
tests_require=['requests'],
scripts=['bin/dump-anydbm', 'bin/warcprox'],
zip_safe=False,
test_suite='warcprox.tests')

139
warcprox/tests/test_warcproxy.py Normal file → Executable file
View File

@ -1,3 +1,4 @@
#!/usr/bin/env python
# vim: set sw=4 et:
from warcprox import warcprox
@ -12,6 +13,9 @@ import re
import tempfile
import OpenSSL
import os
import shutil
import Queue
import requests
class TestHttpRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
logger = logging.getLogger('TestHttpRequestHandler')
@ -49,7 +53,7 @@ class WarcproxTest(unittest.TestCase):
@property
def _cert(self):
if self.__cert is None:
f = tempfile.NamedTemporaryFile(delete=False)
f = tempfile.NamedTemporaryFile(prefix='warcprox-test', suffix='-https.pem', delete=False)
try:
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
@ -77,37 +81,76 @@ class WarcproxTest(unittest.TestCase):
return self.__cert
def setUp(self):
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
# start test http server
def _start_http_servers(self):
self.http_daemon = BaseHTTPServer.HTTPServer(('localhost', 0),
RequestHandlerClass=TestHttpRequestHandler)
self.logger.info('starting http_daemon on {}:{}'.format(self.http_daemon.server_address[0], self.http_daemon.server_address[1]))
self.logger.info('starting http://{}:{}'.format(self.http_daemon.server_address[0], self.http_daemon.server_address[1]))
self.http_daemon_thread = threading.Thread(name='HttpdThread',
target=self.http_daemon.serve_forever)
self.http_daemon_thread.start()
# start test https
# http://www.piware.de/2011/01/creating-an-https-server-in-python/
self.https_daemon = BaseHTTPServer.HTTPServer(('localhost', 0),
RequestHandlerClass=TestHttpRequestHandler)
# self.https_daemon.socket = ssl.wrap_socket(httpd.socket, certfile='path/to/localhost.pem', server_side=True)
self.https_daemon.socket = ssl.wrap_socket(self.https_daemon.socket, certfile=self._cert, server_side=True)
self.logger.info('starting https_daemon on {}:{}'.format(self.https_daemon.server_address[0], self.https_daemon.server_address[1]))
self.logger.info('starting https://{}:{}'.format(self.https_daemon.server_address[0], self.https_daemon.server_address[1]))
self.https_daemon_thread = threading.Thread(name='HttpdThread',
target=self.https_daemon.serve_forever)
self.https_daemon_thread.start()
# start warcprox
self.warcprox = warcprox.WarcproxController()
def _start_warcprox(self):
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-', suffix='-ca.pem', delete=True)
f.close() # delete it, or CertificateAuthority will try to read it
self._ca_file = f.name
self._ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca')
ca = warcprox.CertificateAuthority(self._ca_file, self._ca_dir)
recorded_url_q = Queue.Queue()
proxy = warcprox.WarcProxy(server_address=('localhost', 0), ca=ca,
recorded_url_q=recorded_url_q)
self._warcs_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-warcs')
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-', suffix='-playback-index.db', delete=False)
f.close()
self._playback_index_db_file = f.name
playback_index_db = warcprox.PlaybackIndexDb(self._playback_index_db_file)
playback_proxy = warcprox.PlaybackProxy(server_address=('localhost', 0), ca=ca,
playback_index_db=playback_index_db, warcs_dir=self._warcs_dir)
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-', suffix='-dedup.db', delete=False)
f.close()
self._dedup_db_file = f.name
dedup_db = warcprox.DedupDb(self._dedup_db_file)
warc_writer = warcprox.WarcWriterThread(recorded_url_q=recorded_url_q,
directory=self._warcs_dir, port=proxy.server_port,
dedup_db=dedup_db, playback_index_db=playback_index_db)
self.warcprox = warcprox.WarcproxController(proxy, warc_writer, playback_proxy)
self.logger.info('starting warcprox')
self.warcprox_thread = threading.Thread(name='WarcproxThread',
target=self.warcprox.run_until_shutdown)
self.warcprox_thread.start()
def setUp(self):
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
self._start_http_servers()
self._start_warcprox()
archiving_proxy = 'http://localhost:{}'.format(self.warcprox.proxy.server_port)
self.archiving_proxies = {'http':archiving_proxy, 'https':archiving_proxy}
playback_proxy = 'http://localhost:{}'.format(self.warcprox.playback_proxy.server_port)
self.playback_proxies = {'http':playback_proxy, 'https':playback_proxy}
def tearDown(self):
self.logger.info('stopping warcprox')
self.warcprox.stop.set()
@ -127,17 +170,73 @@ class WarcproxTest(unittest.TestCase):
self.https_daemon_thread.join()
self.warcprox_thread.join()
os.unlink(self._cert)
self.__cert = None
for f in (self.__cert, self._ca_file, self._ca_dir, self._warcs_dir, self._playback_index_db_file, self._dedup_db_file):
if os.path.isdir(f):
logging.info('deleting directory {}'.format(f))
shutil.rmtree(f)
else:
logging.info('deleting file {}'.format(f))
os.unlink(f)
def test_httpds_no_proxy(self):
url = 'http://localhost:{}/'.format(self.http_daemon.server_port)
response = requests.get(url)
self.assertEqual(response.status_code, 404)
self.assertEqual(response.content, '404 Not Found\n')
url = 'https://localhost:{}/'.format(self.https_daemon.server_port)
response = requests.get(url, verify=False)
self.assertEqual(response.status_code, 404)
self.assertEqual(response.content, '404 Not Found\n')
url = 'http://localhost:{}/a/b'.format(self.http_daemon.server_port)
response = requests.get(url)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.headers['warcprox-test-header'], 'a!')
self.assertEqual(response.content, 'I am the warcprox test payload! bbbbbbbbbb!\n')
url = 'https://localhost:{}/c/d'.format(self.https_daemon.server_port)
response = requests.get(url, verify=False)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.headers['warcprox-test-header'], 'c!')
self.assertEqual(response.content, 'I am the warcprox test payload! dddddddddd!\n')
### # maybe useful checks, but arduous to include this much detail, will rely on the integration tests instead
### playback_index_lookup = self.warcprox.playback_proxy.playback_index_db.lookup_latest(url)
### self.assertEqual(playback_index_lookup, (None,None))
### playback_index_lookup = self.warcprox.playback_proxy.playback_index_db.lookup_latest(url)
### self.assertIsNotNone(playback_index_lookup[0])
### self.assertIsNotNone(playback_index_lookup[1])
### self.assertTrue(re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', playback_index_lookup[0]))
### self.assertEqual(type(playback_index_lookup[1]), dict)
### self.assertEqual(type(playback_index_lookup[1]['o']), int)
### self.assertTrue(re.match(r'^WARCPROX-\d{17}-00000-\d+-.*-\d+\.warc$', playback_index_lookup[1]['f']))
### dedup_db_lookup = self.warcprox.warc_writer.dedup_db.lookup('sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079')
### assertEqual(dedup_db_lookup['u'], url)
def test_archive_and_playback_http_url(self):
url = 'http://localhost:{}/a/b'.format(self.http_daemon.server_port)
# ensure playback fails before archiving
response = requests.get(url, proxies=self.playback_proxies)
self.assertEqual(response.status_code, 404)
self.assertEqual(response.content, '404 Not in Archive\n')
# archive
response = requests.get(url, proxies=self.archiving_proxies)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.headers['warcprox-test-header'], 'a!')
self.assertEqual(response.content, 'I am the warcprox test payload! bbbbbbbbbb!\n')
# check playback
response = requests.get(url, proxies=self.playback_proxies)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.headers['warcprox-test-header'], 'a!')
self.assertEqual(response.content, 'I am the warcprox test payload! bbbbbbbbbb!\n')
def test_something(self):
self.logger.info('sleeping for 100 seconds...')
try:
time.sleep(100)
except:
self.logger.info('interrupted')
self.logger.info('finished sleeping')
if __name__ == '__main__':
unittest.main()