Mirror of https://github.com/internetarchive/warcprox.git, synced 2025-01-18 13:22:09 +01:00
rethinkdb for stats

parent 788bc69f47
commit df38cf856d
@@ -56,11 +56,6 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
         default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
     arg_parser.add_argument('--base32', dest='base32', action='store_true',
         default=False, help='write digests in Base32 instead of hex')
-    group = arg_parser.add_mutually_exclusive_group()
-    group.add_argument('--dedup-rethinkdb-url', dest='dedup_rethinkdb_url',
-        help='persistent deduplication rethink db url, e.g. rethinkdb://db0.foo.org,db0.foo.org:38015,db1.foo.org/warcprox/dedup')
-    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
-        default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
     arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
         default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
     arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
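Note: this hunk removes the single rethinkdb://host,host:port/db/table dedup URL option; the next hunk replaces it with separate --rethinkdb-servers and --rethinkdb-db options that deduplication and statistics share.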
@@ -68,6 +63,13 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
         default='./warcprox-playback-index.db',
        help='playback index database file (only used if --playback-port is specified)')
+    group = arg_parser.add_mutually_exclusive_group()
+    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
+        default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+    group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
+        help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
+    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="warcprox",
+        help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
     arg_parser.add_argument('--version', action='version',
        version="warcprox {}".format(warcprox.version_str))
     arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
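Note: with the new options, deduplication and statistics can share one RethinkDB cluster. A hypothetical invocation, reusing the server list from the help text above (host names are placeholders):

    warcprox --rethinkdb-servers db0.foo.org,db0.foo.org:38015,db1.foo.org --rethinkdb-db warcprox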
@@ -112,23 +114,18 @@ def main(argv=sys.argv):
         logging.fatal(e)
         exit(1)
 
-    if args.dedup_rethinkdb_url:
-        m = re.fullmatch(r"rethinkdb://([^/]+)/([^/]+)/([^/]+)", args.dedup_rethinkdb_url)
-        if m:
-            servers = m.group(1).split(",")
-            db = m.group(2)
-            table = m.group(3)
-            dedup_db = warcprox.dedup.RethinkDedupDb(servers, db, table)
-        else:
-            logging.fatal("failed to parse dedup rethinkdb url %s", args.dedup_rethinkdb_url)
-            exit(1)
+    if args.rethinkdb_servers:
+        dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db)
     elif args.dedup_db_file in (None, '', '/dev/null'):
         logging.info('deduplication disabled')
         dedup_db = None
     else:
         dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file)
 
-    if args.stats_db_file in (None, '', '/dev/null'):
+    if args.rethinkdb_servers:
+        stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db)
+    elif args.stats_db_file in (None, '', '/dev/null'):
         logging.info('statistics tracking disabled')
         stats_db = None
     else:
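Note: -j/--dedup-db-file and --rethinkdb-servers are mutually exclusive, but --stats-db-file is not in the exclusive group, so when --rethinkdb-servers is given the stats-db-file setting is silently ignored in favor of RethinkStatsDb.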
@@ -14,6 +14,29 @@ import logging
 import os
 import json
 from hanzo import warctools
+import rethinkdb
+r = rethinkdb
+import random
+
+def _empty_bucket(bucket):
+    return {
+        "bucket": bucket,
+        "total": {
+            "urls": 0,
+            "wire_bytes": 0,
+            # "warc_bytes": 0,
+        },
+        "new": {
+            "urls": 0,
+            "wire_bytes": 0,
+            # "warc_bytes": 0,
+        },
+        "revisit": {
+            "urls": 0,
+            "wire_bytes": 0,
+            # "warc_bytes": 0,
+        },
+    }
 
 class StatsDb:
     logger = logging.getLogger("warcprox.stats.StatsDb")
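Note: each bucket is stored as one JSON document whose "bucket" field is the RethinkDB primary key (see _ensure_db_table below). A populated document might look like this, borrowing the counts asserted in test_limits at the bottom of this diff:

    {"bucket": "job1",
     "total": {"urls": 10, "wire_bytes": 1350},
     "new": {"urls": 1, "wire_bytes": 135},
     "revisit": {"urls": 9, "wire_bytes": 1215}}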
@@ -35,25 +58,6 @@ class StatsDb:
         except:
             pass
 
-    def _empty_bucket(self):
-        return {
-            "total": {
-                "urls": 0,
-                "wire_bytes": 0,
-                # "warc_bytes": 0,
-            },
-            "new": {
-                "urls": 0,
-                "wire_bytes": 0,
-                # "warc_bytes": 0,
-            },
-            "revisit": {
-                "urls": 0,
-                "wire_bytes": 0,
-                # "warc_bytes": 0,
-            },
-        }
-
     def value(self, bucket0="__all__", bucket1=None, bucket2=None):
         if bucket0 in self.db:
             bucket0_stats = json.loads(self.db[bucket0].decode("utf-8"))
@@ -81,7 +85,7 @@
         if bucket in self.db:
             bucket_stats = json.loads(self.db[bucket].decode("utf-8"))
         else:
-            bucket_stats = self._empty_bucket()
+            bucket_stats = _empty_bucket(bucket)
 
         bucket_stats["total"]["urls"] += 1
         bucket_stats["total"]["wire_bytes"] += recorded_url.size
@@ -95,3 +99,83 @@
 
         self.db[bucket] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8")
+
+class RethinkStatsDb:
+    logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
+
+    def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3):
+        self.servers = servers
+        self.db = db
+        self.table = table
+        self.shards = shards
+        self.replicas = replicas
+        self._ensure_db_table()
+
+    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
+    # "Best practices: Managing connections: a connection per request"
+    def _random_server_connection(self):
+        server = random.choice(self.servers)
+        try:
+            host, port = server.split(":")
+            return r.connect(host=host, port=port)
+        except ValueError:
+            return r.connect(host=server)
+
+    def _ensure_db_table(self):
+        with self._random_server_connection() as conn:
+            dbs = r.db_list().run(conn)
+            if not self.db in dbs:
+                self.logger.info("creating rethinkdb database %s", repr(self.db))
+                r.db_create(self.db).run(conn)
+            tables = r.db(self.db).table_list().run(conn)
+            if not self.table in tables:
+                self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
+                r.db(self.db).table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run(conn)
+
+    def close(self):
+        pass
+
+    def sync(self):
+        pass
+
+    def value(self, bucket0="__all__", bucket1=None, bucket2=None):
+        # XXX use pluck?
+        with self._random_server_connection() as conn:
+            bucket0_stats = r.db(self.db).table(self.table).get(bucket0).run(conn)
+            self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
+            if bucket0_stats:
+                if bucket1:
+                    if bucket2:
+                        return bucket0_stats[bucket1][bucket2]
+                    else:
+                        return bucket0_stats[bucket1]
+            return bucket0_stats
+
+    def tally(self, recorded_url, records):
+        buckets = ["__all__"]
+
+        if (recorded_url.warcprox_meta
+                and "stats" in recorded_url.warcprox_meta
+                and "buckets" in recorded_url.warcprox_meta["stats"]):
+            buckets.extend(recorded_url.warcprox_meta["stats"]["buckets"])
+        else:
+            buckets.append("__unspecified__")
+
+        with self._random_server_connection() as conn:
+            for bucket in buckets:
+                bucket_stats = self.value(bucket) or _empty_bucket(bucket)
+
+                bucket_stats["total"]["urls"] += 1
+                bucket_stats["total"]["wire_bytes"] += recorded_url.size
+
+                if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
+                    bucket_stats["revisit"]["urls"] += 1
+                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
+                else:
+                    bucket_stats["new"]["urls"] += 1
+                    bucket_stats["new"]["wire_bytes"] += recorded_url.size
+
+                self.logger.info("saving %s", bucket_stats)
+                result = r.db(self.db).table(self.table).insert(bucket_stats, conflict="replace").run(conn)
+                if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
+                    raise Exception("unexpected result %s saving %s", result, record)
 
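Note: every value() and tally() call opens a fresh connection to a randomly chosen server, per the "connection per request" practice cited in the comments. A minimal usage sketch, assuming a reachable RethinkDB node (server names are illustrative):

    import warcprox.stats

    sdb = warcprox.stats.RethinkStatsDb(servers=["db0.foo.org", "db1.foo.org:38015"], db="warcprox")
    # value() returns None until something has been tallied into the bucket
    print(sdb.value("__all__", "total", "urls"))

Also note that the failure branch in tally() references `record`, which is not defined in that scope; `bucket_stats` appears to be intended.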
@@ -135,7 +135,7 @@ def https_daemon(request, cert):
 def dedup_db(request, rethinkdb_servers):
     if rethinkdb_servers:
         servers = rethinkdb_servers.split(",")
-        db = 'warcprox_test_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
+        db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
         ddb = warcprox.dedup.RethinkDedupDb(servers, db)
     else:
         f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
@@ -157,7 +157,32 @@ def dedup_db(request, rethinkdb_servers):
     return ddb
 
 @pytest.fixture(scope="module")
-def warcprox_(request, dedup_db):
+def stats_db(request, rethinkdb_servers):
+    if rethinkdb_servers:
+        servers = rethinkdb_servers.split(",")
+        db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
+        sdb = warcprox.stats.RethinkStatsDb(servers, db)
+    else:
+        f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
+        f.close()
+        stats_db_file = f.name
+        sdb = warcprox.stats.StatsDb(stats_db_file)
+
+    def fin():
+        if rethinkdb_servers:
+            logging.info('dropping rethinkdb database {}'.format(db))
+            with sdb._random_server_connection() as conn:
+                result = r.db_drop(db).run(conn)
+                logging.info("result=%s", result)
+        else:
+            logging.info('deleting file {}'.format(stats_db_file))
+            os.unlink(stats_db_file)
+    request.addfinalizer(fin)
+
+    return sdb
+
+@pytest.fixture(scope="module")
+def warcprox_(request, dedup_db, stats_db):
     f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True)
     f.close() # delete it, or CertificateAuthority will try to read it
     ca_file = f.name
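Note: the new stats_db fixture mirrors dedup_db, keying off the rethinkdb_servers fixture, which presumably maps to a py.test command-line option. Something like the following would exercise the RethinkDB code paths (option name assumed from the fixture name, servers illustrative):

    py.test --rethinkdb-servers=db0.foo.org,db1.foo.org

Without it, both fixtures fall back to temporary local database files.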
@@ -166,11 +191,6 @@ def warcprox_(request, dedup_db):
 
     recorded_url_q = queue.Queue()
 
-    f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
-    f.close()
-    stats_db_file = f.name
-    stats_db = warcprox.stats.StatsDb(stats_db_file)
-
     proxy = warcprox.warcproxy.WarcProxy(server_address=('localhost', 0), ca=ca,
             recorded_url_q=recorded_url_q, stats_db=stats_db)
 
@@ -201,7 +221,7 @@ def warcprox_(request, dedup_db):
         logging.info('stopping warcprox')
         warcprox_.stop.set()
         warcprox_thread.join()
-        for f in (ca_file, ca_dir, warcs_dir, playback_index_db_file, stats_db_file):
+        for f in (ca_file, ca_dir, warcs_dir, playback_index_db_file):
             if os.path.isdir(f):
                 logging.info('deleting directory {}'.format(f))
                 shutil.rmtree(f)
@@ -433,7 +453,7 @@ def test_limits(http_daemon, archiving_proxies):
     response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
     assert response.status_code == 420
     assert response.reason == "Reached limit"
-    expected_response_meta = {'reached-limit': {'job1.total.urls': 10}, 'stats': {'job1': {'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}}
+    expected_response_meta = {'reached-limit': {'job1.total.urls': 10}, 'stats': {'job1': {'bucket': 'job1', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}}
     assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
     assert response.headers["content-type"] == "text/plain;charset=utf-8"
     assert response.raw.data == b"request rejected by warcprox: reached limit job1.total.urls=10\n"
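Note: the expected warcprox-meta gains 'bucket': 'job1' because _empty_bucket() now stores the bucket name inside the stats document itself, where it serves as the table's primary key, and that field rides along when stats are echoed back to the client.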