mirror of https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00

couple of fixes for host limits

This commit is contained in:
parent 2fe0c2f25b
commit fabd732b7f
tests/test_warcprox.py

```diff
@@ -57,8 +57,8 @@ import certauth.certauth
 
 import warcprox
 
-# logging.basicConfig(stream=sys.stdout, level=logging.INFO,
-logging.basicConfig(stream=sys.stdout, level=warcprox.TRACE,
+logging.basicConfig(
+        stream=sys.stdout, level=logging.INFO, # level=warcprox.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
```
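The test suite's default verbosity drops from TRACE back to INFO, with the TRACE option kept in a comment. A minimal sketch of how a custom below-DEBUG level like warcprox.TRACE can be wired up; the value 5 is an assumption for illustration, not taken from this commit:

```python
import logging
import sys

# illustrative below-DEBUG level; warcprox defines its own TRACE constant
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')

logging.basicConfig(
        stream=sys.stdout, level=logging.INFO,  # level=TRACE for full detail
        format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
        '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

# suppressed at INFO verbosity; visible only when level <= TRACE
logging.getLogger(__name__).log(TRACE, 'very chatty diagnostic detail')
```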
```diff
@@ -194,9 +194,9 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
     def fin():
         if captures_db:
             captures_db.close()
-            logging.info('dropping rethinkdb database {}'.format(db))
-            result = captures_db.r.db_drop(db).run()
-            logging.info("result=%s", result)
+            # logging.info('dropping rethinkdb database {}'.format(db))
+            # result = captures_db.r.db_drop(db).run()
+            # logging.info("result=%s", result)
     request.addfinalizer(fin)
 
     return captures_db
```
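The fixture now closes the test database but no longer drops it. For reference, a self-contained sketch of the addfinalizer teardown pattern used here, with sqlite3 standing in for the rethinkdb captures db:

```python
import sqlite3

import pytest

@pytest.fixture(scope="module")
def captures_db(request):
    db = sqlite3.connect(':memory:')
    def fin():
        # close the connection; dropping the database is skipped,
        # mirroring the commented-out db_drop above
        db.close()
    request.addfinalizer(fin)
    return db
```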
```diff
@@ -862,40 +862,55 @@ def test_host_doc_limit(
         time.sleep(0.5)
 
     # same host but different scheme and port -- host limit still applies
-    url = 'https://localhost:{}/q/r'.format(https_daemon.server_port)
-    for i in range(9):
+    url = 'https://localhost:{}/o/p'.format(https_daemon.server_port)
+    for i in range(8):
         response = requests.get(
                 url, proxies=archiving_proxies, headers=headers, stream=True,
                 verify=False)
         assert response.status_code == 200
-        assert response.headers['warcprox-test-header'] == 'q!'
-        assert response.content == b'I am the warcprox test payload! rrrrrrrrrr!\n'
+        assert response.headers['warcprox-test-header'] == 'o!'
+        assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
 
     # wait for writer thread to process
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
+    # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
+    time.sleep(2.0)
+
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True,
+            verify=False)
+    assert response.status_code == 200
+    assert response.headers['warcprox-test-header'] == 'o!'
+    assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
+    time.sleep(2.0)
 
     # back to http, and this is the 11th request
-    url = 'http://localhost:{}/u/v'.format(http_daemon.server_port)
+    url = 'http://localhost:{}/o/p'.format(http_daemon.server_port)
     response = requests.get(
             url, proxies=archiving_proxies, headers=headers, stream=True)
     assert response.status_code == 420
     assert response.reason == "Reached limit"
-    expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}}
+    expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'bucket': 'test_host_doc_limit_bucket:localhost', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'new': {'wire_bytes': 135, 'urls': 1}, 'total': {'wire_bytes': 1350, 'urls': 10}}}}
     assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
     assert response.headers["content-type"] == "text/plain;charset=utf-8"
     assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n"
 
     # https also blocked
-    url = 'https://localhost:{}/w/x'.format(https_daemon.server_port)
+    url = 'https://localhost:{}/o/p'.format(https_daemon.server_port)
     response = requests.get(
             url, proxies=archiving_proxies, headers=headers, stream=True,
             verify=False)
     assert response.status_code == 420
     assert response.reason == "Reached limit"
-    expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}}
+    expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'bucket': 'test_host_doc_limit_bucket:localhost', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'new': {'wire_bytes': 135, 'urls': 1}, 'total': {'wire_bytes': 1350, 'urls': 10}}}}
     assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
     assert response.headers["content-type"] == "text/plain;charset=utf-8"
     assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n"
```
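The limit this test exercises is requested by the client through the Warcprox-Meta header. A sketch of what the request side plausibly looks like; the bucket and limit key are taken from the assertions above, but the exact 'tally-domains' structure is an assumption about warcprox's API, not shown in this diff:

```python
import json

# hypothetical request metadata: tally per-host stats into a bucket and
# cap that bucket at 10 urls total
request_meta = {
    'stats': {'buckets': [
        {'bucket': 'test_host_doc_limit_bucket', 'tally-domains': ['localhost']}]},
    'limits': {'test_host_doc_limit_bucket:localhost.total.urls': 10},
}
headers = {'Warcprox-Meta': json.dumps(request_meta)}

# requests.get(url, proxies=archiving_proxies, headers=headers) then yields
# 420 "Reached limit" once the bucket's total.urls tally reaches 10
```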
```diff
@@ -920,7 +935,8 @@ def test_host_data_limit(
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
-    time.sleep(0.5)
+    # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
+    time.sleep(2.0)
 
     # duplicate, does not count toward limit
     url = 'https://localhost:{}/y/z'.format(https_daemon.server_port)
```
```diff
@@ -935,7 +951,8 @@ def test_host_data_limit(
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
-    time.sleep(0.5)
+    # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
+    time.sleep(2.0)
 
     # novel, pushes stats over the limit
     url = 'https://localhost:{}/z/~'.format(https_daemon.server_port)
```
```diff
@@ -950,7 +967,8 @@ def test_host_data_limit(
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
-    time.sleep(0.5)
+    # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
+    time.sleep(2.0)
 
     # blocked because we're over the limit now
     url = 'http://localhost:{}/y/z'.format(http_daemon.server_port)
```
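The three hunks above repeat the same settle sequence: poll until the warc writer drains, then wait out one rethinkdb stats batch cycle. As a hypothetical refactoring (not part of this commit), the intent could be captured in one helper:

```python
import time

def wait_for_stats(warcprox_, batch_cycle=2.0, poll=0.5):
    # let the writer thread pick up the last recorded url
    time.sleep(poll)
    while not warcprox_.warc_writer_thread.idle:
        time.sleep(poll)
    # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
    time.sleep(batch_cycle)
```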
warcprox/controller.py

```diff
@@ -142,7 +142,9 @@ class WarcproxController(object):
         status_info['queue_size'] = self.proxy.recorded_url_q.qsize()
 
         self.status_info = self.service_registry.heartbeat(status_info)
-        self.logger.debug("status in service registry: %s", self.status_info)
+        self.logger.log(
+                warcprox.TRACE, "status in service registry: %s",
+                self.status_info)
 
     def run_until_shutdown(self):
         """
```
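Demoting the heartbeat message from DEBUG to warcprox's TRACE level keeps the periodic status spam out of ordinary debug output; logger.log() with a numeric level is the stdlib mechanism for this. A sketch, with the level value 5 assumed for illustration:

```python
import logging

TRACE = 5  # illustrative; warcprox defines its own TRACE constant
logging.addLevelName(TRACE, 'TRACE')
logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger('warcprox.controller')
# prints nothing at DEBUG verbosity; set level=TRACE to see it
logger.log(TRACE, 'status in service registry: %s', {'queue_size': 0})
```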
warcprox/stats.py

```diff
@@ -194,7 +194,7 @@ class RethinkStatsDb(StatsDb):
         """Starts batch update repeating timer."""
         self._update_batch() # starts repeating timer
 
-    def _bucket_batch_update_reql(bucket):
+    def _bucket_batch_update_reql(self, bucket):
         return self.r.table(self.table).get(bucket).replace(
             lambda old: r.branch(
                 old.eq(None), self._batch[bucket], old.merge({
```
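The fix here is the missing self parameter on _bucket_batch_update_reql, which would raise a TypeError as soon as the method was called with a bucket argument. The replace()/r.branch() construct it builds is rethinkdb's usual upsert idiom: insert the batch tally when no row exists, otherwise merge incremented counters into the existing row. A hypothetical standalone version, assuming the old rethinkdb Python driver and a simplified counter document:

```python
import rethinkdb as r

def bucket_batch_update_reql(table, bucket, batch):
    # upsert: if the bucket row doesn't exist yet, insert the batch tally
    # wholesale; otherwise deep-merge an incremented total.urls counter
    # (ReQL merge() recurses into nested objects)
    return r.table(table).get(bucket).replace(
            lambda old: r.branch(
                old.eq(None),
                batch[bucket],
                old.merge({'total': {'urls':
                    old['total']['urls'] + batch[bucket]['total']['urls']}})))
```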