From 95e611a5d02924cbfea92f34918d3513e617a9b7 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 28 Oct 2015 21:02:42 +0000
Subject: [PATCH] update stats in RethinkDb asynchronously, since profiling
 shows this to be a bottleneck in WarcWriterThread (which in turn makes it a
 bottleneck for the whole app)

---
 setup.py               | 14 +++++-----
 tests/test_warcprox.py | 31 ++++++++++++++++-----
 warcprox/stats.py      | 61 ++++++++++++++++++++++++++++--------------
 3 files changed, 73 insertions(+), 33 deletions(-)

diff --git a/setup.py b/setup.py
index ed8bbb2..a26166b 100755
--- a/setup.py
+++ b/setup.py
@@ -17,6 +17,12 @@ class PyTest(TestCommand):
         errno = pytest.main(self.test_args)
         sys.exit(errno)
 
+deps = ['certauth>=1.1.0', 'warctools', 'kafka-python', 'surt', 'rethinkstuff']
+try:
+    import concurrent.futures
+except:
+    deps.append('futures')
+
 setuptools.setup(name='warcprox',
         version='1.5.0',
         description='WARC writing MITM HTTP/S proxy',
@@ -26,13 +32,7 @@ setuptools.setup(name='warcprox',
         long_description=open('README.rst').read(),
         license='GPL',
         packages=['warcprox'],
-        install_requires=[
-            'certauth>=1.1.0',
-            'warctools',
-            'kafka-python',
-            'surt',
-            'rethinkstuff',
-        ],
+        install_requires=deps,
         tests_require=['requests>=2.0.1', 'pytest'],  # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
         cmdclass = {'test': PyTest},
         test_suite='warcprox.tests',
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index badf9ed..1f6c2fd 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -398,8 +398,12 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # check in dedup db (no change from prev)
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
@@ -461,8 +465,13 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
+
 
     # check in dedup db (no change from prev)
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
@@ -491,8 +500,12 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
 
     response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
     assert response.status_code == 420
@@ -515,8 +528,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
 
     # wait for writer thread to process
     time.sleep(0.5)
-    while not warcprox_.warc_writer_thread.idle:
+    while (not warcprox_.warc_writer_thread.idle
+            or (warcprox_.proxy.stats_db
+                and hasattr(warcprox_.proxy.stats_db, "_executor")
+                and warcprox_.proxy.stats_db._executor._work_queue.qsize() > 0)):
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # check url1 in dedup db bucket_a
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
@@ -541,6 +558,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # check url2 in dedup db bucket_b
     dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
@@ -568,6 +586,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     time.sleep(0.5)
     while not warcprox_.warc_writer_thread.idle:
         time.sleep(0.5)
+    time.sleep(0.5)
 
     # close the warc
     assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
@@ -575,14 +594,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     warc_path = os.path.join(writer.directory, writer._f_finalname)
     warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
     assert os.path.exists(warc_path)
-    
+
     # read the warc
     fh = warctools.ArchiveRecord.open_archive(warc_path)
    record_iter = fh.read_records(limit=None, offsets=True)
     try:
         (offset, record, errors) = next(record_iter)
         assert record.type == b'warcinfo'
-    
+
         # url1 bucket_a
         (offset, record, errors) = next(record_iter)
         assert record.type == b'response'
diff --git a/warcprox/stats.py b/warcprox/stats.py
index b27c021..9be1b54 100644
--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -14,6 +14,7 @@ import json
 from hanzo import warctools
 import random
 import warcprox
+import concurrent.futures
 
 def _empty_bucket(bucket):
     return {
@@ -95,7 +96,7 @@ class StatsDb:
         if b in self.db:
             bucket_stats = json.loads(self.db[b].decode("utf-8"))
         else:
-            bucket_stats = _empty_bucket(b) 
+            bucket_stats = _empty_bucket(b)
 
         bucket_stats["total"]["urls"] += 1
         bucket_stats["total"]["wire_bytes"] += recorded_url.size
@@ -119,6 +120,7 @@ class RethinkStatsDb:
         self.replicas = replicas or min(3, len(r.servers))
         self._ensure_db_table()
         self.options = options
+        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
     def _ensure_db_table(self):
         dbs = self.r.db_list().run()
@@ -127,12 +129,15 @@ class RethinkStatsDb:
             self.r.db_create(self.r.dbname).run()
         tables = self.r.table_list().run()
         if not self.table in tables:
-            self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s", 
+            self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s",
                     repr(self.table), repr(self.r.dbname), self.shards, self.replicas)
             self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run()
 
     def close(self):
-        pass
+        self.logger.info("waiting for ~%s tasks to finish",
+                self._executor._work_queue.qsize() + (self._executor._max_workers/2))
+        self._executor.shutdown(wait=True)
+        self.logger.info("shut down complete")
 
     def sync(self):
         pass
@@ -149,7 +154,32 @@ class RethinkStatsDb:
                 return bucket0_stats[bucket1]
         return bucket0_stats
 
-    def tally(self, recorded_url, records):
+    def _tally(self, buckets, size, is_revisit):
+        try:
+            self.logger.info("starting task self._tally(%s)", (buckets, size, is_revisit))
+            for bucket in buckets:
+                bucket_stats = self.value(bucket) or _empty_bucket(bucket)
+
+                bucket_stats["total"]["urls"] += 1
+                bucket_stats["total"]["wire_bytes"] += size
+
+                if is_revisit:
+ bucket_stats["revisit"]["urls"] += 1 + bucket_stats["revisit"]["wire_bytes"] += size + else: + bucket_stats["new"]["urls"] += 1 + bucket_stats["new"]["wire_bytes"] += size + + self.logger.debug("saving %s", bucket_stats) + result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run() + if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: + raise Exception("unexpected result %s saving %s", result, record) + + self.logger.info("finished task self._tally(%s)", (buckets, size, is_revisit)) + except: + self.logger.error("unexpected problem tallying stats", exc_info=True) + + def _extract_stats_info(self, recorded_url, records): buckets = ["__all__"] if (recorded_url.warcprox_meta @@ -159,24 +189,15 @@ class RethinkStatsDb: else: buckets.append("__unspecified__") - for bucket in buckets: - bucket_stats = self.value(bucket) or _empty_bucket(bucket) + is_revisit = records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT - bucket_stats["total"]["urls"] += 1 - bucket_stats["total"]["wire_bytes"] += recorded_url.size + return buckets, recorded_url.size, is_revisit - if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT: - bucket_stats["revisit"]["urls"] += 1 - bucket_stats["revisit"]["wire_bytes"] += recorded_url.size - else: - bucket_stats["new"]["urls"] += 1 - bucket_stats["new"]["wire_bytes"] += recorded_url.size - - self.logger.debug("saving %s", bucket_stats) - result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run() - if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: - raise Exception("unexpected result %s saving %s", result, record) + def tally(self, recorded_url, records): + self._tally(self._extract_stats_info(recorded_url, records)) def notify(self, recorded_url, records): - self.tally(recorded_url, records) + args = self._extract_stats_info(recorded_url, records) + self.logger.info("submitting task self._tally(%s)", args) + self._executor.submit(self._tally, *args)