From df38cf856d30b265e29db5a88e3f2d28518ff2ae Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Fri, 21 Aug 2015 00:27:30 +0000
Subject: [PATCH] rethinkdb for stats

---
 warcprox/main.py                | 29 ++++----
 warcprox/stats.py               | 124 ++++++++++++++++++++++------
 warcprox/tests/test_warcprox.py | 38 +++++++---
 3 files changed, 146 insertions(+), 45 deletions(-)

diff --git a/warcprox/main.py b/warcprox/main.py
index f6bf322..e7bfee2 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -56,11 +56,6 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
     arg_parser.add_argument('--base32', dest='base32', action='store_true',
             default=False, help='write digests in Base32 instead of hex')
-    group = arg_parser.add_mutually_exclusive_group()
-    group.add_argument('--dedup-rethinkdb-url', dest='dedup_rethinkdb_url',
-            help='persistent deduplication rethink db url, e.g. rethinkdb://db0.foo.org,db0.foo.org:38015,db1.foo.org/warcprox/dedup')
-    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
-            default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
     arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
             default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
     arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
@@ -68,6 +63,13 @@
     arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
             default='./warcprox-playback-index.db',
             help='playback index database file (only used if --playback-port is specified)')
+    group = arg_parser.add_mutually_exclusive_group()
+    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
+            default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+    group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
+            help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
+    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="warcprox",
+            help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
     arg_parser.add_argument('--version', action='version',
             version="warcprox {}".format(warcprox.version_str))
     arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
@@ -112,23 +114,18 @@ def main(argv=sys.argv):
         logging.fatal(e)
         exit(1)
 
-    if args.dedup_rethinkdb_url:
-        m = re.fullmatch(r"rethinkdb://([^/]+)/([^/]+)/([^/]+)", args.dedup_rethinkdb_url)
-        if m:
-            servers = m.group(1).split(",")
-            db = m.group(2)
-            table = m.group(3)
-            dedup_db = warcprox.dedup.RethinkDedupDb(servers, db, table)
-        else:
-            logging.fatal("failed to parse dedup rethinkdb url %s", args.dedup_rethinkdb_url)
-            exit(1)
+    if args.rethinkdb_servers:
+        dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db)
     elif args.dedup_db_file in (None, '', '/dev/null'):
         logging.info('deduplication disabled')
         dedup_db = None
     else:
         dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file)
 
-    if args.stats_db_file in (None, '', '/dev/null'):
+
+    if args.rethinkdb_servers:
+        stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db)
+    elif args.stats_db_file in (None, '', '/dev/null'):
         logging.info('statistics tracking disabled')
         stats_db = None
     else:
diff --git a/warcprox/stats.py b/warcprox/stats.py
index 6ad3ca4..f700cca 100644
--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -14,6 +14,29 @@ import logging
 import os
 import json
 from hanzo import warctools
+import rethinkdb
+r = rethinkdb
+import random
+
+def _empty_bucket(bucket):
+    return {
+        "bucket": bucket,
+        "total": {
+            "urls": 0,
+            "wire_bytes": 0,
+            # "warc_bytes": 0,
+        },
+        "new": {
+            "urls": 0,
+            "wire_bytes": 0,
+            # "warc_bytes": 0,
+        },
+        "revisit": {
+            "urls": 0,
+            "wire_bytes": 0,
+            # "warc_bytes": 0,
+        },
+    }
 
 class StatsDb:
     logger = logging.getLogger("warcprox.stats.StatsDb")
@@ -35,25 +58,6 @@
         except:
             pass
 
-    def _empty_bucket(self):
-        return {
-            "total": {
-                "urls": 0,
-                "wire_bytes": 0,
-                # "warc_bytes": 0,
-            },
-            "new": {
-                "urls": 0,
-                "wire_bytes": 0,
-                # "warc_bytes": 0,
-            },
-            "revisit": {
-                "urls": 0,
-                "wire_bytes": 0,
-                # "warc_bytes": 0,
-            },
-        }
-
     def value(self, bucket0="__all__", bucket1=None, bucket2=None):
         if bucket0 in self.db:
             bucket0_stats = json.loads(self.db[bucket0].decode("utf-8"))
@@ -81,7 +85,7 @@
             if bucket in self.db:
                 bucket_stats = json.loads(self.db[bucket].decode("utf-8"))
             else:
-                bucket_stats = self._empty_bucket()
+                bucket_stats = _empty_bucket(bucket)
 
             bucket_stats["total"]["urls"] += 1
             bucket_stats["total"]["wire_bytes"] += recorded_url.size
@@ -95,3 +99,83 @@
             bucket_stats["new"]["wire_bytes"] += recorded_url.size
 
             self.db[bucket] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8")
+
+class RethinkStatsDb:
+    logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
+
+    def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3):
+        self.servers = servers
+        self.db = db
+        self.table = table
+        self.shards = shards
+        self.replicas = replicas
+        self._ensure_db_table()
+
+    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
+    # "Best practices: Managing connections: a connection per request"
+    def _random_server_connection(self):
+        server = random.choice(self.servers)
+        try:
+            host, port = server.split(":")
+            return r.connect(host=host, port=port)
+        except ValueError:
+            return r.connect(host=server)
+
+    def _ensure_db_table(self):
+        with self._random_server_connection() as conn:
+            dbs = r.db_list().run(conn)
+            if not self.db in dbs:
+                self.logger.info("creating rethinkdb database %s", repr(self.db))
+                r.db_create(self.db).run(conn)
+            tables = r.db(self.db).table_list().run(conn)
+            if not self.table in tables:
+                self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
+                r.db(self.db).table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run(conn)
+
+    def close(self):
+        pass
+
+    def sync(self):
+        pass
+
+    def value(self, bucket0="__all__", bucket1=None, bucket2=None):
+        # XXX use pluck?
+        with self._random_server_connection() as conn:
+            bucket0_stats = r.db(self.db).table(self.table).get(bucket0).run(conn)
+            self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
+            if bucket0_stats:
+                if bucket1:
+                    if bucket2:
+                        return bucket0_stats[bucket1][bucket2]
+                    else:
+                        return bucket0_stats[bucket1]
+            return bucket0_stats
+
+    def tally(self, recorded_url, records):
+        buckets = ["__all__"]
+
+        if (recorded_url.warcprox_meta
+                and "stats" in recorded_url.warcprox_meta
+                and "buckets" in recorded_url.warcprox_meta["stats"]):
+            buckets.extend(recorded_url.warcprox_meta["stats"]["buckets"])
+        else:
+            buckets.append("__unspecified__")
+
+        with self._random_server_connection() as conn:
+            for bucket in buckets:
+                bucket_stats = self.value(bucket) or _empty_bucket(bucket)
+
+                bucket_stats["total"]["urls"] += 1
+                bucket_stats["total"]["wire_bytes"] += recorded_url.size
+
+                if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
+                    bucket_stats["revisit"]["urls"] += 1
+                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
+                else:
+                    bucket_stats["new"]["urls"] += 1
+                    bucket_stats["new"]["wire_bytes"] += recorded_url.size
+
+                self.logger.info("saving %s", bucket_stats)
+                result = r.db(self.db).table(self.table).insert(bucket_stats, conflict="replace").run(conn)
+                if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
+                    raise Exception("unexpected result %s saving %s" % (result, bucket_stats))
diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py
index 5d9141c..2489807 100755
--- a/warcprox/tests/test_warcprox.py
+++ b/warcprox/tests/test_warcprox.py
@@ -135,7 +135,7 @@ def https_daemon(request, cert):
 def dedup_db(request, rethinkdb_servers):
     if rethinkdb_servers:
         servers = rethinkdb_servers.split(",")
-        db = 'warcprox_test_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
+        db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
         ddb = warcprox.dedup.RethinkDedupDb(servers, db)
     else:
         f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
@@ -157,7 +157,32 @@
     return ddb
 
 @pytest.fixture(scope="module")
-def warcprox_(request, dedup_db):
+def stats_db(request, rethinkdb_servers):
+    if rethinkdb_servers:
+        servers = rethinkdb_servers.split(",")
+        db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
+        sdb = warcprox.stats.RethinkStatsDb(servers, db)
+    else:
+        f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
+        f.close()
+        stats_db_file = f.name
+        sdb = warcprox.stats.StatsDb(stats_db_file)
+
+    def fin():
+        if rethinkdb_servers:
+            logging.info('dropping rethinkdb database {}'.format(db))
+            with sdb._random_server_connection() as conn:
+                result = r.db_drop(db).run(conn)
+                logging.info("result=%s", result)
+        else:
+            logging.info('deleting file {}'.format(stats_db_file))
+            os.unlink(stats_db_file)
+    request.addfinalizer(fin)
+
+    return sdb
+
+@pytest.fixture(scope="module")
+def warcprox_(request, dedup_db, stats_db):
     f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True)
     f.close() # delete it, or CertificateAuthority will try to read it
     ca_file = f.name
@@ -166,11 +191,6 @@ def warcprox_(request, dedup_db):
 
     recorded_url_q = queue.Queue()
 
-    f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
-    f.close()
-    stats_db_file = f.name
-    stats_db = warcprox.stats.StatsDb(stats_db_file)
-
     proxy = warcprox.warcproxy.WarcProxy(server_address=('localhost', 0), ca=ca,
             recorded_url_q=recorded_url_q, stats_db=stats_db)
 
@@ -201,7 +221,7 @@
         logging.info('stopping warcprox')
         warcprox_.stop.set()
         warcprox_thread.join()
-        for f in (ca_file, ca_dir, warcs_dir, playback_index_db_file, stats_db_file):
+        for f in (ca_file, ca_dir, warcs_dir, playback_index_db_file):
             if os.path.isdir(f):
                 logging.info('deleting directory {}'.format(f))
                 shutil.rmtree(f)
@@ -433,7 +453,7 @@ def test_limits(http_daemon, archiving_proxies):
     response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
     assert response.status_code == 420
     assert response.reason == "Reached limit"
-    expected_response_meta = {'reached-limit': {'job1.total.urls': 10}, 'stats': {'job1': {'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}}
+    expected_response_meta = {'reached-limit': {'job1.total.urls': 10}, 'stats': {'job1': {'bucket': 'job1', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}}
     assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
     assert response.headers["content-type"] == "text/plain;charset=utf-8"
     assert response.raw.data == b"request rejected by warcprox: reached limit job1.total.urls=10\n"