From 18cc818cf0e23a56c6d27a4a564ab67ff3367c42 Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Thu, 5 Nov 2015 02:55:18 +0000
Subject: [PATCH] more timing tweaks to make sure tests pass, improved logging,
 etc.

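Register a dump_state() handler on SIGQUIT so that thread stacks can be
logged while the tests are hung, quiet the requests/urllib3 warning and
logging noise, close the dedup/captures/stats databases in the fixture
finalizers, wait for warc_writer_thread to go idle before checking the
dedup db, and in test_limits issue the first request (and wait for the
writer) before the remaining nine.

A minimal sketch of poking the new handler, assuming the pid of a hung
py.test run is already known (the pid lookup itself is not part of this
patch):

    # from a shell:  kill -QUIT <pid>
    # or from python, e.g. against the current process:
    import os
    import signal
    os.kill(os.getpid(), signal.SIGQUIT)  # runs dump_state(), logging each thread's stack
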
---
 tests/test_warcprox.py | 73 +++++++++++++++++++++++++++++++-----------
 1 file changed, 54 insertions(+), 19 deletions(-)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 2285d8b..404279d 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -18,6 +18,10 @@ import json
 import random
 import rethinkstuff
 from hanzo import warctools
+import warnings
+import pprint
+import traceback
+import signal
 
 try:
     import http.server as http_server
@@ -35,6 +39,25 @@ import warcprox
 
 logging.basicConfig(stream=sys.stdout, level=logging.INFO,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
+logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
+warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning)
+warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning)
+
+def dump_state(signum=None, frame=None):
+    pp = pprint.PrettyPrinter(indent=4)
+    state_strs = []
+
+    for th in threading.enumerate():
+        try:
+            state_strs.append(str(th))
+        except AssertionError:
+            state_strs.append("<n/a:AssertionError>")
+        stack = traceback.format_stack(sys._current_frames()[th.ident])
+        state_strs.append("".join(stack))
+
+    logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)))
+
+signal.signal(signal.SIGQUIT, dump_state)
 
 class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
     def do_GET(self):
@@ -147,6 +170,7 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
 
     def fin():
         if captures_db:
+            captures_db.close()
             logging.info('dropping rethinkdb database {}'.format(db))
             result = captures_db.r.db_drop(db).run()
             logging.info("result=%s", result)
@@ -168,6 +192,7 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db):
 
     def fin():
         if rethinkdb_servers:
+            ddb.close()
             if not captures_db:
                 logging.info('dropping rethinkdb database {}'.format(db))
                 result = ddb.r.db_drop(db).run()
@@ -208,6 +233,7 @@ def stats_db(request, rethinkdb_servers):
         sdb = warcprox.stats.StatsDb(stats_db_file)
 
     def fin():
+        sdb.close()
         if rethinkdb_servers:
             logging.info('dropping rethinkdb database {}'.format(db))
             result = sdb.r.db_drop(db).run()
@@ -396,6 +422,12 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
     assert response.headers['warcprox-test-header'] == 'e!'
     assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
 
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
     # check in dedup db
     # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
@@ -417,10 +449,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while (not warcprox_.warc_writer_thread.idle
-            or (warcprox_.proxy.stats_db
-                and hasattr(warcprox_.proxy.stats_db, "_executor")
-                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
+    while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
     time.sleep(0.5)
 
@@ -463,6 +492,12 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
     assert response.headers['warcprox-test-header'] == 'g!'
     assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
 
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
     # check in dedup db
     # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
@@ -484,14 +519,10 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while (not warcprox_.warc_writer_thread.idle
-            or (warcprox_.proxy.stats_db
-                and hasattr(warcprox_.proxy.stats_db, "_executor")
-                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
+    while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
     time.sleep(0.5)
 
-
     # check in dedup db (no change from prev)
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
     assert dedup_lookup['url'] == url.encode('ascii')
@@ -511,7 +542,18 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
     request_meta = {"stats":{"buckets":["job1"]},"limits":{"job1.total.urls":10}}
     headers = {"Warcprox-Meta": json.dumps(request_meta)}
 
-    for i in range(10):
+    response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
+    assert response.status_code == 200
+    assert response.headers['warcprox-test-header'] == 'i!'
+    assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n'
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
+    for i in range(9):
         response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
         assert response.status_code == 200
         assert response.headers['warcprox-test-header'] == 'i!'
@@ -519,10 +561,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while (not warcprox_.warc_writer_thread.idle
-            or (warcprox_.proxy.stats_db
-                and hasattr(warcprox_.proxy.stats_db, "_executor")
-                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
+    while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
     time.sleep(0.5)
 
@@ -547,10 +586,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while (not warcprox_.warc_writer_thread.idle
-            or (warcprox_.proxy.stats_db
-                and hasattr(warcprox_.proxy.stats_db, "_executor")
-                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
+    while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
     time.sleep(0.5)
 
@@ -660,7 +696,6 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     finally:
         fh.close()
 
-
 if __name__ == '__main__':
     pytest.main()