diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 281d1f9..ebca589 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -57,8 +57,8 @@ import certauth.certauth import warcprox -# logging.basicConfig(stream=sys.stdout, level=logging.INFO, -logging.basicConfig(stream=sys.stdout, level=warcprox.TRACE, +logging.basicConfig( + stream=sys.stdout, level=logging.INFO, # level=warcprox.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) @@ -194,9 +194,9 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table): def fin(): if captures_db: captures_db.close() - logging.info('dropping rethinkdb database {}'.format(db)) - result = captures_db.r.db_drop(db).run() - logging.info("result=%s", result) + # logging.info('dropping rethinkdb database {}'.format(db)) + # result = captures_db.r.db_drop(db).run() + # logging.info("result=%s", result) request.addfinalizer(fin) return captures_db @@ -862,40 +862,55 @@ def test_host_doc_limit( time.sleep(0.5) # same host but different scheme and port -- host limit still applies - url = 'https://localhost:{}/q/r'.format(https_daemon.server_port) - for i in range(9): + url = 'https://localhost:{}/o/p'.format(https_daemon.server_port) + for i in range(8): response = requests.get( url, proxies=archiving_proxies, headers=headers, stream=True, verify=False) assert response.status_code == 200 - assert response.headers['warcprox-test-header'] == 'q!' - assert response.content == b'I am the warcprox test payload! rrrrrrrrrr!\n' + assert response.headers['warcprox-test-header'] == 'o!' + assert response.content == b'I am the warcprox test payload! pppppppppp!\n' # wait for writer thread to process time.sleep(0.5) while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) + # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) + time.sleep(2.0) + + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True, + verify=False) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'o!' + assert response.content == b'I am the warcprox test payload! pppppppppp!\n' + + # wait for writer thread to process time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) + time.sleep(2.0) # back to http, and this is the 11th request - url = 'http://localhost:{}/u/v'.format(http_daemon.server_port) + url = 'http://localhost:{}/o/p'.format(http_daemon.server_port) response = requests.get( url, proxies=archiving_proxies, headers=headers, stream=True) assert response.status_code == 420 assert response.reason == "Reached limit" - expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}} + expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'bucket': 'test_host_doc_limit_bucket:localhost', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'new': {'wire_bytes': 135, 'urls': 1}, 'total': {'wire_bytes': 1350, 'urls': 10}}}} assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n" # https also blocked - url = 'https://localhost:{}/w/x'.format(https_daemon.server_port) + url = 'https://localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( url, proxies=archiving_proxies, headers=headers, stream=True, verify=False) assert response.status_code == 420 assert response.reason == "Reached limit" - expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}} + expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'bucket': 'test_host_doc_limit_bucket:localhost', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'new': {'wire_bytes': 135, 'urls': 1}, 'total': {'wire_bytes': 1350, 'urls': 10}}}} assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n" @@ -920,7 +935,8 @@ def test_host_data_limit( time.sleep(0.5) while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) - time.sleep(0.5) + # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) + time.sleep(2.0) # duplicate, does not count toward limit url = 'https://localhost:{}/y/z'.format(https_daemon.server_port) @@ -935,7 +951,8 @@ def test_host_data_limit( time.sleep(0.5) while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) - time.sleep(0.5) + # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) + time.sleep(2.0) # novel, pushes stats over the limit url = 'https://localhost:{}/z/~'.format(https_daemon.server_port) @@ -950,7 +967,8 @@ def test_host_data_limit( time.sleep(0.5) while not warcprox_.warc_writer_thread.idle: time.sleep(0.5) - time.sleep(0.5) + # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) + time.sleep(2.0) # blocked because we're over the limit now url = 'http://localhost:{}/y/z'.format(http_daemon.server_port) diff --git a/warcprox/controller.py b/warcprox/controller.py index 760a1e8..540371a 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -142,7 +142,9 @@ class WarcproxController(object): status_info['queue_size'] = self.proxy.recorded_url_q.qsize() self.status_info = self.service_registry.heartbeat(status_info) - self.logger.debug("status in service registry: %s", self.status_info) + self.logger.log( + warcprox.TRACE, "status in service registry: %s", + self.status_info) def run_until_shutdown(self): """ diff --git a/warcprox/stats.py b/warcprox/stats.py index edbb131..8d5b324 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -194,7 +194,7 @@ class RethinkStatsDb(StatsDb): """Starts batch update repeating timer.""" self._update_batch() # starts repeating timer - def _bucket_batch_update_reql(bucket): + def _bucket_batch_update_reql(self, bucket): return self.r.table(self.table).get(bucket).replace( lambda old: r.branch( old.eq(None), self._batch[bucket], old.merge({