diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py index 8267806..4491a8b 100755 --- a/benchmarks/run-benchmarks.py +++ b/benchmarks/run-benchmarks.py @@ -215,9 +215,9 @@ if __name__ == '__main__': args.cacert = os.path.join(tmpdir, 'benchmark-warcprox-ca.pem') args.certs_dir = os.path.join(tmpdir, 'benchmark-warcprox-ca') args.directory = os.path.join(tmpdir, 'warcs') - if args.rethinkdb_servers: - args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % ( - datetime.datetime.utcnow()) + # if args.rethinkdb_servers: + # args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % ( + # datetime.datetime.utcnow()) start_servers() logging.info( @@ -247,7 +247,9 @@ if __name__ == '__main__': logging.info('SKIPPED') logging.info('===== baseline benchmark finished =====') - warcprox_controller = warcprox.main.init_controller(args) + options = warcprox.Options(**vars(args)) + warcprox_controller = warcprox.controller.WarcproxController(options) + warcprox_controller_thread = threading.Thread( target=warcprox_controller.run_until_shutdown) warcprox_controller_thread.start() diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index f5175b9..d091542 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -96,6 +96,14 @@ logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning) warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning) +def wait(callback, timeout=10): + start = time.time() + while time.time() - start < timeout: + if callback(): + return + time.sleep(0.1) + raise Exception('timed out waiting for %s to return truthy' % callback) + # monkey patch dns lookup so we can test domain inheritance on localhost orig_getaddrinfo = socket.getaddrinfo orig_gethostbyname = socket.gethostbyname @@ -339,6 +347,9 @@ def warcprox_(request): logging.info('changing to working directory %r', work_dir) os.chdir(work_dir) + # we can't wait around all day in the tests + warcprox.BaseBatchPostfetchProcessor.MAX_BATCH_SEC = 0.5 + argv = ['warcprox', '--method-filter=GET', '--method-filter=POST', @@ -357,9 +368,12 @@ def warcprox_(request): argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url')) args = warcprox.main.parse_args(argv) - warcprox_ = warcprox.main.init_controller(args) + + options = warcprox.Options(**vars(args)) + warcprox_ = warcprox.controller.WarcproxController(options) logging.info('starting warcprox') + warcprox_.start() warcprox_thread = threading.Thread( name='WarcproxThread', target=warcprox_.run_until_shutdown) warcprox_thread.start() @@ -431,17 +445,9 @@ def test_httpds_no_proxy(http_daemon, https_daemon): assert response.headers['warcprox-test-header'] == 'c!' assert response.content == b'I am the warcprox test payload! 
dddddddddd!\n' -def _poll_playback_until(playback_proxies, url, status, timeout_sec): - start = time.time() - # check playback (warc writing is asynchronous, give it up to 10 sec) - while time.time() - start < timeout_sec: - response = requests.get(url, proxies=playback_proxies, verify=False) - if response.status_code == status: - break - time.sleep(0.5) - return response +def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_proxies, warcprox_): + urls_before = warcprox_.proxy.running_stats.urls -def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_proxies): url = 'http://localhost:{}/a/b'.format(http_daemon.server_port) # ensure playback fails before archiving @@ -455,12 +461,17 @@ def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_ assert response.headers['warcprox-test-header'] == 'a!' assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n' - response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'a!' assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n' -def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playback_proxies): +def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playback_proxies, warcprox_): + urls_before = warcprox_.proxy.running_stats.urls + url = 'https://localhost:{}/c/d'.format(https_daemon.server_port) # ensure playback fails before archiving @@ -474,14 +485,19 @@ def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playbac assert response.headers['warcprox-test-header'] == 'c!' assert response.content == b'I am the warcprox test payload! dddddddddd!\n' + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + # test playback - response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10) + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'c!' assert response.content == b'I am the warcprox test payload! dddddddddd!\n' # test dedup of same http url with same payload def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + url = 'http://localhost:{}/e/f'.format(http_daemon.server_port) # ensure playback fails before archiving @@ -490,8 +506,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.content == b'404 Not in Archive\n' # check not in dedup db - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( - b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup is None # archive @@ -500,21 +516,17 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.headers['warcprox-test-header'] == 'e!' assert response.content == b'I am the warcprox test payload! 
ffffffffff!\n' + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # test playback - response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10) + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'e!' assert response.content == b'I am the warcprox test payload! ffffffffff!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) - # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup assert dedup_lookup['url'] == url.encode('ascii') @@ -525,7 +537,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # need revisit to have a later timestamp than original, else playing # back the latest record might not hit the revisit - time.sleep(1.5) + time.sleep(1.1) # fetch & archive revisit response = requests.get(url, proxies=archiving_proxies, verify=False) @@ -533,14 +545,11 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.headers['warcprox-test-header'] == 'e!' assert response.content == b'I am the warcprox test payload! ffffffffff!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) # check in dedup db (no change from prev) - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') assert dedup_lookup['id'] == record_id @@ -548,7 +557,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # test playback logging.debug('testing playback of revisit of {}'.format(url)) - response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10) + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'e!' assert response.content == b'I am the warcprox test payload! ffffffffff!\n' @@ -556,6 +565,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # test dedup of same https url with same payload def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + url = 'https://localhost:{}/g/h'.format(https_daemon.server_port) # ensure playback fails before archiving @@ -564,7 +575,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.content == b'404 Not in Archive\n' # check not in dedup db - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup is None @@ -574,21 +585,18 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.headers['warcprox-test-header'] == 'g!' 
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + # test playback - response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10) + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'g!' assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) - # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup assert dedup_lookup['url'] == url.encode('ascii') @@ -599,7 +607,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # need revisit to have a later timestamp than original, else playing # back the latest record might not hit the revisit - time.sleep(1.5) + time.sleep(1.1) # fetch & archive revisit response = requests.get(url, proxies=archiving_proxies, verify=False) @@ -607,14 +615,11 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.headers['warcprox-test-header'] == 'g!' assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) # check in dedup db (no change from prev) - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') assert dedup_lookup['id'] == record_id @@ -622,13 +627,15 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # test playback logging.debug('testing playback of revisit of {}'.format(url)) - response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10) + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'g!' assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' # XXX how to check dedup was used? def test_limits(http_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + url = 'http://localhost:{}/i/j'.format(http_daemon.server_port) request_meta = {"stats":{"buckets":["test_limits_bucket"]},"limits":{"test_limits_bucket/total/urls":10}} headers = {"Warcprox-Meta": json.dumps(request_meta)} @@ -638,11 +645,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): assert response.headers['warcprox-test-header'] == 'i!' assert response.content == b'I am the warcprox test payload! 
jjjjjjjjjj!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) for i in range(9): response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) @@ -650,11 +654,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): assert response.headers['warcprox-test-header'] == 'i!' assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(2.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 10) response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) assert response.status_code == 420 @@ -665,6 +666,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n" def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + url = 'http://localhost:{}/i/j'.format(http_daemon.server_port) request_meta = {"accept": ["capture-metadata"]} headers = {"Warcprox-Meta": json.dumps(request_meta)} @@ -680,7 +683,12 @@ def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies): except ValueError: pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp']) + # wait for postfetch chain (or subsequent test could fail) + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) @@ -691,15 +699,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, assert response.headers['warcprox-test-header'] == 'k!' assert response.content == b'I am the warcprox test payload! 
llllllllll!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check url1 in dedup db bucket_a - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_a') + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") + assert dedup_lookup assert dedup_lookup['url'] == url1.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) @@ -707,7 +714,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_date = dedup_lookup['date'] # check url1 not in dedup db bucket_b - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup is None @@ -718,14 +725,11 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, assert response.headers['warcprox-test-header'] == 'k!' assert response.content == b'I am the warcprox test payload! llllllllll!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) # check url2 in dedup db bucket_b - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup['url'] == url2.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) @@ -740,11 +744,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, assert response.headers['warcprox-test-header'] == 'k!' assert response.content == b'I am the warcprox test payload! llllllllll!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) # archive url1 bucket_b headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})} @@ -753,17 +754,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, assert response.headers['warcprox-test-header'] == 'k!' assert response.content == b'I am the warcprox test payload! 
llllllllll!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) # close the warc - assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"] - writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"] + assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] + writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"].close_writer() + warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -821,6 +819,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, fh.close() def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + rules = [ { "domain": "localhost", @@ -857,6 +857,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): url, proxies=archiving_proxies, headers=headers, stream=True) assert response.status_code == 200 + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + # blocked by SURT_MATCH url = 'http://localhost:{}/fuh/guh'.format(http_daemon.server_port) response = requests.get( @@ -872,6 +875,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): # 404 because server set up at the top of this file doesn't handle this url assert response.status_code == 404 + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) + # not blocked because surt scheme does not match (differs from heritrix # behavior where https urls are coerced to http surt form) url = 'https://localhost:{}/fuh/guh'.format(https_daemon.server_port) @@ -880,6 +886,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): verify=False) assert response.status_code == 200 + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) + # blocked by blanket domain block url = 'http://bad.domain.com/' response = requests.get( @@ -932,6 +941,8 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): def test_domain_doc_soft_limit( http_daemon, https_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + request_meta = { "stats": {"buckets": [{"bucket":"test_domain_doc_limit_bucket","tally-domains":["foo.localhost"]}]}, "soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls":10}, @@ -946,11 +957,8 @@ def test_domain_doc_soft_limit( assert response.headers['warcprox-test-header'] == 'o!' assert response.content == b'I am the warcprox test payload! 
pppppppppp!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # make sure stats from different domain don't count url = 'http://bar.localhost:{}/o/p'.format(http_daemon.server_port) @@ -961,15 +969,10 @@ def test_domain_doc_soft_limit( assert response.headers['warcprox-test-header'] == 'o!' assert response.content == b'I am the warcprox test payload! pppppppppp!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) - time.sleep(2.0) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 11) # (2) same host but different scheme and port: domain limit applies - # url = 'https://foo.localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( url, proxies=archiving_proxies, headers=headers, stream=True, @@ -988,12 +991,12 @@ def test_domain_doc_soft_limit( assert response.headers['warcprox-test-header'] == 'o!' assert response.content == b'I am the warcprox test payload! pppppppppp!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) - time.sleep(2.0) + # wait for postfetch chain + time.sleep(3) + logging.info( + 'warcprox_.proxy.running_stats.urls - urls_before = %s', + warcprox_.proxy.running_stats.urls - urls_before) + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 19) # (10) response = requests.get( @@ -1003,12 +1006,8 @@ def test_domain_doc_soft_limit( assert response.headers['warcprox-test-header'] == 'o!' assert response.content == b'I am the warcprox test payload! pppppppppp!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) - time.sleep(2.0) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20) # (11) back to http, and this is the 11th request url = 'http://zuh.foo.localhost:{}/o/p'.format(http_daemon.server_port) @@ -1030,6 +1029,9 @@ def test_domain_doc_soft_limit( assert response.headers['warcprox-test-header'] == 'o!' assert response.content == b'I am the warcprox test payload! pppppppppp!\n' + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 21) + # https also blocked url = 'https://zuh.foo.localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1056,6 +1058,8 @@ def test_domain_doc_soft_limit( def test_domain_data_soft_limit( http_daemon, https_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + # using idn request_meta = { "stats": {"buckets": [{"bucket":"test_domain_data_limit_bucket","tally-domains":['ÞzZ.LOCALhost']}]}, @@ -1071,12 +1075,8 @@ def test_domain_data_soft_limit( assert response.headers['warcprox-test-header'] == 'y!' assert response.content == b'I am the warcprox test payload! 
zzzzzzzzzz!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) - time.sleep(2.0) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # duplicate, does not count toward limit url = 'https://baz.Þzz.localhost:{}/y/z'.format(https_daemon.server_port) @@ -1087,12 +1087,8 @@ def test_domain_data_soft_limit( assert response.headers['warcprox-test-header'] == 'y!' assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) - time.sleep(2.0) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) # novel, pushes stats over the limit url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/~'.format(https_daemon.server_port) @@ -1103,12 +1099,8 @@ def test_domain_data_soft_limit( assert response.headers['warcprox-test-header'] == 'z!' assert response.content == b'I am the warcprox test payload! ~~~~~~~~~~!\n' - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) - time.sleep(2.0) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) # make sure limit doesn't get applied to a different host url = 'http://baz.localhost:{}/z/~'.format(http_daemon.server_port) @@ -1118,6 +1110,9 @@ def test_domain_data_soft_limit( assert response.headers['warcprox-test-header'] == 'z!' assert response.content == b'I am the warcprox test payload! 
~~~~~~~~~~!\n' + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) + # blocked because we're over the limit now url = 'http://lOl.wHut.ÞZZ.lOcALHOst:{}/y/z'.format(http_daemon.server_port) response = requests.get( @@ -1149,7 +1144,9 @@ def test_domain_data_soft_limit( # connection to the internet, and relies on a third party site (facebook) being # up and behaving a certain way @pytest.mark.xfail -def test_tor_onion(archiving_proxies): +def test_tor_onion(archiving_proxies, warcprox_): + urls_before = warcprox_.proxy.running_stats.urls + response = requests.get('http://www.facebookcorewwwi.onion/', proxies=archiving_proxies, verify=False, allow_redirects=False) assert response.status_code == 302 @@ -1158,7 +1155,12 @@ def test_tor_onion(archiving_proxies): proxies=archiving_proxies, verify=False, allow_redirects=False) assert response.status_code == 200 -def test_missing_content_length(archiving_proxies, http_daemon, https_daemon): + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) + +def test_missing_content_length(archiving_proxies, http_daemon, https_daemon, warcprox_): + urls_before = warcprox_.proxy.running_stats.urls + # double-check that our test http server is responding as expected url = 'http://localhost:%s/missing-content-length' % ( http_daemon.server_port) @@ -1195,8 +1197,14 @@ def test_missing_content_length(archiving_proxies, http_daemon, https_daemon): b'This response is missing a Content-Length http header.') assert not 'content-length' in response.headers + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) + def test_method_filter( - https_daemon, http_daemon, archiving_proxies, playback_proxies): + warcprox_, https_daemon, http_daemon, archiving_proxies, + playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + # we've configured warcprox with method_filters=['GET','POST'] so HEAD # requests should not be archived @@ -1207,7 +1215,10 @@ def test_method_filter( assert response.headers['warcprox-test-header'] == 'z!' 
assert response.content == b'' - response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 404 assert response.content == b'404 Not in Archive\n' @@ -1224,13 +1235,17 @@ def test_method_filter( headers=headers, proxies=archiving_proxies) assert response.status_code == 204 - response = _poll_playback_until( - playback_proxies, url, status=200, timeout_sec=10) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) + + response = requests.get(url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert response.content == payload def test_dedup_ok_flag( https_daemon, http_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + if not warcprox_.options.rethinkdb_big_table: # this feature is n/a unless using rethinkdb big table return @@ -1238,7 +1253,7 @@ def test_dedup_ok_flag( url = 'http://localhost:{}/z/b'.format(http_daemon.server_port) # check not in dedup db - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup is None @@ -1252,13 +1267,11 @@ def test_dedup_ok_flag( assert response.headers['warcprox-test-header'] == 'z!' assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n' - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check that dedup db doesn't give us anything for this - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup is None @@ -1273,19 +1286,17 @@ def test_dedup_ok_flag( assert response.headers['warcprox-test-header'] == 'z!' assert response.content == b'I am the warcprox test payload! 
bbbbbbbbbb!\n' - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) # check that dedup db gives us something for this - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup # inspect what's in rethinkdb more closely - rethink_captures = warcprox_.warc_writer_threads[0].dedup_db.captures_db + rethink_captures = warcprox_.dedup_db.captures_db results_iter = rethink_captures.rr.table(rethink_captures.table).get_all( ['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response', 'test_dedup_ok_flag'], index='sha1_warc_type').order_by( @@ -1310,7 +1321,8 @@ def test_status_api(warcprox_): 'role', 'version', 'host', 'address', 'port', 'pid', 'load', 'queued_urls', 'queue_max_size', 'seconds_behind', 'threads', 'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min', - 'active_requests',} + 'active_requests','start_time','urls_processed', + 'warc_bytes_written','postfetch_chain',} assert status['role'] == 'warcprox' assert status['version'] == warcprox.__version__ assert status['port'] == warcprox_.proxy.server_port @@ -1331,7 +1343,8 @@ def test_svcreg_status(warcprox_): 'queued_urls', 'queue_max_size', 'seconds_behind', 'first_heartbeat', 'ttl', 'last_heartbeat', 'threads', 'rates_5min', 'rates_1min', 'unaccepted_requests', - 'rates_15min', 'active_requests',} + 'rates_15min', 'active_requests','start_time','urls_processed', + 'warc_bytes_written','postfetch_chain',} assert status['role'] == 'warcprox' assert status['version'] == warcprox.__version__ assert status['port'] == warcprox_.proxy.server_port @@ -1366,32 +1379,51 @@ def test_controller_with_defaults(): assert controller.proxy.server_address == ('127.0.0.1', 8000) assert controller.proxy.server_port == 8000 assert controller.proxy.running_stats - for wwt in controller.warc_writer_threads: - assert wwt - assert wwt.recorded_url_q - assert wwt.recorded_url_q is controller.proxy.recorded_url_q - assert wwt.writer_pool - assert wwt.writer_pool.default_warc_writer - assert wwt.writer_pool.default_warc_writer.directory == './warcs' - assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None - assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 - assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' - assert wwt.writer_pool.default_warc_writer.gzip is False - assert wwt.writer_pool.default_warc_writer.record_builder - assert not wwt.writer_pool.default_warc_writer.record_builder.base32 - assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' + assert not controller.proxy.stats_db + wwt = controller.warc_writer_thread + assert wwt + assert wwt.inq + assert wwt.outq + assert wwt.writer_pool + assert wwt.writer_pool.default_warc_writer + assert wwt.writer_pool.default_warc_writer.directory == './warcs' + assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None + assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 + assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' + assert wwt.writer_pool.default_warc_writer.gzip is False + assert wwt.writer_pool.default_warc_writer.record_builder + assert not wwt.writer_pool.default_warc_writer.record_builder.base32 + assert 
wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' + +def test_load_plugin(): + options = warcprox.Options(port=0, plugins=['warcprox.stats.RunningStats']) + controller = warcprox.controller.WarcproxController(options) + assert isinstance( + controller._postfetch_chain[-1], + warcprox.ListenerPostfetchProcessor) + assert isinstance( + controller._postfetch_chain[-1].listener, + warcprox.stats.RunningStats) + assert isinstance( + controller._postfetch_chain[-2], + warcprox.ListenerPostfetchProcessor) + assert isinstance( + controller._postfetch_chain[-2].listener, + warcprox.stats.RunningStats) def test_choose_a_port_for_me(warcprox_): options = warcprox.Options() options.port = 0 - controller = warcprox.controller.WarcproxController( - service_registry=warcprox_.service_registry, options=options) + if warcprox_.service_registry: + options.rethinkdb_services_url = 'rethinkdb://localhost/test0/services' + controller = warcprox.controller.WarcproxController(options) assert controller.proxy.server_port != 0 assert controller.proxy.server_port != 8000 assert controller.proxy.server_address == ( '127.0.0.1', controller.proxy.server_port) th = threading.Thread(target=controller.run_until_shutdown) + controller.start() th.start() try: # check that the status api lists the correct port @@ -1417,16 +1449,21 @@ def test_choose_a_port_for_me(warcprox_): th.join() def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + url = 'http://localhost:%s/a/z' % http_daemon.server_port response = requests.get(url, proxies=archiving_proxies) assert response.headers['via'] == '1.1 warcprox' - playback_response = _poll_playback_until( - playback_proxies, url, status=200, timeout_sec=10) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + + playback_response = requests.get( + url, proxies=playback_proxies, verify=False) assert response.status_code == 200 assert not 'via' in playback_response - warc = warcprox_.warc_writer_threads[0].writer_pool.default_warc_writer._fpath + warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath with open(warc, 'rb') as f: for record in warcio.archiveiterator.ArchiveIterator(f): if record.rec_headers.get_header('warc-target-uri') == url: @@ -1449,15 +1486,19 @@ def test_slash_in_warc_prefix(warcprox_, http_daemon, archiving_proxies): assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix' def test_crawl_log(warcprox_, http_daemon, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + try: os.unlink(os.path.join(warcprox_.options.crawl_log_dir, 'crawl.log')) except: pass + # should go to default crawl log url = 'http://localhost:%s/b/aa' % http_daemon.server_port response = requests.get(url, proxies=archiving_proxies) assert response.status_code == 200 + # should go to test_crawl_log_1.log url = 'http://localhost:%s/b/bb' % http_daemon.server_port headers = { "Warcprox-Meta": json.dumps({"warc-prefix":"test_crawl_log_1"}), @@ -1466,13 +1507,12 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): response = requests.get(url, proxies=archiving_proxies, headers=headers) assert response.status_code == 200 - start = time.time() + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log') - while time.time() - 
start < 10: - if os.path.exists(file) and os.stat(file).st_size > 0: - break - time.sleep(0.5) assert os.path.exists(file) + assert os.stat(file).st_size > 0 assert os.path.exists(os.path.join( warcprox_.options.crawl_log_dir, 'crawl.log')) @@ -1527,13 +1567,12 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): response = requests.get(url, proxies=archiving_proxies, headers=headers) assert response.status_code == 200 - start = time.time() + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log') - while time.time() - start < 10: - if os.path.exists(file) and os.stat(file).st_size > 0: - break - time.sleep(0.5) assert os.path.exists(file) + assert os.stat(file).st_size > 0 crawl_log_2 = open(file, 'rb').read() @@ -1557,17 +1596,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert extra_info['contentSize'] == 145 # a request that is not saved to a warc (because of --method-filter) - # currently not logged at all (XXX maybe it should be) url = 'http://localhost:%s/b/cc' % http_daemon.server_port headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})} response = requests.head(url, proxies=archiving_proxies, headers=headers) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log') - start = time.time() - while time.time() - start < 10: - if os.path.exists(file) and os.stat(file).st_size > 0: - break - time.sleep(0.5) assert os.path.exists(file) crawl_log_3 = open(file, 'rb').read() @@ -1602,13 +1638,10 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): headers=headers, proxies=archiving_proxies) assert response.status_code == 204 - start = time.time() - file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log') - while time.time() - start < 10: - if os.path.exists(file) and os.stat(file).st_size > 0: - break - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 5) + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log') assert os.path.exists(file) crawl_log_4 = open(file, 'rb').read() @@ -1633,6 +1666,8 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): def test_long_warcprox_meta( warcprox_, http_daemon, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + url = 'http://localhost:%s/b/g' % http_daemon.server_port # create a very long warcprox-meta header @@ -1642,17 +1677,14 @@ def test_long_warcprox_meta( url, proxies=archiving_proxies, headers=headers, verify=False) assert response.status_code == 200 - # wait for writer thread to process - time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): - time.sleep(0.5) - time.sleep(0.5) + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check that warcprox-meta was parsed and honored ("warc-prefix" param) - assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] - writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] + assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] + writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] warc_path = os.path.join(writer.directory, 
writer._f_finalname) - warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -1672,7 +1704,6 @@ def test_long_warcprox_meta( def test_empty_response( warcprox_, http_daemon, https_daemon, archiving_proxies, playback_proxies): - url = 'http://localhost:%s/empty-response' % http_daemon.server_port response = requests.get(url, proxies=archiving_proxies, verify=False) assert response.status_code == 502 diff --git a/tests/test_writer.py b/tests/test_writer.py index 61fe108..0a18c33 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -89,27 +89,19 @@ def wait(callback, timeout): raise Exception('timed out waiting for %s to return truthy' % callback) def test_special_dont_write_prefix(): - class NotifyMe: - def __init__(self): - self.the_list = [] - def notify(self, recorded_url, records): - self.the_list.append((recorded_url, records)) - with tempfile.TemporaryDirectory() as tmpdir: logging.debug('cd %s', tmpdir) os.chdir(tmpdir) - q = warcprox.TimestampedQueue(maxsize=1) - listener = NotifyMe() - wwt = warcprox.writerthread.WarcWriterThread( - recorded_url_q=q, options=Options(prefix='-'), - listeners=[listener]) + wwt = warcprox.writerthread.WarcWriterThread(Options(prefix='-')) + wwt.inq = warcprox.TimestampedQueue(maxsize=1) + wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: wwt.start() # not to be written due to default prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - q.put(RecordedUrl( + wwt.inq.put(RecordedUrl( url='http://example.com/no', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', @@ -118,30 +110,31 @@ def test_special_dont_write_prefix(): # to be written due to warcprox-meta prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - q.put(RecordedUrl( + wwt.inq.put(RecordedUrl( url='http://example.com/yes', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', timestamp=datetime.utcnow(), payload_digest=recorder.block_digest, warcprox_meta={'warc-prefix': 'normal-warc-prefix'})) - wait(lambda: len(listener.the_list) == 2, 10.0) - assert not listener.the_list[0][1] - assert listener.the_list[1][1] + recorded_url = wwt.outq.get(timeout=10) + assert not recorded_url.warc_records + recorded_url = wwt.outq.get(timeout=10) + assert recorded_url.warc_records + assert wwt.outq.empty() finally: wwt.stop.set() wwt.join() - q = warcprox.TimestampedQueue(maxsize=1) - listener = NotifyMe() - wwt = warcprox.writerthread.WarcWriterThread( - recorded_url_q=q, listeners=[listener]) + wwt = warcprox.writerthread.WarcWriterThread() + wwt.inq = warcprox.TimestampedQueue(maxsize=1) + wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: wwt.start() # to be written due to default prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - q.put(RecordedUrl( + wwt.inq.put(RecordedUrl( url='http://example.com/yes', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', @@ -150,16 +143,18 @@ def test_special_dont_write_prefix(): # not to be written due to warcprox-meta prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - 
q.put(RecordedUrl( + wwt.inq.put(RecordedUrl( url='http://example.com/no', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', timestamp=datetime.utcnow(), payload_digest=recorder.block_digest, warcprox_meta={'warc-prefix': '-'})) - wait(lambda: len(listener.the_list) == 2, 10.0) - assert listener.the_list[0][1] - assert not listener.the_list[1][1] + recorded_url = wwt.outq.get(timeout=10) + assert recorded_url.warc_records + recorded_url = wwt.outq.get(timeout=10) + assert not recorded_url.warc_records + assert wwt.outq.empty() finally: wwt.stop.set() wwt.join() diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 256ede9..39b31ea 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -1,7 +1,7 @@ """ warcprox/__init__.py - warcprox package main file, contains some utility code -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -19,6 +19,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. """ +import sys +import datetime +import threading +import time +import logging from argparse import Namespace as _Namespace from pkg_resources import get_distribution as _get_distribution __version__ = _get_distribution('warcprox').version @@ -26,7 +31,6 @@ try: import queue except ImportError: import Queue as queue -import datetime def digest_str(hash_obj, base32=False): import base64 @@ -92,9 +96,149 @@ class RequestBlockedByRule(Exception): def __str__(self): return "%s: %s" % (self.__class__.__name__, self.msg) +class BasePostfetchProcessor(threading.Thread): + logger = logging.getLogger("warcprox.BasePostfetchProcessor") + + def __init__(self, options=Options()): + threading.Thread.__init__(self, name=self.__class__.__name__) + self.options = options + self.stop = threading.Event() + # these should be set before thread is started + self.inq = None + self.outq = None + self.profiler = None + + def run(self): + if self.options.profile: + import cProfile + self.profiler = cProfile.Profile() + self.profiler.enable() + self._run() + self.profiler.disable() + else: + self._run() + + def _get_process_put(self): + ''' + Get url(s) from `self.inq`, process url(s), queue to `self.outq`. + + Subclasses must implement this. Implementations may operate on + individual urls, or on batches. + + May raise queue.Empty. 
+ ''' + raise Exception('not implemented') + + def _run(self): + logging.info('%s starting up', self) + self._startup() + while not self.stop.is_set(): + try: + while True: + try: + self._get_process_put() + except queue.Empty: + if self.stop.is_set(): + break + logging.info('%s shutting down', self) + self._shutdown() + except Exception as e: + if isinstance(e, OSError) and e.errno == 28: + # OSError: [Errno 28] No space left on device + self.logger.critical( + 'shutting down due to fatal problem: %s: %s', + e.__class__.__name__, e) + self._shutdown() + sys.exit(1) + + self.logger.critical( + '%s will try to continue after unexpected error', + self.name, exc_info=True) + time.sleep(0.5) + + def _startup(self): + pass + + def _shutdown(self): + pass + +class BaseStandardPostfetchProcessor(BasePostfetchProcessor): + def _get_process_put(self): + recorded_url = self.inq.get(block=True, timeout=0.5) + self._process_url(recorded_url) + if self.outq: + self.outq.put(recorded_url) + + def _process_url(self, recorded_url): + raise Exception('not implemented') + +class BaseBatchPostfetchProcessor(BasePostfetchProcessor): + MAX_BATCH_SIZE = 500 + MAX_BATCH_SEC = 10 + MIN_BATCH_SEC = 2.0 + + def _get_process_put(self): + batch = [] + start = time.time() + + while True: + try: + batch.append(self.inq.get(block=True, timeout=0.5)) + except queue.Empty: + if self.stop.is_set(): + break + # else maybe keep adding to the batch + + if len(batch) >= self.MAX_BATCH_SIZE: + break # full batch + + elapsed = time.time() - start + if elapsed >= self.MAX_BATCH_SEC: + break # been batching for a while + + if (elapsed >= self.MIN_BATCH_SEC and self.outq + and len(self.outq.queue) == 0): + break # next processor is waiting on us + + if not batch: + raise queue.Empty + + self.logger.info( + 'gathered batch of %s in %0.2f sec', + len(batch), time.time() - start) + self._process_batch(batch) + + if self.outq: + for recorded_url in batch: + self.outq.put(recorded_url) + + def _process_batch(self, batch): + raise Exception('not implemented') + +class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): + def __init__(self, listener, options=Options()): + BaseStandardPostfetchProcessor.__init__(self, options) + self.listener = listener + self.name = listener.__class__.__name__ + + def _process_url(self, recorded_url): + return self.listener.notify(recorded_url, recorded_url.warc_records) + + def start(self): + if hasattr(self.listener, 'start'): + self.listener.start() + BaseStandardPostfetchProcessor.start(self) + + def _shutdown(self): + if hasattr(self.listener, 'stop'): + try: + self.listener.stop() + except: + self.logger.error( + '%s raised exception', listener.stop, exc_info=True) + # monkey-patch log levels TRACE and NOTICE TRACE = 5 -import logging def _logger_trace(self, msg, *args, **kwargs): if self.isEnabledFor(TRACE): self._log(TRACE, msg, args, **kwargs) @@ -103,7 +247,6 @@ logging.trace = logging.root.trace logging.addLevelName(TRACE, 'TRACE') NOTICE = (logging.INFO + logging.WARN) // 2 -import logging def _logger_notice(self, msg, *args, **kwargs): if self.isEnabledFor(NOTICE): self._log(NOTICE, msg, args, **kwargs) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 79d6240..e6674a6 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -215,7 +215,7 @@ class RethinkCaptures: if self._timer: self._timer.join() -class RethinkCapturesDedup: +class RethinkCapturesDedup(warcprox.dedup.DedupDb): logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") def __init__(self, 
options=warcprox.Options()): diff --git a/warcprox/controller.py b/warcprox/controller.py index 0bf8a4f..dfd930b 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for sending heartbeats to the service registry if configured to do so; also has some memory profiling capabilities -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -27,55 +27,190 @@ from __future__ import absolute_import import logging import threading import time -import warcprox import sys import gc import datetime +import warcprox +import certauth +import functools +import doublethink +import importlib + +class Factory: + @staticmethod + def dedup_db(options): + if options.rethinkdb_dedup_url: + dedup_db = warcprox.dedup.RethinkDedupDb(options=options) + elif options.rethinkdb_big_table_url: + dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options) + elif options.rethinkdb_trough_db_url: + dedup_db = warcprox.dedup.TroughDedupDb(options) + elif options.cdxserver_dedup: + dedup_db = warcprox.dedup.CdxServerDedup( + cdx_url=options.cdxserver_dedup) + elif options.dedup_db_file in (None, '', '/dev/null'): + logging.info('deduplication disabled') + dedup_db = None + else: + dedup_db = warcprox.dedup.DedupDb(options.dedup_db_file, options=options) + return dedup_db + + @staticmethod + def stats_processor(options): + # return warcprox.stats.StatsProcessor(options) + if options.rethinkdb_stats_url: + stats_processor = warcprox.stats.RethinkStatsProcessor(options) + elif options.stats_db_file in (None, '', '/dev/null'): + logging.info('statistics tracking disabled') + stats_processor = None + else: + stats_processor = warcprox.stats.StatsProcessor(options) + return stats_processor + + @staticmethod + def warc_writer(options): + return warcprox.writerthread.WarcWriterThread(options) + + @staticmethod + def playback_proxy(ca, options): + if options.playback_port is not None: + playback_index_db = warcprox.playback.PlaybackIndexDb( + options=options) + playback_proxy = warcprox.playback.PlaybackProxy( + ca=ca, playback_index_db=playback_index_db, options=options) + else: + playback_index_db = None + playback_proxy = None + return playback_proxy + + @staticmethod + def crawl_logger(options): + if options.crawl_log_dir: + return warcprox.crawl_log.CrawlLogger( + options.crawl_log_dir, options=options) + else: + return None + + @staticmethod + def plugin(qualname): + try: + (module_name, class_name) = qualname.rsplit('.', 1) + module_ = importlib.import_module(module_name) + class_ = getattr(module_, class_name) + plugin = class_() + plugin.notify # make sure it has this method + return plugin + except Exception as e: + logging.fatal('problem with plugin class %r: %s', qualname, e) + sys.exit(1) + + @staticmethod + def service_registry(options): + if options.rethinkdb_services_url: + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_services_url) + rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database) + return doublethink.ServiceRegistry(rr, table=parsed.table) + else: + return None class WarcproxController(object): logger = logging.getLogger("warcprox.controller.WarcproxController") HEARTBEAT_INTERVAL = 20.0 - def __init__( - self, proxy=None, warc_writer_threads=None, playback_proxy=None, - service_registry=None, options=warcprox.Options()): + 
def __init__(self, options=warcprox.Options()): """ - Create warcprox controller. - - If supplied, `proxy` should be an instance of WarcProxy, and - `warc_writer_threads` should be a list of WarcWriterThread instances. - If not supplied, they are created with default values. - - If supplied, playback_proxy should be an instance of PlaybackProxy. If - not supplied, no playback proxy will run. + Create warcprox controller based on `options`. """ - if proxy is not None: - self.proxy = proxy - else: - self.proxy = warcprox.warcproxy.WarcProxy(options=options) - - if warc_writer_threads is not None: - self.warc_writer_threads = warc_writer_threads - else: - self.warc_writer_threads = [ - warcprox.writerthread.WarcWriterThread( - name='WarcWriterThread%03d' % i, - recorded_url_q=self.proxy.recorded_url_q, - listeners=[self.proxy.running_stats], options=options) - for i in range(int(self.proxy.max_threads ** 0.5))] + self.options = options self.proxy_thread = None self.playback_proxy_thread = None - self.playback_proxy = playback_proxy - self.service_registry = service_registry - self.options = options - self._last_rss = None - self.stop = threading.Event() self._start_stop_lock = threading.Lock() + self.stats_processor = Factory.stats_processor(self.options) + + self.proxy = warcprox.warcproxy.WarcProxy( + self.stats_processor, self.postfetch_status, options) + self.playback_proxy = Factory.playback_proxy( + self.proxy.ca, self.options) + + self.build_postfetch_chain(self.proxy.recorded_url_q) + + self.service_registry = Factory.service_registry(options) + + def postfetch_status(self): + result = {'postfetch_chain': []} + for processor in self._postfetch_chain: + if processor.__class__ == warcprox.ListenerPostfetchProcessor: + name = processor.listener.__class__.__name__ + else: + name = processor.__class__.__name__ + + queued = len(processor.inq.queue) + if hasattr(processor, 'batch'): + queued += len(processor.batch) + + result['postfetch_chain'].append({ + 'processor': name, + 'queued_urls': len(processor.inq.queue)}) + return result + + def chain(self, processor0, processor1): + ''' + Sets `processor0.outq` = `processor1.inq` = `queue.Queue()` + ''' + assert not processor0.outq + assert not processor1.inq + q = warcprox.TimestampedQueue(maxsize=self.options.queue_size) + processor0.outq = q + processor1.inq = q + + def build_postfetch_chain(self, inq): + self._postfetch_chain = [] + + self.dedup_db = Factory.dedup_db(self.options) + + if self.dedup_db: + self._postfetch_chain.append(self.dedup_db.loader()) + + self.warc_writer_thread = Factory.warc_writer(self.options) + self._postfetch_chain.append(self.warc_writer_thread) + + if self.dedup_db: + self._postfetch_chain.append(self.dedup_db.storer()) + + if self.stats_processor: + self._postfetch_chain.append(self.stats_processor) + + if self.playback_proxy: + self._postfetch_chain.append( + warcprox.ListenerPostfetchProcessor( + self.playback_proxy.playback_index_db, self.options)) + + crawl_logger = Factory.crawl_logger(self.options) + if crawl_logger: + self._postfetch_chain.append( + warcprox.ListenerPostfetchProcessor( + crawl_logger, self.options)) + + self._postfetch_chain.append( + warcprox.ListenerPostfetchProcessor( + self.proxy.running_stats, self.options)) + + for qualname in self.options.plugins or []: + plugin = Factory.plugin(qualname) + self._postfetch_chain.append( + warcprox.ListenerPostfetchProcessor(plugin, self.options)) + + # chain them all up + self._postfetch_chain[0].inq = inq + for i in range(1, 
len(self._postfetch_chain)): + self.chain(self._postfetch_chain[i-1], self._postfetch_chain[i]) + def debug_mem(self): self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize()) with open("/proc/self/status") as f: @@ -162,35 +297,27 @@ class WarcproxController(object): self.logger.info('warcprox is already running') return - if self.proxy.stats_db: - self.proxy.stats_db.start() self.proxy_thread = threading.Thread( target=self.proxy.serve_forever, name='ProxyThread') self.proxy_thread.start() - assert(all( - wwt.dedup_db is self.warc_writer_threads[0].dedup_db - for wwt in self.warc_writer_threads)) - if any((t.dedup_db for t in self.warc_writer_threads)): - self.warc_writer_threads[0].dedup_db.start() - - for wwt in self.warc_writer_threads: - wwt.start() - - if self.playback_proxy is not None: + if self.playback_proxy: self.playback_proxy_thread = threading.Thread( - target=self.playback_proxy.serve_forever, - name='PlaybackProxyThread') + target=self.playback_proxy.serve_forever, + name='PlaybackProxyThread') self.playback_proxy_thread.start() + for processor in self._postfetch_chain: + processor.start() + def shutdown(self): with self._start_stop_lock: if not self.proxy_thread or not self.proxy_thread.is_alive(): self.logger.info('warcprox is not running') return - for wwt in self.warc_writer_threads: - wwt.stop.set() + for processor in self._postfetch_chain: + processor.stop.set() self.proxy.shutdown() self.proxy.server_close() @@ -200,12 +327,8 @@ class WarcproxController(object): if self.playback_proxy.playback_index_db is not None: self.playback_proxy.playback_index_db.close() - # wait for threads to finish - for wwt in self.warc_writer_threads: - wwt.join() - - if self.proxy.stats_db: - self.proxy.stats_db.stop() + for processor in self._postfetch_chain: + processor.join() self.proxy_thread.join() if self.playback_proxy is not None: @@ -288,18 +411,17 @@ class WarcproxController(object): 'aggregate performance profile of %s proxy threads:\n%s', len(files), buf.getvalue()) - # warc writer threads - files = [] - for wwt in self.warc_writer_threads: - file = os.path.join(tmpdir, '%s.dat' % wwt.ident) - wwt.profiler.dump_stats(file) - files.append(file) - - buf = io.StringIO() - stats = pstats.Stats(*files, stream=buf) - stats.sort_stats('cumulative') - stats.print_stats(0.1) - self.logger.notice( - 'aggregate performance profile of %s warc writer threads:\n%s', - len(self.warc_writer_threads), buf.getvalue()) - + # postfetch processors + for processor in self._postfetch_chain: + if not processor.profiler: + self.logger.notice('%s has no profiling data', processor) + continue + file = os.path.join(tmpdir, '%s.dat' % processor.ident) + processor.profiler.dump_stats(file) + buf = io.StringIO() + stats = pstats.Stats(file, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + 'performance profile of %s:\n%s', processor, + buf.getvalue()) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 45b8142..950c110 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -1,7 +1,7 @@ ''' warcprox/dedup.py - identical payload digest deduplication using sqlite db -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -32,9 +32,19 @@ import doublethink import datetime import urllib3 from urllib3.exceptions import HTTPError +import collections urllib3.disable_warnings() 
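For orientation before the new dedup processors below: the controller changes above replace the old listener-notification model with a chain of postfetch processors connected by queues. The base classes these hunks rely on (BaseStandardPostfetchProcessor, BaseBatchPostfetchProcessor, ListenerPostfetchProcessor) are referenced but not defined in this diff; the following is a rough sketch, assuming they live in warcprox/__init__.py and expose roughly the shape that build_postfetch_chain() and chain() use. Class name, attributes and method bodies here are inferred from usage in this diff, not copied from the real implementation.

    import logging
    import queue
    import threading

    class SketchPostfetchProcessor(threading.Thread):
        # Hypothetical stand-in for warcprox.BaseStandardPostfetchProcessor:
        # reads RecordedUrl objects from self.inq, handles each one, and
        # forwards it to self.outq so the next processor in the chain sees it.
        logger = logging.getLogger('sketch.SketchPostfetchProcessor')

        def __init__(self, options=None):
            threading.Thread.__init__(self, name=self.__class__.__name__)
            self.options = options
            self.stop = threading.Event()  # set by WarcproxController.shutdown()
            self.inq = None                # wired up by WarcproxController.chain()
            self.outq = None               # wired up by WarcproxController.chain()
            self.profiler = None           # populated only when profiling is enabled

        def _startup(self):
            pass                           # e.g. BatchTroughLoader starts its trough client here

        def _process_url(self, recorded_url):
            raise NotImplementedError      # e.g. DedupLoader decorates the url with dedup info

        def _get_process_put(self):
            recorded_url = self.inq.get(block=True, timeout=0.5)
            self._process_url(recorded_url)
            if self.outq:
                self.outq.put(recorded_url)

        def run(self):
            self._startup()
            while not self.stop.is_set():
                try:
                    self._get_process_put()
                except queue.Empty:
                    pass
            self.logger.info('%s shutting down', self.name)

DedupLoader and WarcWriterThread below are written against this kind of standard (one-url-at-a-time) interface, while the batch variants (BatchTroughLoader, BatchTroughStorer, StatsProcessor) instead collect recorded urls for a bounded time and hand them to _process_batch().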
+class DedupLoader(warcprox.BaseStandardPostfetchProcessor): + def __init__(self, dedup_db, options=warcprox.Options()): + warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + self.dedup_db = dedup_db + + def _process_url(self, recorded_url): + decorate_with_dedup_info( + self.dedup_db, recorded_url, self.options.base32) + class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -61,6 +71,12 @@ class DedupDb(object): conn.commit() conn.close() + def loader(self, *args, **kwargs): + return DedupLoader(self, self.options) + + def storer(self, *args, **kwargs): + return warcprox.ListenerPostfetchProcessor(self, self.options) + def save(self, digest_key, response_record, bucket=""): record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') @@ -106,20 +122,20 @@ class DedupDb(object): else: self.save(digest_key, records[0]) - def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): if (recorded_url.response_recorder and recorded_url.payload_digest and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url.url) + recorded_url.dedup_info = dedup_db.lookup( + digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url.url) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key, - url=recorded_url.url) + recorded_url.dedup_info = dedup_db.lookup( + digest_key, url=recorded_url.url) -class RethinkDedupDb: +class RethinkDedupDb(DedupDb): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, options=warcprox.Options()): @@ -181,7 +197,7 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) -class CdxServerDedup(object): +class CdxServerDedup(DedupDb): """Query a CDX server to perform deduplication. """ logger = logging.getLogger("warcprox.dedup.CdxServerDedup") @@ -244,7 +260,82 @@ class CdxServerDedup(object): """ pass -class TroughDedupDb(object): +class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): + def __init__(self, trough_dedup_db, options=warcprox.Options()): + warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + self.trough_dedup_db = trough_dedup_db + + def _filter_and_bucketize(self, batch): + ''' + Returns `{bucket: [recorded_url, ...]}`, excluding urls that should + have dedup info stored. 
+ ''' + buckets = collections.defaultdict(list) + for recorded_url in batch: + if (recorded_url.warc_records + and recorded_url.warc_records[0].type == b'response' + and recorded_url.response_recorder.payload_size() > 0): + if (recorded_url.warcprox_meta + and 'captures-bucket' in recorded_url.warcprox_meta): + bucket = recorded_url.warcprox_meta['captures-bucket'] + else: + bucket = '__unspecified__' + buckets[bucket].append(recorded_url) + return buckets + + def _process_batch(self, batch): + buckets = self._filter_and_bucketize(batch) + for bucket in buckets: + self.trough_dedup_db.batch_save(buckets[bucket], bucket) + +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): + def __init__(self, trough_dedup_db, options=warcprox.Options()): + warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + self.trough_dedup_db = trough_dedup_db + + def _startup(self): + self.trough_dedup_db.start() + + def _filter_and_bucketize(self, batch): + ''' + Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not + be looked up. + ''' + buckets = collections.defaultdict(list) + for recorded_url in batch: + if (recorded_url.response_recorder + and recorded_url.payload_digest + and recorded_url.response_recorder.payload_size() > 0): + if (recorded_url.warcprox_meta + and 'captures-bucket' in recorded_url.warcprox_meta): + bucket = recorded_url.warcprox_meta['captures-bucket'] + else: + bucket = '__unspecified__' + buckets[bucket].append(recorded_url) + return buckets + + def _build_key_index(self, batch): + ''' + Returns `{digest_key: [recorded_url, ...]}`. + ''' + key_index = collections.defaultdict(list) + for recorded_url in batch: + digest_key = warcprox.digest_str( + recorded_url.payload_digest, self.options.base32) + key_index[digest_key].append(recorded_url) + return key_index + + def _process_batch(self, batch): + buckets = self._filter_and_bucketize(batch) + for bucket in buckets: + key_index = self._build_key_index(buckets[bucket]) + results = self.trough_dedup_db.batch_lookup( + key_index.keys(), bucket) + for result in results: + for recorded_url in key_index[result['digest_key']]: + recorded_url.dedup_info = result + +class TroughDedupDb(DedupDb): ''' https://github.com/internetarchive/trough ''' @@ -256,7 +347,8 @@ class TroughDedupDb(object): ' url varchar(2100) not null,\n' ' date datetime not null,\n' ' id varchar(100));\n') # warc record id - WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) ' + WRITE_SQL_TMPL = ('insert or ignore into dedup\n' + '(digest_key, url, date, id)\n' 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): @@ -264,6 +356,12 @@ class TroughDedupDb(object): self._trough_cli = warcprox.trough.TroughClient( options.rethinkdb_trough_db_url, promotion_interval=60*60) + def loader(self, *args, **kwargs): + return BatchTroughLoader(self, self.options) + + def storer(self, *args, **kwargs): + return BatchTroughStorer(self, self.options) + def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) @@ -275,6 +373,21 @@ class TroughDedupDb(object): bucket, self.WRITE_SQL_TMPL, (digest_key, url, warc_date, record_id), self.SCHEMA_ID) + def batch_save(self, batch, bucket='__unspecified__'): + sql_tmpl = ('insert or ignore into dedup\n' + '(digest_key, url, date, id)\n' + 'values %s;' % ','.join( + '(%s,%s,%s,%s)' for i in range(len(batch)))) + values = [] + for recorded_url in batch: + values.extend([ + warcprox.digest_str( + recorded_url.payload_digest, self.options.base32), + 
recorded_url.url, + recorded_url.warc_records[0].date, + recorded_url.warc_records[0].id,]) + self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID) + def lookup(self, digest_key, bucket='__unspecified__', url=None): results = self._trough_cli.read( bucket, 'select * from dedup where digest_key=%s;', @@ -291,6 +404,23 @@ class TroughDedupDb(object): else: return None + def batch_lookup(self, digest_keys, bucket='__unspecified__'): + sql_tmpl = 'select * from dedup where digest_key in (%s)' % ( + ','.join('%s' for i in range(len(digest_keys)))) + results = self._trough_cli.read(bucket, sql_tmpl, digest_keys) + if results is None: + return [] + self.logger.debug( + 'trough batch lookup of %s keys returned %s results', + len(digest_keys), len(results)) + assert len(results) >= 0 and len(results) <= len(digest_keys) + for result in results: + result['id'] = result['id'].encode('ascii') + result['url'] = result['url'].encode('ascii') + result['date'] = result['date'].encode('ascii') + result['digest_key'] = result['digest_key'].encode('ascii') + return results + def notify(self, recorded_url, records): if (records and records[0].type == b'response' and recorded_url.response_recorder.payload_size() > 0): diff --git a/warcprox/main.py b/warcprox/main.py index 1369204..59e4b4a 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -42,7 +42,6 @@ import certauth.certauth import warcprox import doublethink import cryptography.hazmat.backends.openssl -import importlib class BetterArgumentDefaultsHelpFormatter( argparse.ArgumentDefaultsHelpFormatter, @@ -61,7 +60,7 @@ class BetterArgumentDefaultsHelpFormatter( else: return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action) -def _build_arg_parser(prog): +def _build_arg_parser(prog='warcprox'): arg_parser = argparse.ArgumentParser(prog=prog, description='warcprox - WARC writing MITM HTTP/S proxy', formatter_class=BetterArgumentDefaultsHelpFormatter) @@ -119,9 +118,9 @@ def _build_arg_parser(prog): arg_parser.add_argument('-P', '--playback-port', dest='playback_port', type=int, default=None, help='port to listen on for instant playback') - arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', - default='./warcprox-playback-index.db', - help='playback index database file (only used if --playback-port is specified)') + # arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', + # default='./warcprox-playback-index.db', + # help='playback index database file (only used if --playback-port is specified)') group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') @@ -201,12 +200,12 @@ def dump_state(signum=None, frame=None): 'dumping state (caught signal %s)\n%s', signum, '\n'.join(state_strs)) -def init_controller(args): +def parse_args(argv): ''' - Creates a warcprox.controller.WarcproxController configured according to - the supplied arguments (normally the result of parse_args(sys.argv)). + Parses command line arguments with argparse. 
''' - options = warcprox.Options(**vars(args)) + arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) + args = arg_parser.parse_args(args=argv[1:]) try: hashlib.new(args.digest_algorithm) @@ -214,106 +213,6 @@ def init_controller(args): logging.fatal(e) exit(1) - listeners = [] - - if args.rethinkdb_dedup_url: - dedup_db = warcprox.dedup.RethinkDedupDb(options=options) - elif args.rethinkdb_big_table_url: - dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options) - elif args.rethinkdb_trough_db_url: - dedup_db = warcprox.dedup.TroughDedupDb(options) - elif args.cdxserver_dedup: - cdxserver_maxsize = args.writer_threads or 200 - dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup, - maxsize=cdxserver_maxsize) - elif args.dedup_db_file in (None, '', '/dev/null'): - logging.info('deduplication disabled') - dedup_db = None - else: - dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options) - if dedup_db: - listeners.append(dedup_db) - - if args.rethinkdb_stats_url: - stats_db = warcprox.stats.RethinkStatsDb(options=options) - listeners.append(stats_db) - elif args.stats_db_file in (None, '', '/dev/null'): - logging.info('statistics tracking disabled') - stats_db = None - else: - stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options) - listeners.append(stats_db) - - recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size) - - ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64] - ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir, - ca_name=ca_name) - - proxy = warcprox.warcproxy.WarcProxy( - ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db, - options=options) - listeners.append(proxy.running_stats) - - if args.playback_port is not None: - playback_index_db = warcprox.playback.PlaybackIndexDb( - args.playback_index_db_file, options=options) - playback_proxy = warcprox.playback.PlaybackProxy( - ca=ca, playback_index_db=playback_index_db, options=options) - listeners.append(playback_index_db) - else: - playback_index_db = None - playback_proxy = None - - if args.crawl_log_dir: - listeners.append(warcprox.crawl_log.CrawlLogger( - args.crawl_log_dir, options=options)) - - for qualname in args.plugins or []: - try: - (module_name, class_name) = qualname.rsplit('.', 1) - module_ = importlib.import_module(module_name) - class_ = getattr(module_, class_name) - listener = class_() - listener.notify # make sure it has this method - listeners.append(listener) - except Exception as e: - logging.fatal('problem with plugin class %r: %s', qualname, e) - sys.exit(1) - - writer_pool = warcprox.writer.WarcWriterPool(options=options) - # number of warc writer threads = sqrt(proxy.max_threads) - # I came up with this out of thin air because it strikes me as reasonable - # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 - num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5) - logging.debug('initializing %d warc writer threads', num_writer_threads) - warc_writer_threads = [ - warcprox.writerthread.WarcWriterThread( - name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q, - writer_pool=writer_pool, dedup_db=dedup_db, - listeners=listeners, options=options) - for i in range(num_writer_threads)] - - if args.rethinkdb_services_url: - parsed = doublethink.parse_rethinkdb_url( - options.rethinkdb_services_url) - rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database) - svcreg = doublethink.ServiceRegistry(rr, table=parsed.table) - else: - 
svcreg = None - - controller = warcprox.controller.WarcproxController( - proxy, warc_writer_threads, playback_proxy, - service_registry=svcreg, options=options) - - return controller - -def parse_args(argv): - ''' - Parses command line arguments with argparse. - ''' - arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) - args = arg_parser.parse_args(args=argv[1:]) return args def main(argv=None): @@ -339,7 +238,8 @@ def main(argv=None): # see https://github.com/pyca/cryptography/issues/2911 cryptography.hazmat.backends.openssl.backend.activate_builtin_random() - controller = init_controller(args) + options = warcprox.Options(**vars(args)) + controller = warcprox.controller.WarcproxController(options) signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) signal.signal(signal.SIGINT, lambda a,b: controller.stop.set()) @@ -410,7 +310,8 @@ def ensure_rethinkdb_tables(argv=None): svcreg = doublethink.ServiceRegistry(rr, table=parsed.table) did_something = True if args.rethinkdb_stats_url: - stats_db = warcprox.stats.RethinkStatsDb(options=options) + stats_db = warcprox.stats.RethinkStatsProcessor(options=options) + stats_db._ensure_db_table() did_something = True if args.rethinkdb_dedup_url: dedup_db = warcprox.dedup.RethinkDedupDb(options=options) @@ -421,7 +322,7 @@ def ensure_rethinkdb_tables(argv=None): if args.rethinkdb_trough_db_url: dedup_db = warcprox.dedup.TroughDedupDb(options) logging.warn( - 'trough it responsible for creating most of the rethinkdb ' + 'trough is responsible for creating most of the rethinkdb ' 'tables that it uses') did_something = True diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index dd08bf7..7792c5c 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -62,6 +62,8 @@ except ImportError: import concurrent.futures import urlcanon import time +import collections +import cProfile class ProxyingRecorder(object): """ @@ -562,9 +564,14 @@ class PooledMitmProxy(PooledMixIn, MitmProxy): # See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html request_queue_size = 4096 - def __init__(self, max_threads, options=warcprox.Options()): - PooledMixIn.__init__(self, max_threads) - self.profilers = {} + def __init__(self, options=warcprox.Options()): + if options.max_threads: + self.logger.info( + 'max_threads=%s set by command line option', + options.max_threads) + + PooledMixIn.__init__(self, options.max_threads) + self.profilers = collections.defaultdict(cProfile.Profile) if options.profile: self.process_request_thread = self._profile_process_request_thread @@ -572,9 +579,6 @@ class PooledMitmProxy(PooledMixIn, MitmProxy): self.process_request_thread = self._process_request_thread def _profile_process_request_thread(self, request, client_address): - if not threading.current_thread().ident in self.profilers: - import cProfile - self.profilers[threading.current_thread().ident] = cProfile.Profile() profiler = self.profilers[threading.current_thread().ident] profiler.enable() self._process_request_thread(request, client_address) diff --git a/warcprox/playback.py b/warcprox/playback.py index 1a698c0..91f86aa 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -121,9 +121,6 @@ class PlaybackProxyHandler(MitmProxyHandler): def _send_headers_and_refd_payload( self, headers, refers_to_target_uri, refers_to_date, payload_digest): - """Parameters: - - """ location = self.server.playback_index_db.lookup_exact( refers_to_target_uri, refers_to_date, payload_digest) self.logger.debug('loading http payload from 
{}'.format(location)) @@ -133,11 +130,13 @@ class PlaybackProxyHandler(MitmProxyHandler): for (offset, record, errors) in fh.read_records(limit=1, offsets=True): pass + if not record: + raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename)) + if errors: raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors)) - warc_type = record.get_header(warctools.WarcRecord.TYPE) - if warc_type != warctools.WarcRecord.RESPONSE: + if record.type != warctools.WarcRecord.RESPONSE: raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type)) # find end of headers @@ -158,12 +157,13 @@ class PlaybackProxyHandler(MitmProxyHandler): for (offset, record, errors) in fh.read_records(limit=1, offsets=True): pass + if not record: + raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename)) + if errors: raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) - warc_type = record.get_header(warctools.WarcRecord.TYPE) - - if warc_type == warctools.WarcRecord.RESPONSE: + if record.type == warctools.WarcRecord.RESPONSE: headers_buf = bytearray() while True: line = record.content_file.readline() @@ -173,7 +173,7 @@ class PlaybackProxyHandler(MitmProxyHandler): return self._send_response(headers_buf, record.content_file) - elif warc_type == warctools.WarcRecord.REVISIT: + elif record.type == warctools.WarcRecord.REVISIT: # response consists of http headers from revisit record and # payload from the referenced record warc_profile = record.get_header(warctools.WarcRecord.PROFILE) diff --git a/warcprox/stats.py b/warcprox/stats.py index 6047443..db2493c 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -53,45 +53,88 @@ def _empty_bucket(bucket): }, } -class StatsDb: - logger = logging.getLogger("warcprox.stats.StatsDb") +class StatsProcessor(warcprox.BaseBatchPostfetchProcessor): + logger = logging.getLogger("warcprox.stats.StatsProcessor") - def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()): - self.file = file - self.options = options - self._lock = threading.RLock() + def _startup(self): + if os.path.exists(self.options.stats_db_file): + self.logger.info( + 'opening existing stats database %s', + self.options.stats_db_file) + else: + self.logger.info( + 'creating new stats database %s', + self.options.stats_db_file) - def start(self): - with self._lock: - if os.path.exists(self.file): - self.logger.info( - 'opening existing stats database %s', self.file) - else: - self.logger.info( - 'creating new stats database %s', self.file) + conn = sqlite3.connect(self.options.stats_db_file) + conn.execute( + 'create table if not exists buckets_of_stats (' + ' bucket varchar(300) primary key,' + ' stats varchar(4000)' + ');') + conn.commit() + conn.close() - conn = sqlite3.connect(self.file) + self.logger.info( + 'created table buckets_of_stats in %s', + self.options.stats_db_file) + + def _process_batch(self, batch): + batch_buckets = self._tally_batch(batch) + self._update_db(batch_buckets) + logging.trace('updated stats from batch of %s', len(batch)) + + def _update_db(self, batch_buckets): + conn = sqlite3.connect(self.options.stats_db_file) + for bucket in batch_buckets: + bucket_stats = batch_buckets[bucket] + + cursor = conn.execute( + 'select stats from buckets_of_stats where bucket=?', + (bucket,)) + result_tuple = cursor.fetchone() + cursor.close() + + if result_tuple: + old_bucket_stats = json.loads(result_tuple[0]) + + bucket_stats['total']['urls'] 
+= old_bucket_stats['total']['urls'] + bucket_stats['total']['wire_bytes'] += old_bucket_stats['total']['wire_bytes'] + bucket_stats['revisit']['urls'] += old_bucket_stats['revisit']['urls'] + bucket_stats['revisit']['wire_bytes'] += old_bucket_stats['revisit']['wire_bytes'] + bucket_stats['new']['urls'] += old_bucket_stats['new']['urls'] + bucket_stats['new']['wire_bytes'] += old_bucket_stats['new']['wire_bytes'] + + json_value = json.dumps(bucket_stats, separators=(',',':')) conn.execute( - 'create table if not exists buckets_of_stats (' - ' bucket varchar(300) primary key,' - ' stats varchar(4000)' - ');') + 'insert or replace into buckets_of_stats ' + '(bucket, stats) values (?, ?)', (bucket, json_value)) conn.commit() - conn.close() + conn.close() - self.logger.info('created table buckets_of_stats in %s', self.file) + def _tally_batch(self, batch): + batch_buckets = {} + for recorded_url in batch: + for bucket in self.buckets(recorded_url): + bucket_stats = batch_buckets.get(bucket) + if not bucket_stats: + bucket_stats = _empty_bucket(bucket) + batch_buckets[bucket] = bucket_stats - def stop(self): - pass + bucket_stats["total"]["urls"] += 1 + bucket_stats["total"]["wire_bytes"] += recorded_url.size - def close(self): - pass - - def sync(self): - pass + if recorded_url.warc_records: + if recorded_url.warc_records[0].type == b'revisit': + bucket_stats["revisit"]["urls"] += 1 + bucket_stats["revisit"]["wire_bytes"] += recorded_url.size + else: + bucket_stats["new"]["urls"] += 1 + bucket_stats["new"]["wire_bytes"] += recorded_url.size + return batch_buckets def value(self, bucket0="__all__", bucket1=None, bucket2=None): - conn = sqlite3.connect(self.file) + conn = sqlite3.connect(self.options.stats_db_file) cursor = conn.execute( 'select stats from buckets_of_stats where bucket = ?', (bucket0,)) @@ -109,9 +152,6 @@ class StatsDb: else: return None - def notify(self, recorded_url, records): - self.tally(recorded_url, records) - def buckets(self, recorded_url): ''' Unravels bucket definitions in Warcprox-Meta header. 
Each bucket @@ -154,117 +194,20 @@ class StatsDb: return buckets - def tally(self, recorded_url, records): - with self._lock: - conn = sqlite3.connect(self.file) - - for bucket in self.buckets(recorded_url): - cursor = conn.execute( - 'select stats from buckets_of_stats where bucket=?', - (bucket,)) - - result_tuple = cursor.fetchone() - cursor.close() - if result_tuple: - bucket_stats = json.loads(result_tuple[0]) - else: - bucket_stats = _empty_bucket(bucket) - - bucket_stats["total"]["urls"] += 1 - bucket_stats["total"]["wire_bytes"] += recorded_url.size - - if records: - if records[0].type == b'revisit': - bucket_stats["revisit"]["urls"] += 1 - bucket_stats["revisit"]["wire_bytes"] += recorded_url.size - else: - bucket_stats["new"]["urls"] += 1 - bucket_stats["new"]["wire_bytes"] += recorded_url.size - - json_value = json.dumps(bucket_stats, separators=(',',':')) - conn.execute( - 'insert or replace into buckets_of_stats ' - '(bucket, stats) values (?, ?)', (bucket, json_value)) - conn.commit() - - conn.close() - -class RethinkStatsDb(StatsDb): - """Updates database in batch every 2.0 seconds""" - logger = logging.getLogger("warcprox.stats.RethinkStatsDb") +class RethinkStatsProcessor(StatsProcessor): + logger = logging.getLogger("warcprox.stats.RethinkStatsProcessor") def __init__(self, options=warcprox.Options()): + StatsProcessor.__init__(self, options) + parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.table = parsed.table self.replicas = min(3, len(self.rr.servers)) + + def _startup(self): self._ensure_db_table() - self.options = options - - self._stop = threading.Event() - self._batch_lock = threading.RLock() - with self._batch_lock: - self._batch = {} - self._timer = None - - def start(self): - """Starts batch update repeating timer.""" - self._update_batch() # starts repeating timer - - def _bucket_batch_update_reql(self, bucket, batch): - return self.rr.table(self.table).get(bucket).replace( - lambda old: r.branch( - old.eq(None), batch[bucket], old.merge({ - "total": { - "urls": old["total"]["urls"].add( - batch[bucket]["total"]["urls"]), - "wire_bytes": old["total"]["wire_bytes"].add( - batch[bucket]["total"]["wire_bytes"]), - }, - "new": { - "urls": old["new"]["urls"].add( - batch[bucket]["new"]["urls"]), - "wire_bytes": old["new"]["wire_bytes"].add( - batch[bucket]["new"]["wire_bytes"]), - }, - "revisit": { - "urls": old["revisit"]["urls"].add( - batch[bucket]["revisit"]["urls"]), - "wire_bytes": old["revisit"]["wire_bytes"].add( - batch[bucket]["revisit"]["wire_bytes"]), - }, - }))) - - def _update_batch(self): - with self._batch_lock: - batch_copy = copy.deepcopy(self._batch) - self._batch = {} - try: - if len(batch_copy) > 0: - # XXX can all the buckets be done in one query? 
- for bucket in batch_copy: - result = self._bucket_batch_update_reql( - bucket, batch_copy).run() - if (not result["inserted"] and not result["replaced"] - or sorted(result.values()) != [0,0,0,0,0,1]): - raise Exception( - "unexpected result %s updating stats %s" % ( - result, batch_copy[bucket])) - except Exception as e: - self.logger.error("problem updating stats", exc_info=True) - # now we need to restore the stats that didn't get saved to the - # batch so that they are saved in the next call to _update_batch() - with self._batch_lock: - self._add_to_batch(batch_copy) - finally: - if not self._stop.is_set(): - self._timer = threading.Timer(2.0, self._update_batch) - self._timer.name = "RethinkStats-batch-update-timer-%s" % ( - datetime.datetime.utcnow().isoformat()) - self._timer.start() - else: - self.logger.info("finished") def _ensure_db_table(self): dbs = self.rr.db_list().run() @@ -282,17 +225,38 @@ class RethinkStatsDb(StatsDb): self.table, primary_key="bucket", shards=1, replicas=self.replicas).run() - def close(self): - self.stop() + def _update_db(self, batch_buckets): + # XXX can all the buckets be done in one query? + for bucket in batch_buckets: + result = self._bucket_batch_update_reql( + bucket, batch_buckets[bucket]).run() + if (not result['inserted'] and not result['replaced'] + or sorted(result.values()) != [0,0,0,0,0,1]): + self.logger.error( + 'unexpected result %s updating stats %s' % ( + result, batch_buckets[bucket])) - def stop(self): - self.logger.info("stopping rethinkdb stats table batch updates") - self._stop.set() - if self._timer: - self._timer.join() - - def sync(self): - pass + def _bucket_batch_update_reql(self, bucket, new): + return self.rr.table(self.table).get(bucket).replace( + lambda old: r.branch( + old.eq(None), new, old.merge({ + 'total': { + 'urls': old['total']['urls'].add(new['total']['urls']), + 'wire_bytes': old['total']['wire_bytes'].add( + new['total']['wire_bytes']), + }, + 'new': { + 'urls': old['new']['urls'].add(new['new']['urls']), + 'wire_bytes': old['new']['wire_bytes'].add( + new['new']['wire_bytes']), + }, + 'revisit': { + 'urls': old['revisit']['urls'].add( + new['revisit']['urls']), + 'wire_bytes': old['revisit']['wire_bytes'].add( + new['revisit']['wire_bytes']), + }, + }))) def value(self, bucket0="__all__", bucket1=None, bucket2=None): bucket0_stats = self.rr.table(self.table).get(bucket0).run() @@ -307,39 +271,6 @@ class RethinkStatsDb(StatsDb): return bucket0_stats[bucket1] return bucket0_stats - def tally(self, recorded_url, records): - buckets = self.buckets(recorded_url) - with self._batch_lock: - for bucket in buckets: - bucket_stats = self._batch.setdefault( - bucket, _empty_bucket(bucket)) - - bucket_stats["total"]["urls"] += 1 - bucket_stats["total"]["wire_bytes"] += recorded_url.size - - if records: - if records[0].type == b'revisit': - bucket_stats["revisit"]["urls"] += 1 - bucket_stats["revisit"]["wire_bytes"] += recorded_url.size - else: - bucket_stats["new"]["urls"] += 1 - bucket_stats["new"]["wire_bytes"] += recorded_url.size - - def _add_to_batch(self, add_me): - with self._batch_lock: - for bucket in add_me: - bucket_stats = self._batch.setdefault( - bucket, _empty_bucket(bucket)) - bucket_stats["total"]["urls"] += add_me[bucket]["total"]["urls"] - bucket_stats["total"]["wire_bytes"] += add_me[bucket]["total"]["wire_bytes"] - bucket_stats["revisit"]["urls"] += add_me[bucket]["revisit"]["urls"] - bucket_stats["revisit"]["wire_bytes"] += add_me[bucket]["revisit"]["wire_bytes"] - bucket_stats["new"]["urls"] += 
add_me[bucket]["new"]["urls"] - bucket_stats["new"]["wire_bytes"] += add_me[bucket]["new"]["wire_bytes"] - - def notify(self, recorded_url, records): - self.tally(recorded_url, records) - class RunningStats: ''' In-memory stats for measuring overall warcprox performance. diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 31b9b45..8850821 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -2,7 +2,7 @@ warcprox/warcproxy.py - recording proxy, extends mitmproxy to record traffic, enqueue info on the recorded url queue -Copyright (C) 2013-2016 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -92,6 +92,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self.url, rule)) def _enforce_limit(self, limit_key, limit_value, soft=False): + if not self.server.stats_db: + return bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2) _limit_key = limit_key @@ -328,7 +330,7 @@ class RecordedUrl: warcprox_meta=None, content_type=None, custom_type=None, status=None, size=None, client_ip=None, method=None, timestamp=None, host=None, duration=None, referer=None, - payload_digest=None): + payload_digest=None, warc_records=None): # XXX should test what happens with non-ascii url (when does # url-encoding happen?) if type(url) is not bytes: @@ -367,6 +369,7 @@ class RecordedUrl: self.duration = duration self.referer = referer self.payload_digest = payload_digest + self.warc_records = warc_records # inherit from object so that multiple inheritance from this class works # properly in python 2 @@ -375,8 +378,12 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") def __init__( - self, ca=None, recorded_url_q=None, stats_db=None, + self, stats_db=None, status_callback=None, options=warcprox.Options()): + self.status_callback = status_callback + self.stats_db = stats_db + self.options = options + server_address = ( options.address or 'localhost', options.port if options.port is not None else 8000) @@ -395,22 +402,13 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): self.digest_algorithm = options.digest_algorithm or 'sha1' - if ca is not None: - self.ca = ca - else: - ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64] - self.ca = CertificateAuthority(ca_file='warcprox-ca.pem', - certs_dir='./warcprox-ca', - ca_name=ca_name) + ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64] + self.ca = CertificateAuthority( + ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca', + ca_name=ca_name) - if recorded_url_q is not None: - self.recorded_url_q = recorded_url_q - else: - self.recorded_url_q = warcprox.TimestampedQueue( - maxsize=options.queue_size or 1000) - - self.stats_db = stats_db - self.options = options + self.recorded_url_q = warcprox.TimestampedQueue( + maxsize=options.queue_size or 1000) self.running_stats = warcprox.stats.RunningStats() @@ -425,6 +423,9 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): 'queued_urls': self.recorded_url_q.qsize(), 'queue_max_size': self.recorded_url_q.maxsize, 'seconds_behind': self.recorded_url_q.seconds_behind(), + 'urls_processed': self.running_stats.urls, + 'warc_bytes_written': self.running_stats.warc_bytes, + 'start_time': self.running_stats.first_snap_time, }) elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(1) result['rates_1min'] = { @@ -444,22 
+445,20 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): 'urls_per_sec': urls_per_sec, 'warc_bytes_per_sec': warc_bytes_per_sec, } + # gets postfetch chain status from the controller + if self.status_callback: + result.update(self.status_callback()) return result class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") def __init__( - self, ca=None, recorded_url_q=None, stats_db=None, + self, stats_db=None, status_callback=None, options=warcprox.Options()): - if options.max_threads: - self.logger.info( - "max_threads=%s set by command line option", - options.max_threads) - warcprox.mitmproxy.PooledMitmProxy.__init__( - self, options.max_threads, options) + warcprox.mitmproxy.PooledMitmProxy.__init__(self, options) SingleThreadedWarcProxy.__init__( - self, ca, recorded_url_q, stats_db, options) + self, stats_db, status_callback, options) def server_activate(self): http_server.HTTPServer.server_activate(self) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 7ee9159..f823cc6 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -2,7 +2,7 @@ warcprox/writerthread.py - warc writer thread, reads from the recorded url queue, writes warc records, runs final tasks after warc records are written -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -28,44 +28,36 @@ except ImportError: import Queue as queue import logging -import threading import time -from datetime import datetime -from hanzo import warctools import warcprox -import sys -class WarcWriterThread(threading.Thread): - logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread") - - def __init__( - self, recorded_url_q, name='WarcWriterThread', writer_pool=None, - dedup_db=None, listeners=[], options=warcprox.Options()): - """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" - threading.Thread.__init__(self, name=name) - self.recorded_url_q = recorded_url_q - self.stop = threading.Event() - if writer_pool: - self.writer_pool = writer_pool - else: - self.writer_pool = warcprox.writer.WarcWriterPool() - self.dedup_db = dedup_db - self.listeners = listeners - self.options = options - self.idle = None - self.method_filter = set(method.upper() for method in self.options.method_filter or []) - - def run(self): - if self.options.profile: - import cProfile - self.profiler = cProfile.Profile() - self.profiler.enable() - self._run() - self.profiler.disable() - else: - self._run() +class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): + logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'} + + def __init__(self, options=warcprox.Options()): + warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + self.options = options + self.writer_pool = warcprox.writer.WarcWriterPool(options) + self.method_filter = set(method.upper() for method in self.options.method_filter or []) + + def _get_process_put(self): + try: + warcprox.BaseStandardPostfetchProcessor._get_process_put(self) + finally: + self.writer_pool.maybe_idle_rollover() + + def _process_url(self, recorded_url): + records = [] + if self._should_archive(recorded_url): + records = self.writer_pool.write_records(recorded_url) + recorded_url.warc_records = records + self._log(recorded_url, records) + # 
try to release resources in a timely fashion + if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() + def _filter_accepts(self, recorded_url): if not self.method_filter: return True @@ -81,68 +73,9 @@ class WarcWriterThread(threading.Thread): # special warc name prefix '-' means "don't archive" return prefix != '-' and self._filter_accepts(recorded_url) - def _run(self): - self.name = '%s(tid=%s)'% (self.name, warcprox.gettid()) - while not self.stop.is_set(): - try: - while True: - try: - if self.stop.is_set(): - qsize = self.recorded_url_q.qsize() - if qsize % 50 == 0: - self.logger.info("%s urls left to write", qsize) - - recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) - records = [] - self.idle = None - if self._should_archive(recorded_url): - if self.dedup_db: - warcprox.dedup.decorate_with_dedup_info(self.dedup_db, - recorded_url, base32=self.options.base32) - records = self.writer_pool.write_records(recorded_url) - - self._final_tasks(recorded_url, records) - - # try to release resources in a timely fashion - if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: - recorded_url.response_recorder.tempfile.close() - except queue.Empty: - if self.stop.is_set(): - break - self.idle = time.time() - finally: - self.writer_pool.maybe_idle_rollover() - - self.logger.info('WarcWriterThread shutting down') - self._shutdown() - except Exception as e: - if isinstance(e, OSError) and e.errno == 28: - # OSError: [Errno 28] No space left on device - self.logger.critical( - 'shutting down due to fatal problem: %s: %s', - e.__class__.__name__, e) - self._shutdown() - sys.exit(1) - - self.logger.critical( - 'WarcWriterThread will try to continue after unexpected ' - 'error', exc_info=True) - time.sleep(0.5) - - def _shutdown(self): - self.writer_pool.close_writers() - for listener in self.listeners: - if hasattr(listener, 'stop'): - try: - listener.stop() - except: - self.logger.error( - '%s raised exception', listener.stop, exc_info=True) - - # closest thing we have to heritrix crawl log at the moment def _log(self, recorded_url, records): try: - payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8") + payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8") except: payload_digest = "-" @@ -156,13 +89,3 @@ class WarcWriterThread(threading.Thread): recorded_url.method, recorded_url.url.decode("utf-8"), recorded_url.mimetype, recorded_url.size, payload_digest, type_, filename, offset) - - def _final_tasks(self, recorded_url, records): - if self.listeners: - for listener in self.listeners: - try: - listener.notify(recorded_url, records) - except: - self.logger.error('%s raised exception', - listener.notify, exc_info=True) - self._log(recorded_url, records)
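Taken together, the main.py and controller.py changes mean the whole pipeline is now built from a single warcprox.Options object rather than hand-assembled in init_controller(). A minimal sketch of driving it programmatically, using only calls that appear in this diff (parse_args(), Options(**vars(args)), WarcproxController(options), shutdown()) plus the controller start method implied by the start/stop hunks above; the flag value is illustrative and defaults are assumed for everything else.

    import warcprox
    import warcprox.main
    import warcprox.controller

    # build Options the same way main() now does
    args = warcprox.main.parse_args(
        ['warcprox', '--dedup-db-file=./warcprox.sqlite'])
    options = warcprox.Options(**vars(args))

    # the controller constructs the proxy, dedup db, stats processor, warc
    # writer and the rest of the postfetch chain itself (Factory +
    # build_postfetch_chain); constructing it binds the listening port
    controller = warcprox.controller.WarcproxController(options)

    controller.start()            # proxy thread, playback proxy (if configured),
                                  # and all postfetch processors
    try:
        pass                      # ... proxy traffic through the configured port ...
    finally:
        controller.shutdown()     # signals the postfetch processors to stop,
                                  # closes the proxy and joins the threads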