diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 2285d8b..404279d 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -18,6 +18,10 @@ import json import random import rethinkstuff from hanzo import warctools +import warnings +import pprint +import traceback +import signal try: import http.server as http_server @@ -35,6 +39,25 @@ import warcprox logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') +logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) +warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning) +warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning) + +def dump_state(signum=None, frame=None): + pp = pprint.PrettyPrinter(indent=4) + state_strs = [] + + for th in threading.enumerate(): + try: + state_strs.append(str(th)) + except AssertionError: + state_strs.append("") + stack = traceback.format_stack(sys._current_frames()[th.ident]) + state_strs.append("".join(stack)) + + logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) + +signal.signal(signal.SIGQUIT, dump_state) class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): def do_GET(self): @@ -147,6 +170,7 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table): def fin(): if captures_db: + captures_db.close() logging.info('dropping rethinkdb database {}'.format(db)) result = captures_db.r.db_drop(db).run() logging.info("result=%s", result) @@ -168,6 +192,7 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db): def fin(): if rethinkdb_servers: + ddb.close() if not captures_db: logging.info('dropping rethinkdb database {}'.format(db)) result = ddb.r.db_drop(db).run() @@ -208,6 +233,7 @@ def stats_db(request, rethinkdb_servers): sdb = warcprox.stats.StatsDb(stats_db_file) def fin(): + sdb.close() if rethinkdb_servers: logging.info('dropping rethinkdb database {}'.format(db)) result = sdb.r.db_drop(db).run() @@ -396,6 +422,12 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.headers['warcprox-test-header'] == 'e!' assert response.content == b'I am the warcprox test payload! ffffffffff!\n' + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') @@ -417,10 +449,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # wait for writer thread to process time.sleep(0.5) - while (not warcprox_.warc_writer_thread.idle - or (warcprox_.proxy.stats_db - and hasattr(warcprox_.proxy.stats_db, "_executor") - and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)): + while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) time.sleep(0.5) @@ -463,6 +492,12 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.headers['warcprox-test-header'] == 'g!' assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') @@ -484,14 +519,10 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # wait for writer thread to process time.sleep(0.5) - while (not warcprox_.warc_writer_thread.idle - or (warcprox_.proxy.stats_db - and hasattr(warcprox_.proxy.stats_db, "_executor") - and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)): + while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) time.sleep(0.5) - # check in dedup db (no change from prev) dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') @@ -511,7 +542,18 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): request_meta = {"stats":{"buckets":["job1"]},"limits":{"job1.total.urls":10}} headers = {"Warcprox-Meta": json.dumps(request_meta)} - for i in range(10): + response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'i!' + assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n' + + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + + for i in range(9): response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'i!' @@ -519,10 +561,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): # wait for writer thread to process time.sleep(0.5) - while (not warcprox_.warc_writer_thread.idle - or (warcprox_.proxy.stats_db - and hasattr(warcprox_.proxy.stats_db, "_executor") - and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)): + while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) time.sleep(0.5) @@ -547,10 +586,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while (not warcprox_.warc_writer_thread.idle - or (warcprox_.proxy.stats_db - and hasattr(warcprox_.proxy.stats_db, "_executor") - and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)): + while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) time.sleep(0.5) @@ -660,7 +696,6 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, finally: fh.close() - if __name__ == '__main__': pytest.main()