update stats in RethinkDB asynchronously, since profiling shows synchronous stats updates to be a bottleneck in WarcWriterThread (which in turn makes them a bottleneck for the whole app)

Noah Levitt 2015-10-28 21:02:42 +00:00
parent 6b3cd9de2e
commit 95e611a5d0
3 changed files with 73 additions and 33 deletions
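
What changed: RethinkStatsDb no longer writes per-URL stats to RethinkDB on the
caller's thread. notify() now extracts the tallying inputs synchronously and hands
the slow database round-trip to a concurrent.futures.ThreadPoolExecutor, so
WarcWriterThread only pays the cost of enqueueing a task. A minimal sketch of the
hand-off pattern, standard library only (names here are illustrative, not the
commit's):

    import concurrent.futures
    import time

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def slow_write(stats):
        time.sleep(0.1)  # stands in for a RethinkDB insert round-trip
        print("wrote", stats)

    # the hot thread returns immediately; the write runs on a pool thread
    executor.submit(slow_write, {"urls": 1})
    executor.shutdown(wait=True)  # drain queued writes before exiting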

View File

@@ -17,6 +17,12 @@ class PyTest(TestCommand):
         errno = pytest.main(self.test_args)
         sys.exit(errno)
 
+deps = ['certauth>=1.1.0', 'warctools', 'kafka-python', 'surt', 'rethinkstuff']
+try:
+    import concurrent.futures
+except:
+    deps.append('futures')
+
 setuptools.setup(name='warcprox',
         version='1.5.0',
         description='WARC writing MITM HTTP/S proxy',
@@ -26,13 +32,7 @@ setuptools.setup(name='warcprox',
         long_description=open('README.rst').read(),
         license='GPL',
         packages=['warcprox'],
-        install_requires=[
-            'certauth>=1.1.0',
-            'warctools',
-            'kafka-python',
-            'surt',
-            'rethinkstuff',
-        ],
+        install_requires=deps,
         tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
         cmdclass = {'test': PyTest},
         test_suite='warcprox.tests',
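
A note on the dependency logic above: concurrent.futures entered the standard
library in Python 3.2, so the try/except pulls in the 'futures' backport only
where the import fails. On setuptools/pip versions that support PEP 508
environment markers, the same conditional dependency could be declared without
running an import at build time, e.g.:

    # hedged alternative, assuming environment-marker support in the installer
    deps = [
        'certauth>=1.1.0', 'warctools', 'kafka-python', 'surt', 'rethinkstuff',
        'futures; python_version < "3.2"',  # backport only on old Pythons
    ]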

View File

@@ -398,8 +398,12 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies):
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # check in dedup db (no change from prev)
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
@@ -461,8 +465,13 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies):
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # check in dedup db (no change from prev)
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
@@ -491,8 +500,12 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
 
     response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
     assert response.status_code == 420
@@ -515,8 +528,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # check url1 in dedup db bucket_a
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
@@ -541,6 +558,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # check url2 in dedup db bucket_b
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
@@ -568,6 +586,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # close the warc
     assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
@@ -575,14 +594,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     warc_path = os.path.join(writer.directory, writer._f_finalname)
     warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
     assert os.path.exists(warc_path)
 
     # read the warc
     fh = warctools.ArchiveRecord.open_archive(warc_path)
     record_iter = fh.read_records(limit=None, offsets=True)
     try:
         (offset, record, errors) = next(record_iter)
         assert record.type == b'warcinfo'
 
         # url1 bucket_a
         (offset, record, errors) = next(record_iter)
         assert record.type == b'response'
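
The four-line drain loop above now appears in several tests. A hypothetical
helper (not part of this commit) could consolidate it, assuming the same test
fixtures and the private _executor attribute added to RethinkStatsDb below:

    import time

    def wait_until_settled(warcprox_, poll=0.5):
        # block until the writer thread is idle and the stats executor drains
        def stats_pending():
            stats_db = warcprox_.proxy.stats_db
            return (stats_db is not None
                    and hasattr(stats_db, "_executor")
                    and stats_db._executor._work_queue.qsize() > 0)
        while not warcprox_.warc_writer_thread.idle or stats_pending():
            time.sleep(poll)
        time.sleep(poll)  # extra beat for work in flight between the checks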

View File

@@ -14,6 +14,7 @@ import json
 from hanzo import warctools
 import random
 import warcprox
+import concurrent.futures
 
 def _empty_bucket(bucket):
     return {
@@ -95,7 +96,7 @@ class StatsDb:
         if b in self.db:
             bucket_stats = json.loads(self.db[b].decode("utf-8"))
         else:
-            bucket_stats = _empty_bucket(b)
+            bucket_stats = _empty_bucket(b)
 
         bucket_stats["total"]["urls"] += 1
         bucket_stats["total"]["wire_bytes"] += recorded_url.size
@@ -119,6 +120,7 @@ class RethinkStatsDb:
         self.replicas = replicas or min(3, len(r.servers))
         self._ensure_db_table()
         self.options = options
+        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
     def _ensure_db_table(self):
         dbs = self.r.db_list().run()
@@ -127,12 +129,15 @@
             self.r.db_create(self.r.dbname).run()
         tables = self.r.table_list().run()
         if not self.table in tables:
-            self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s",
+            self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s",
                     repr(self.table), repr(self.r.dbname), self.shards, self.replicas)
             self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run()
 
     def close(self):
-        pass
+        self.logger.info("waiting for ~%s tasks to finish",
+                self._executor._work_queue.qsize() + (self._executor._max_workers/2))
+        self._executor.shutdown(wait=True)
+        self.logger.info("shut down complete")
 
     def sync(self):
         pass
@@ -149,7 +154,32 @@
             return bucket0_stats[bucket1]
         return bucket0_stats
 
-    def tally(self, recorded_url, records):
+    def _tally(self, buckets, size, is_revisit):
+        try:
+            self.logger.info("starting task self._tally(%s)", (buckets, size, is_revisit))
+            for bucket in buckets:
+                bucket_stats = self.value(bucket) or _empty_bucket(bucket)
+
+                bucket_stats["total"]["urls"] += 1
+                bucket_stats["total"]["wire_bytes"] += size
+
+                if is_revisit:
+                    bucket_stats["revisit"]["urls"] += 1
+                    bucket_stats["revisit"]["wire_bytes"] += size
+                else:
+                    bucket_stats["new"]["urls"] += 1
+                    bucket_stats["new"]["wire_bytes"] += size
+
+                self.logger.debug("saving %s", bucket_stats)
+                result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run()
+                if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
+                    raise Exception("unexpected result %s saving %s" % (result, bucket_stats))
+            self.logger.info("finished task self._tally(%s)", (buckets, size, is_revisit))
+        except:
+            self.logger.error("unexpected problem tallying stats", exc_info=True)
+
+    def _extract_stats_info(self, recorded_url, records):
         buckets = ["__all__"]
         if (recorded_url.warcprox_meta
@@ -159,24 +189,15 @@
         else:
             buckets.append("__unspecified__")
 
-        for bucket in buckets:
-            bucket_stats = self.value(bucket) or _empty_bucket(bucket)
+        is_revisit = records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT
 
-            bucket_stats["total"]["urls"] += 1
-            bucket_stats["total"]["wire_bytes"] += recorded_url.size
+        return buckets, recorded_url.size, is_revisit
 
-            if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
-                bucket_stats["revisit"]["urls"] += 1
-                bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
-            else:
-                bucket_stats["new"]["urls"] += 1
-                bucket_stats["new"]["wire_bytes"] += recorded_url.size
-
-            self.logger.debug("saving %s", bucket_stats)
-            result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run()
-            if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
-                raise Exception("unexpected result %s saving %s", result, record)
+    def tally(self, recorded_url, records):
+        self._tally(*self._extract_stats_info(recorded_url, records))
 
     def notify(self, recorded_url, records):
-        self.tally(recorded_url, records)
+        args = self._extract_stats_info(recorded_url, records)
+        self.logger.info("submitting task self._tally(%s)", args)
+        self._executor.submit(self._tally, *args)
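
The resulting division of labor: _extract_stats_info() runs synchronously on the
calling thread and reduces the request objects to plain data (buckets, size,
is_revisit), so the pool task never touches recorded_url or records; _tally()
does the RethinkDB I/O on a pool thread; close() drains the queue so queued
tallies are not lost at shutdown. A condensed sketch of that contract, with
stand-in names and a print() in place of the database write:

    import concurrent.futures

    class AsyncStats:
        def __init__(self):
            self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

        def _write(self, buckets, size, is_revisit):
            print("persisting", buckets, size, is_revisit)  # slow I/O lives here

        def notify(self, buckets, size, is_revisit):
            # returns immediately; the write happens later on a pool thread
            self._executor.submit(self._write, buckets, size, is_revisit)

        def close(self):
            self._executor.shutdown(wait=True)  # block until queued writes finish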