mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
convert test_warcprox.py to py.test with fixtures
This commit is contained in:
parent
d38ab08086
commit
d3d23f9878
@ -1,7 +1,7 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
# vim: set sw=4 et:
|
# vim: set sw=4 et:
|
||||||
|
|
||||||
import unittest
|
import pytest
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
@ -13,6 +13,7 @@ import OpenSSL
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import requests
|
import requests
|
||||||
|
import re
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import http.server as http_server
|
import http.server as http_server
|
||||||
@ -32,11 +33,9 @@ import warcprox.playback
|
|||||||
import warcprox.warcwriter
|
import warcprox.warcwriter
|
||||||
import warcprox.dedup
|
import warcprox.dedup
|
||||||
|
|
||||||
class TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
|
class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
|
||||||
logger = logging.getLogger('TestHttpRequestHandler')
|
|
||||||
|
|
||||||
def do_GET(self):
|
def do_GET(self):
|
||||||
self.logger.info('GET {}'.format(self.path))
|
logging.info('GET {}'.format(self.path))
|
||||||
|
|
||||||
m = re.match(r'^/([^/]+)/([^/]+)$', self.path)
|
m = re.match(r'^/([^/]+)/([^/]+)$', self.path)
|
||||||
if m is not None:
|
if m is not None:
|
||||||
@ -57,358 +56,360 @@ class TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
|
|||||||
self.connection.sendall(headers)
|
self.connection.sendall(headers)
|
||||||
self.connection.sendall(payload)
|
self.connection.sendall(payload)
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def cert(request):
|
||||||
|
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-https-', suffix='.pem', delete=False)
|
||||||
|
|
||||||
class WarcproxTest(unittest.TestCase):
|
def fin():
|
||||||
logger = logging.getLogger('WarcproxTest')
|
logging.info("deleting file %s", f.name)
|
||||||
|
os.unlink(f.name)
|
||||||
|
request.addfinalizer(fin)
|
||||||
|
|
||||||
def __init__(self, methodName='runTest'):
|
try:
|
||||||
self.__cert = None
|
key = OpenSSL.crypto.PKey()
|
||||||
unittest.TestCase.__init__(self, methodName)
|
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
|
||||||
|
req = OpenSSL.crypto.X509Req()
|
||||||
|
req.get_subject().CN = 'localhost'
|
||||||
|
req.set_pubkey(key)
|
||||||
|
req.sign(key, 'sha1')
|
||||||
|
cert = OpenSSL.crypto.X509()
|
||||||
|
cert.set_subject(req.get_subject())
|
||||||
|
cert.set_serial_number(0)
|
||||||
|
cert.gmtime_adj_notBefore(0)
|
||||||
|
cert.gmtime_adj_notAfter(2*60*60) # valid for 2hrs
|
||||||
|
cert.set_issuer(cert.get_subject())
|
||||||
|
cert.set_pubkey(req.get_pubkey())
|
||||||
|
cert.sign(key, 'sha1')
|
||||||
|
|
||||||
@property
|
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
|
||||||
def _cert(self):
|
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))
|
||||||
if self.__cert is None:
|
|
||||||
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-https-', suffix='.pem', delete=False)
|
|
||||||
try:
|
|
||||||
key = OpenSSL.crypto.PKey()
|
|
||||||
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
|
|
||||||
req = OpenSSL.crypto.X509Req()
|
|
||||||
req.get_subject().CN = 'localhost'
|
|
||||||
req.set_pubkey(key)
|
|
||||||
req.sign(key, 'sha1')
|
|
||||||
cert = OpenSSL.crypto.X509()
|
|
||||||
cert.set_subject(req.get_subject())
|
|
||||||
cert.set_serial_number(0)
|
|
||||||
cert.gmtime_adj_notBefore(0)
|
|
||||||
cert.gmtime_adj_notAfter(2*60*60) # valid for 2hrs
|
|
||||||
cert.set_issuer(cert.get_subject())
|
|
||||||
cert.set_pubkey(req.get_pubkey())
|
|
||||||
cert.sign(key, 'sha1')
|
|
||||||
|
|
||||||
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
|
logging.info('generated self-signed certificate {}'.format(f.name))
|
||||||
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))
|
return f.name
|
||||||
|
finally:
|
||||||
self.logger.info('generated self-signed certificate {}'.format(f.name))
|
|
||||||
self.__cert = f.name
|
|
||||||
finally:
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
return self.__cert
|
|
||||||
|
|
||||||
|
|
||||||
def _start_http_servers(self):
|
|
||||||
self.http_daemon = http_server.HTTPServer(('localhost', 0),
|
|
||||||
RequestHandlerClass=TestHttpRequestHandler)
|
|
||||||
self.logger.info('starting http://{}:{}'.format(self.http_daemon.server_address[0], self.http_daemon.server_address[1]))
|
|
||||||
self.http_daemon_thread = threading.Thread(name='HttpdThread',
|
|
||||||
target=self.http_daemon.serve_forever)
|
|
||||||
self.http_daemon_thread.start()
|
|
||||||
|
|
||||||
# http://www.piware.de/2011/01/creating-an-https-server-in-python/
|
|
||||||
self.https_daemon = http_server.HTTPServer(('localhost', 0),
|
|
||||||
RequestHandlerClass=TestHttpRequestHandler)
|
|
||||||
# self.https_daemon.socket = ssl.wrap_socket(httpd.socket, certfile='path/to/localhost.pem', server_side=True)
|
|
||||||
self.https_daemon.socket = ssl.wrap_socket(self.https_daemon.socket, certfile=self._cert, server_side=True)
|
|
||||||
self.logger.info('starting https://{}:{}'.format(self.https_daemon.server_address[0], self.https_daemon.server_address[1]))
|
|
||||||
self.https_daemon_thread = threading.Thread(name='HttpdThread',
|
|
||||||
target=self.https_daemon.serve_forever)
|
|
||||||
self.https_daemon_thread.start()
|
|
||||||
|
|
||||||
|
|
||||||
def _start_warcprox(self):
|
|
||||||
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True)
|
|
||||||
f.close() # delete it, or CertificateAuthority will try to read it
|
|
||||||
self._ca_file = f.name
|
|
||||||
self._ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca')
|
|
||||||
ca = certauth.certauth.CertificateAuthority(self._ca_file, self._ca_dir, 'warcprox-test')
|
|
||||||
|
|
||||||
recorded_url_q = queue.Queue()
|
|
||||||
|
|
||||||
proxy = warcprox.warcprox.WarcProxy(server_address=('localhost', 0), ca=ca,
|
|
||||||
recorded_url_q=recorded_url_q)
|
|
||||||
|
|
||||||
self._warcs_dir = tempfile.mkdtemp(prefix='warcprox-test-warcs-')
|
|
||||||
|
|
||||||
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False)
|
|
||||||
f.close()
|
f.close()
|
||||||
self._playback_index_db_file = f.name
|
|
||||||
playback_index_db = warcprox.playback.PlaybackIndexDb(self._playback_index_db_file)
|
|
||||||
playback_proxy = warcprox.playback.PlaybackProxy(server_address=('localhost', 0), ca=ca,
|
|
||||||
playback_index_db=playback_index_db, warcs_dir=self._warcs_dir)
|
|
||||||
|
|
||||||
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
|
@pytest.fixture()
|
||||||
f.close()
|
def http_daemon(request):
|
||||||
self._dedup_db_file = f.name
|
http_daemon = http_server.HTTPServer(('localhost', 0),
|
||||||
dedup_db = warcprox.dedup.DedupDb(self._dedup_db_file)
|
RequestHandlerClass=_TestHttpRequestHandler)
|
||||||
|
logging.info('starting http://{}:{}'.format(http_daemon.server_address[0], http_daemon.server_address[1]))
|
||||||
|
http_daemon_thread = threading.Thread(name='HttpDaemonThread',
|
||||||
|
target=http_daemon.serve_forever)
|
||||||
|
http_daemon_thread.start()
|
||||||
|
|
||||||
warc_writer = warcprox.warcwriter.WarcWriter(directory=self._warcs_dir,
|
def fin():
|
||||||
port=proxy.server_port, dedup_db=dedup_db,
|
logging.info("stopping http daemon")
|
||||||
playback_index_db=playback_index_db)
|
http_daemon.shutdown()
|
||||||
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
|
http_daemon.server_close()
|
||||||
warc_writer=warc_writer)
|
http_daemon_thread.join()
|
||||||
|
request.addfinalizer(fin)
|
||||||
|
|
||||||
self.warcprox = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
|
return http_daemon
|
||||||
self.logger.info('starting warcprox')
|
|
||||||
self.warcprox_thread = threading.Thread(name='WarcproxThread',
|
|
||||||
target=self.warcprox.run_until_shutdown)
|
|
||||||
self.warcprox_thread.start()
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def https_daemon(request, cert):
|
||||||
|
# http://www.piware.de/2011/01/creating-an-https-server-in-python/
|
||||||
|
https_daemon = http_server.HTTPServer(('localhost', 0),
|
||||||
|
RequestHandlerClass=_TestHttpRequestHandler)
|
||||||
|
# https_daemon.socket = ssl.wrap_socket(httpd.socket, certfile='path/to/localhost.pem', server_side=True)
|
||||||
|
https_daemon.socket = ssl.wrap_socket(https_daemon.socket, certfile=cert, server_side=True)
|
||||||
|
logging.info('starting https://{}:{}'.format(https_daemon.server_address[0], https_daemon.server_address[1]))
|
||||||
|
https_daemon_thread = threading.Thread(name='HttpsDaemonThread',
|
||||||
|
target=https_daemon.serve_forever)
|
||||||
|
https_daemon_thread.start()
|
||||||
|
|
||||||
def setUp(self):
|
def fin():
|
||||||
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
|
logging.info("stopping https daemon")
|
||||||
format='%(asctime)s %(levelname)s %(process)d %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
https_daemon.shutdown()
|
||||||
|
https_daemon.server_close()
|
||||||
|
https_daemon_thread.join()
|
||||||
|
request.addfinalizer(fin)
|
||||||
|
|
||||||
self._start_http_servers()
|
return https_daemon
|
||||||
self._start_warcprox()
|
|
||||||
|
|
||||||
archiving_proxy = 'http://localhost:{}'.format(self.warcprox.proxy.server_port)
|
@pytest.fixture()
|
||||||
self.archiving_proxies = {'http':archiving_proxy, 'https':archiving_proxy}
|
def warcprox_(request):
|
||||||
|
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True)
|
||||||
|
f.close() # delete it, or CertificateAuthority will try to read it
|
||||||
|
ca_file = f.name
|
||||||
|
ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca')
|
||||||
|
ca = certauth.certauth.CertificateAuthority(ca_file, ca_dir, 'warcprox-test')
|
||||||
|
|
||||||
playback_proxy = 'http://localhost:{}'.format(self.warcprox.playback_proxy.server_port)
|
recorded_url_q = queue.Queue()
|
||||||
self.playback_proxies = {'http':playback_proxy, 'https':playback_proxy}
|
|
||||||
|
|
||||||
|
proxy = warcprox.warcprox.WarcProxy(server_address=('localhost', 0), ca=ca,
|
||||||
|
recorded_url_q=recorded_url_q)
|
||||||
|
|
||||||
def tearDown(self):
|
warcs_dir = tempfile.mkdtemp(prefix='warcprox-test-warcs-')
|
||||||
self.logger.info('stopping warcprox')
|
|
||||||
self.warcprox.stop.set()
|
|
||||||
|
|
||||||
self.logger.info('stopping http and https daemons')
|
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False)
|
||||||
self.http_daemon.shutdown()
|
f.close()
|
||||||
self.https_daemon.shutdown()
|
playback_index_db_file = f.name
|
||||||
self.http_daemon.server_close()
|
playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file)
|
||||||
self.https_daemon.server_close()
|
playback_proxy = warcprox.playback.PlaybackProxy(server_address=('localhost', 0), ca=ca,
|
||||||
|
playback_index_db=playback_index_db, warcs_dir=warcs_dir)
|
||||||
|
|
||||||
# Have to wait for threads to finish or the threads will try to use
|
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
|
||||||
# variables that no longer exist, resulting in errors like this:
|
f.close()
|
||||||
# File "/usr/lib/python2.7/SocketServer.py", line 235, in serve_forever
|
dedup_db_file = f.name
|
||||||
# r, w, e = _eintr_retry(select.select, [self], [], [],
|
dedup_db = warcprox.dedup.DedupDb(dedup_db_file)
|
||||||
# AttributeError: 'NoneType' object has no attribute 'select'
|
|
||||||
self.http_daemon_thread.join()
|
|
||||||
self.https_daemon_thread.join()
|
|
||||||
self.warcprox_thread.join()
|
|
||||||
|
|
||||||
for f in (self.__cert, self._ca_file, self._ca_dir, self._warcs_dir, self._playback_index_db_file, self._dedup_db_file):
|
default_warc_writer = warcprox.warcwriter.WarcWriter(directory=warcs_dir,
|
||||||
|
port=proxy.server_port, dedup_db=dedup_db,
|
||||||
|
playback_index_db=playback_index_db)
|
||||||
|
writer_pool = warcprox.warcwriter.WarcWriterPool(default_warc_writer)
|
||||||
|
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
|
||||||
|
writer_pool=writer_pool)
|
||||||
|
|
||||||
|
warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
|
||||||
|
logging.info('starting warcprox')
|
||||||
|
warcprox_thread = threading.Thread(name='WarcproxThread',
|
||||||
|
target=warcprox_.run_until_shutdown)
|
||||||
|
warcprox_thread.start()
|
||||||
|
|
||||||
|
def fin():
|
||||||
|
logging.info('stopping warcprox')
|
||||||
|
warcprox_.stop.set()
|
||||||
|
warcprox_thread.join()
|
||||||
|
for f in (ca_file, ca_dir, warcs_dir, playback_index_db_file, dedup_db_file):
|
||||||
if os.path.isdir(f):
|
if os.path.isdir(f):
|
||||||
self.logger.info('deleting directory {}'.format(f))
|
logging.info('deleting directory {}'.format(f))
|
||||||
shutil.rmtree(f)
|
shutil.rmtree(f)
|
||||||
else:
|
else:
|
||||||
self.logger.info('deleting file {}'.format(f))
|
logging.info('deleting file {}'.format(f))
|
||||||
os.unlink(f)
|
os.unlink(f)
|
||||||
|
request.addfinalizer(fin)
|
||||||
|
|
||||||
|
return warcprox_
|
||||||
|
|
||||||
def _test_httpds_no_proxy(self):
|
@pytest.fixture()
|
||||||
url = 'http://localhost:{}/'.format(self.http_daemon.server_port)
|
def archiving_proxies(warcprox_):
|
||||||
response = requests.get(url)
|
archiving_proxy = 'http://localhost:{}'.format(warcprox_.proxy.server_port)
|
||||||
self.assertEqual(response.status_code, 404)
|
return {'http':archiving_proxy, 'https':archiving_proxy}
|
||||||
self.assertEqual(response.content, b'404 Not Found\n')
|
|
||||||
|
|
||||||
url = 'https://localhost:{}/'.format(self.https_daemon.server_port)
|
@pytest.fixture()
|
||||||
response = requests.get(url, verify=False)
|
def playback_proxies(warcprox_):
|
||||||
self.assertEqual(response.status_code, 404)
|
playback_proxy = 'http://localhost:{}'.format(warcprox_.playback_proxy.server_port)
|
||||||
self.assertEqual(response.content, b'404 Not Found\n')
|
return {'http':playback_proxy, 'https':playback_proxy}
|
||||||
|
|
||||||
url = 'http://localhost:{}/a/b'.format(self.http_daemon.server_port)
|
# def tearDown(self):
|
||||||
response = requests.get(url)
|
# logging.info('stopping warcprox')
|
||||||
self.assertEqual(response.status_code, 200)
|
# self.warcprox.stop.set()
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'a!')
|
#
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! bbbbbbbbbb!\n')
|
# logging.info('stopping http and https daemons')
|
||||||
|
# self.http_daemon.shutdown()
|
||||||
|
# self.https_daemon.shutdown()
|
||||||
|
# self.http_daemon.server_close()
|
||||||
|
# self.https_daemon.server_close()
|
||||||
|
#
|
||||||
|
# self.http_daemon_thread.join()
|
||||||
|
# self.https_daemon_thread.join()
|
||||||
|
# self.warcprox_thread.join()
|
||||||
|
#
|
||||||
|
# for f in (self.__cert, self._ca_file, self._ca_dir, self._warcs_dir, self._playback_index_db_file, self._dedup_db_file):
|
||||||
|
# if os.path.isdir(f):
|
||||||
|
# logging.info('deleting directory {}'.format(f))
|
||||||
|
# shutil.rmtree(f)
|
||||||
|
# else:
|
||||||
|
# logging.info('deleting file {}'.format(f))
|
||||||
|
# os.unlink(f)
|
||||||
|
|
||||||
url = 'https://localhost:{}/c/d'.format(self.https_daemon.server_port)
|
def test_httpds_no_proxy(http_daemon, https_daemon):
|
||||||
response = requests.get(url, verify=False)
|
url = 'http://localhost:{}/'.format(http_daemon.server_port)
|
||||||
self.assertEqual(response.status_code, 200)
|
response = requests.get(url)
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'c!')
|
assert response.status_code == 404
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! dddddddddd!\n')
|
assert response.content == b'404 Not Found\n'
|
||||||
|
|
||||||
|
url = 'https://localhost:{}/'.format(https_daemon.server_port)
|
||||||
|
response = requests.get(url, verify=False)
|
||||||
|
assert response.status_code == 404
|
||||||
|
assert response.content == b'404 Not Found\n'
|
||||||
|
|
||||||
def poll_playback_until(self, url, status, timeout_sec):
|
url = 'http://localhost:{}/a/b'.format(http_daemon.server_port)
|
||||||
start = time.time()
|
response = requests.get(url)
|
||||||
# check playback (warc writing is asynchronous, give it up to 10 sec)
|
assert response.status_code == 200
|
||||||
while time.time() - start < timeout_sec:
|
assert response.headers['warcprox-test-header'] == 'a!'
|
||||||
response = requests.get(url, proxies=self.playback_proxies, verify=False)
|
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
|
||||||
if response.status_code == status:
|
|
||||||
break
|
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
return response
|
url = 'https://localhost:{}/c/d'.format(https_daemon.server_port)
|
||||||
|
response = requests.get(url, verify=False)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.headers['warcprox-test-header'] == 'c!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
|
||||||
|
|
||||||
|
def _poll_playback_until(playback_proxies, url, status, timeout_sec):
|
||||||
|
start = time.time()
|
||||||
|
# check playback (warc writing is asynchronous, give it up to 10 sec)
|
||||||
|
while time.time() - start < timeout_sec:
|
||||||
|
response = requests.get(url, proxies=playback_proxies, verify=False)
|
||||||
|
if response.status_code == status:
|
||||||
|
break
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
def _test_archive_and_playback_http_url(self):
|
return response
|
||||||
url = 'http://localhost:{}/a/b'.format(self.http_daemon.server_port)
|
|
||||||
|
|
||||||
# ensure playback fails before archiving
|
def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_proxies):
|
||||||
response = requests.get(url, proxies=self.playback_proxies)
|
url = 'http://localhost:{}/a/b'.format(http_daemon.server_port)
|
||||||
self.assertEqual(response.status_code, 404)
|
|
||||||
self.assertEqual(response.content, b'404 Not in Archive\n')
|
|
||||||
|
|
||||||
# archive
|
# ensure playback fails before archiving
|
||||||
response = requests.get(url, proxies=self.archiving_proxies)
|
response = requests.get(url, proxies=playback_proxies)
|
||||||
self.assertEqual(response.status_code, 200)
|
assert response.status_code == 404
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'a!')
|
assert response.content == b'404 Not in Archive\n'
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! bbbbbbbbbb!\n')
|
|
||||||
|
|
||||||
response = self.poll_playback_until(url, status=200, timeout_sec=10)
|
# archive
|
||||||
self.assertEqual(response.status_code, 200)
|
response = requests.get(url, proxies=archiving_proxies)
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'a!')
|
assert response.status_code == 200
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! bbbbbbbbbb!\n')
|
assert response.headers['warcprox-test-header'] == 'a!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
|
||||||
|
|
||||||
|
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.headers['warcprox-test-header'] == 'a!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
|
||||||
|
|
||||||
def _test_archive_and_playback_https_url(self):
|
def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playback_proxies):
|
||||||
url = 'https://localhost:{}/c/d'.format(self.https_daemon.server_port)
|
url = 'https://localhost:{}/c/d'.format(https_daemon.server_port)
|
||||||
|
|
||||||
# ensure playback fails before archiving
|
# ensure playback fails before archiving
|
||||||
response = requests.get(url, proxies=self.playback_proxies, verify=False)
|
response = requests.get(url, proxies=playback_proxies, verify=False)
|
||||||
self.assertEqual(response.status_code, 404)
|
assert response.status_code == 404
|
||||||
self.assertEqual(response.content, b'404 Not in Archive\n')
|
assert response.content == b'404 Not in Archive\n'
|
||||||
|
|
||||||
# fetch & archive response
|
# fetch & archive response
|
||||||
response = requests.get(url, proxies=self.archiving_proxies, verify=False)
|
response = requests.get(url, proxies=archiving_proxies, verify=False)
|
||||||
self.assertEqual(response.status_code, 200)
|
assert response.status_code == 200
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'c!')
|
assert response.headers['warcprox-test-header'] == 'c!'
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! dddddddddd!\n')
|
assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
|
||||||
|
|
||||||
# test playback
|
# test playback
|
||||||
response = self.poll_playback_until(url, status=200, timeout_sec=10)
|
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
|
||||||
self.assertEqual(response.status_code, 200)
|
assert response.status_code == 200
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'c!')
|
assert response.headers['warcprox-test-header'] == 'c!'
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! dddddddddd!\n')
|
assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
|
||||||
|
|
||||||
|
# test dedup of same http url with same payload
|
||||||
|
def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies):
|
||||||
|
url = 'http://localhost:{}/e/f'.format(http_daemon.server_port)
|
||||||
|
|
||||||
# test dedup of same http url with same payload
|
# ensure playback fails before archiving
|
||||||
def _test_dedup_http(self):
|
response = requests.get(url, proxies=playback_proxies, verify=False)
|
||||||
url = 'http://localhost:{}/e/f'.format(self.http_daemon.server_port)
|
assert response.status_code == 404
|
||||||
|
assert response.content == b'404 Not in Archive\n'
|
||||||
|
|
||||||
# ensure playback fails before archiving
|
# check not in dedup db
|
||||||
response = requests.get(url, proxies=self.playback_proxies, verify=False)
|
dedup_lookup = warcprox_.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
|
||||||
self.assertEqual(response.status_code, 404)
|
assert dedup_lookup is None
|
||||||
self.assertEqual(response.content, b'404 Not in Archive\n')
|
|
||||||
|
|
||||||
# check not in dedup db
|
# archive
|
||||||
dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
|
response = requests.get(url, proxies=archiving_proxies, verify=False)
|
||||||
self.assertIsNone(dedup_lookup)
|
assert response.status_code == 200
|
||||||
|
assert response.headers['warcprox-test-header'] == 'e!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
|
||||||
|
|
||||||
# archive
|
# test playback
|
||||||
response = requests.get(url, proxies=self.archiving_proxies, verify=False)
|
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
|
||||||
self.assertEqual(response.status_code, 200)
|
assert response.status_code == 200
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'e!')
|
assert response.headers['warcprox-test-header'] == 'e!'
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! ffffffffff!\n')
|
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
|
||||||
|
|
||||||
# test playback
|
# check in dedup db
|
||||||
response = self.poll_playback_until(url, status=200, timeout_sec=10)
|
# {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'}
|
||||||
self.assertEqual(response.status_code, 200)
|
dedup_lookup = warcprox_.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'e!')
|
assert dedup_lookup['u'] == url.encode('ascii')
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! ffffffffff!\n')
|
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['i'])
|
||||||
|
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['d'])
|
||||||
|
record_id = dedup_lookup['i']
|
||||||
|
dedup_date = dedup_lookup['d']
|
||||||
|
|
||||||
# check in dedup db
|
# need revisit to have a later timestamp than original, else playing
|
||||||
# {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'}
|
# back the latest record might not hit the revisit
|
||||||
dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
|
time.sleep(1.5)
|
||||||
self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
|
|
||||||
self.assertRegexpMatches(dedup_lookup['i'], br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$')
|
|
||||||
self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$')
|
|
||||||
record_id = dedup_lookup['i']
|
|
||||||
dedup_date = dedup_lookup['d']
|
|
||||||
|
|
||||||
# need revisit to have a later timestamp than original, else playing
|
# fetch & archive revisit
|
||||||
# back the latest record might not hit the revisit
|
response = requests.get(url, proxies=archiving_proxies, verify=False)
|
||||||
time.sleep(1.5)
|
assert response.status_code == 200
|
||||||
|
assert response.headers['warcprox-test-header'] == 'e!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
|
||||||
|
|
||||||
# fetch & archive revisit
|
# XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\
|
||||||
response = requests.get(url, proxies=self.archiving_proxies, verify=False)
|
time.sleep(2.0)
|
||||||
self.assertEqual(response.status_code, 200)
|
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'e!')
|
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! ffffffffff!\n')
|
|
||||||
|
|
||||||
# XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\
|
# check in dedup db (no change from prev)
|
||||||
time.sleep(2.0)
|
dedup_lookup = warcprox_.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
|
||||||
|
assert dedup_lookup['u'] == url.encode('ascii')
|
||||||
|
assert dedup_lookup['i'] == record_id
|
||||||
|
assert dedup_lookup['d'] == dedup_date
|
||||||
|
|
||||||
# check in dedup db (no change from prev)
|
# test playback
|
||||||
dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
|
logging.debug('testing playback of revisit of {}'.format(url))
|
||||||
self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
|
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
|
||||||
self.assertEqual(dedup_lookup['i'], record_id)
|
assert response.status_code == 200
|
||||||
self.assertEqual(dedup_lookup['d'], dedup_date)
|
assert response.headers['warcprox-test-header'] == 'e!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
|
||||||
|
# XXX how to check dedup was used?
|
||||||
|
|
||||||
# test playback
|
# test dedup of same https url with same payload
|
||||||
self.logger.debug('testing playback of revisit of {}'.format(url))
|
def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies):
|
||||||
response = self.poll_playback_until(url, status=200, timeout_sec=10)
|
url = 'https://localhost:{}/g/h'.format(https_daemon.server_port)
|
||||||
self.assertEqual(response.status_code, 200)
|
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'e!')
|
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! ffffffffff!\n')
|
|
||||||
# XXX how to check dedup was used?
|
|
||||||
|
|
||||||
|
# ensure playback fails before archiving
|
||||||
|
response = requests.get(url, proxies=playback_proxies, verify=False)
|
||||||
|
assert response.status_code == 404
|
||||||
|
assert response.content == b'404 Not in Archive\n'
|
||||||
|
|
||||||
# test dedup of same https url with same payload
|
# check not in dedup db
|
||||||
def _test_dedup_https(self):
|
dedup_lookup = warcprox_.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
|
||||||
url = 'https://localhost:{}/g/h'.format(self.https_daemon.server_port)
|
assert dedup_lookup is None
|
||||||
|
|
||||||
# ensure playback fails before archiving
|
# archive
|
||||||
response = requests.get(url, proxies=self.playback_proxies, verify=False)
|
response = requests.get(url, proxies=archiving_proxies, verify=False)
|
||||||
self.assertEqual(response.status_code, 404)
|
assert response.status_code == 200
|
||||||
self.assertEqual(response.content, b'404 Not in Archive\n')
|
assert response.headers['warcprox-test-header'] == 'g!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
|
||||||
|
|
||||||
# check not in dedup db
|
# test playback
|
||||||
dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
|
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
|
||||||
self.assertIsNone(dedup_lookup)
|
assert response.status_code == 200
|
||||||
|
assert response.headers['warcprox-test-header'] == 'g!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
|
||||||
|
|
||||||
# archive
|
# check in dedup db
|
||||||
response = requests.get(url, proxies=self.archiving_proxies, verify=False)
|
# {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'}
|
||||||
self.assertEqual(response.status_code, 200)
|
dedup_lookup = warcprox_.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'g!')
|
assert dedup_lookup['u'] == url.encode('ascii')
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! hhhhhhhhhh!\n')
|
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['i'])
|
||||||
|
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['d'])
|
||||||
|
record_id = dedup_lookup['i']
|
||||||
|
dedup_date = dedup_lookup['d']
|
||||||
|
|
||||||
# test playback
|
# need revisit to have a later timestamp than original, else playing
|
||||||
response = self.poll_playback_until(url, status=200, timeout_sec=10)
|
# back the latest record might not hit the revisit
|
||||||
self.assertEqual(response.status_code, 200)
|
time.sleep(1.5)
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'g!')
|
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! hhhhhhhhhh!\n')
|
|
||||||
|
|
||||||
# check in dedup db
|
# fetch & archive revisit
|
||||||
# {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'}
|
response = requests.get(url, proxies=archiving_proxies, verify=False)
|
||||||
dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
|
assert response.status_code == 200
|
||||||
self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
|
assert response.headers['warcprox-test-header'] == 'g!'
|
||||||
self.assertRegexpMatches(dedup_lookup['i'], br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$')
|
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
|
||||||
self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$')
|
|
||||||
record_id = dedup_lookup['i']
|
|
||||||
dedup_date = dedup_lookup['d']
|
|
||||||
|
|
||||||
# need revisit to have a later timestamp than original, else playing
|
# XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\
|
||||||
# back the latest record might not hit the revisit
|
time.sleep(2.0)
|
||||||
time.sleep(1.5)
|
|
||||||
|
|
||||||
# fetch & archive revisit
|
# check in dedup db (no change from prev)
|
||||||
response = requests.get(url, proxies=self.archiving_proxies, verify=False)
|
dedup_lookup = warcprox_.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
|
||||||
self.assertEqual(response.status_code, 200)
|
assert dedup_lookup['u'] == url.encode('ascii')
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'g!')
|
assert dedup_lookup['i'] == record_id
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! hhhhhhhhhh!\n')
|
assert dedup_lookup['d'] == dedup_date
|
||||||
|
|
||||||
# XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\
|
|
||||||
time.sleep(2.0)
|
|
||||||
|
|
||||||
# check in dedup db (no change from prev)
|
|
||||||
dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
|
|
||||||
self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
|
|
||||||
self.assertEqual(dedup_lookup['i'], record_id)
|
|
||||||
self.assertEqual(dedup_lookup['d'], dedup_date)
|
|
||||||
|
|
||||||
# test playback
|
|
||||||
self.logger.debug('testing playback of revisit of {}'.format(url))
|
|
||||||
response = self.poll_playback_until(url, status=200, timeout_sec=10)
|
|
||||||
self.assertEqual(response.status_code, 200)
|
|
||||||
self.assertEqual(response.headers['warcprox-test-header'], 'g!')
|
|
||||||
self.assertEqual(response.content, b'I am the warcprox test payload! hhhhhhhhhh!\n')
|
|
||||||
# XXX how to check dedup was used?
|
|
||||||
|
|
||||||
|
|
||||||
# run everything from here, otherwise it wants to setUp() and tearDown
|
|
||||||
# around each test
|
|
||||||
def runTest(self):
|
|
||||||
self._test_httpds_no_proxy()
|
|
||||||
self._test_archive_and_playback_http_url()
|
|
||||||
self._test_archive_and_playback_https_url()
|
|
||||||
self._test_dedup_http()
|
|
||||||
self._test_dedup_https()
|
|
||||||
# self._test_dedup_mixed_http()
|
|
||||||
# self._test_dedup_mixed_https()
|
|
||||||
|
|
||||||
|
# test playback
|
||||||
|
logging.debug('testing playback of revisit of {}'.format(url))
|
||||||
|
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.headers['warcprox-test-header'] == 'g!'
|
||||||
|
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
|
||||||
|
# XXX how to check dedup was used?
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
pytest.main()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user