tests pass with big rethinkdb captures table

This commit is contained in:
Noah Levitt 2015-08-25 01:26:51 +00:00
parent ab4e90c4b8
commit 6d673ee35f
6 changed files with 135 additions and 33 deletions

View File

@ -31,3 +31,4 @@ import warcprox.writer as writer
import warcprox.warc as warc import warcprox.warc as warc
import warcprox.writerthread as writerthread import warcprox.writerthread as writerthread
import warcprox.stats as stats import warcprox.stats as stats
import warcprox.bigtable as bigtable

View File

@ -7,16 +7,21 @@ from hanzo import warctools
import rethinkdb import rethinkdb
r = rethinkdb r = rethinkdb
import random import random
import warcprox
import base64
import surt
import os
class RethinkCaptures: class RethinkCaptures:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") logger = logging.getLogger("warcprox.dedup.RethinkCaptures")
def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3): def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3, options=warcprox.Options()):
self.servers = servers self.servers = servers
self.db = db self.db = db
self.table = table self.table = table
self.shards = shards self.shards = shards
self.replicas = replicas self.replicas = replicas
self.options = options
self._ensure_db_table() self._ensure_db_table()
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
@ -38,27 +43,79 @@ class RethinkCaptures:
tables = r.db(self.db).table_list().run(conn) tables = r.db(self.db).table_list().run(conn)
if not self.table in tables: if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db)) self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
r.db(db).table_create(table, shards=3, replicas=3).run(conn) r.db(self.db).table_create(self.table, shards=self.shards, replicas=self.replicas).run(conn)
r.db(db).table(table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn) r.db(self.db).table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn)
r.db(db).table(table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn) r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn)
# r.db(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn) # r.dself.b(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn)
# r.db(self.db).table(self.table).index_create("timestamp").run(conn) # r.db(self.db).table(self.table).index_create("timestamp").run(conn)
# r.db(self.db).table(self.table).index_create("sha1base32").run(conn) # r.db(self.db).table(self.table).index_create("sha1base32").run(conn)
def find_response_by_digest(self, algo, raw_digest):
if algo != "sha1":
raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo))
sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
with self._random_server_connection() as conn:
cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response"], index="sha1_warc_type").run(conn)
results = list(cursor)
if len(results) > 1:
raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32)
elif len(results) == 1:
result = results[0]
else:
result = None
self.logger.info("returning %s for sha1base32=%s", result, sha1base32)
return result
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
canon_surt = surt.surt(recorded_url.url, trailing_comma=True, host_massage=False) if recorded_url.response_recorder.payload_digest.name != "sha1":
self.logger.warn("digest type is %s but big capture table is indexed by sha1", recorded_url.response_recorder.payload_digest.name)
canon_surt = surt.surt(recorded_url.url.decode("utf-8"), trailing_comma=True, host_massage=False)
entry = { entry = {
# id only specified for rethinkdb partitioning # id only specified for rethinkdb partitioning
"id": "{} {}".format(canon_surt[:20], record.id.decode("utf-8")[10:-1]), "id": "{} {}".format(canon_surt[:20], records[0].id.decode("utf-8")[10:-1]),
"abbr_canon_surt": canon_surt[:150], "abbr_canon_surt": canon_surt[:150],
"timestamp": re.sub(r"[^0-9]", "", record.date.decode("utf-8")), # "timestamp": re.sub(r"[^0-9]", "", records[0].date.decode("utf-8")),
"url": record.url.decode("utf-8"), "timestamp": records[0].date.decode("utf-8"),
"offset": offset, "url": recorded_url.url.decode("utf-8"),
"filename": os.path.basename(warc_file), "offset": records[0].offset,
"warc_type": record.type.decode("utf-8"), "filename": os.path.basename(records[0].warc_filename),
"warc_id": record.id.decode("utf-8"), "warc_type": records[0].type.decode("utf-8"),
"sha1base32": record.get_header(b'WARC-Payload-Digest').decode("utf-8")[5:], "warc_id": records[0].id.decode("utf-8"),
# mimetype "sha1base32": base64.b32encode(recorded_url.response_recorder.payload_digest.digest()).decode("utf-8"),
# response_code "content_type": recorded_url.content_type,
# http_method "response_code": recorded_url.status,
"http_method": recorded_url.method,
} }
with self._random_server_connection() as conn:
result = r.db(self.db).table(self.table).insert(entry).run(conn)
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
raise Exception("unexpected result %s saving %s", result, entry)
self.logger.info('big capture table db saved %s', entry)
class RethinkCapturesDedup:
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
def __init__(self, captures_db, options=warcprox.Options()):
self.captures_db = captures_db
self.options = options
def lookup(self, digest_key):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
algo, value_str = k.split(":")
self.logger.info("(algo,value_str)=(%s,%s)", algo, value_str)
if self.options.base32:
raw_digest = base64.b32decode(value_str, casefold=True)
else:
raw_digest = base64.b16decode(value_str, casefold=True)
entry = self.captures_db.find_response_by_digest(algo, raw_digest)
if entry:
dedup_info = {"url":entry["url"].encode("utf-8"), "date":entry["timestamp"].encode("utf-8"), "id":entry["warc_id"].encode("utf-8")}
self.logger.info("returning %s for digest_key=%s", dedup_info, digest_key)
return dedup_info
else:
return None
def close(self):
pass

View File

@ -70,7 +70,6 @@ class DedupDb(object):
self.save(key, records[0]) self.save(key, records[0])
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest: if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest:
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
@ -123,7 +122,7 @@ class RethinkDedupDb:
record = {'key':k,'url':url,'date':date,'id':record_id} record = {'key':k,'url':url,'date':date,'id':record_id}
with self._random_server_connection() as conn: with self._random_server_connection() as conn:
result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn) result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn)
if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
raise Exception("unexpected result %s saving %s", result, record) raise Exception("unexpected result %s saving %s", result, record)
self.logger.debug('dedup db saved %s:%s', key, record) self.logger.debug('dedup db saved %s:%s', key, record)

View File

@ -122,7 +122,7 @@ def main(argv=sys.argv):
if args.rethinkdb_servers: if args.rethinkdb_servers:
if args.rethinkdb_big_table: if args.rethinkdb_big_table:
captures_db = warcprox.bigtable.RethinkCaptures(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) captures_db = warcprox.bigtable.RethinkCaptures(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)
dedup_db = warcprox.bigtable.RethinkCapturesDedup(bigtable, options=options) dedup_db = warcprox.bigtable.RethinkCapturesDedup(captures_db, options=options)
listeners.append(captures_db) listeners.append(captures_db)
else: else:
dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)

View File

@ -4,8 +4,16 @@ import pytest
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers', parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
parser.addoption('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def rethinkdb_servers(request): def rethinkdb_servers(request):
return request.config.getoption("--rethinkdb-servers") return request.config.getoption("--rethinkdb-servers")
@pytest.fixture(scope="module")
def rethinkdb_big_table(request):
return request.config.getoption("--rethinkdb-big-table")

View File

@ -131,25 +131,62 @@ def https_daemon(request, cert):
return https_daemon return https_daemon
# @pytest.fixture(scope="module")
# def options(request):
# return warcprox.Options(base32=True)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def dedup_db(request, rethinkdb_servers): def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
captures_db = None
if rethinkdb_servers: if rethinkdb_servers:
servers = rethinkdb_servers.split(",") servers = rethinkdb_servers.split(",")
db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) if rethinkdb_big_table:
ddb = warcprox.dedup.RethinkDedupDb(servers, db) db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
else: captures_db = warcprox.bigtable.RethinkCaptures(servers, db)
def fin():
if captures_db:
logging.info('dropping rethinkdb database {}'.format(db))
with captures_db._random_server_connection() as conn:
result = r.db_drop(db).run(conn)
logging.info("result=%s", result)
request.addfinalizer(fin)
return captures_db
@pytest.fixture(scope="module")
def rethink_dedup_db(request, rethinkdb_servers, captures_db):
ddb = None
if rethinkdb_servers:
if captures_db:
ddb = warcprox.bigtable.RethinkCapturesDedup(captures_db)
else:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
ddb = warcprox.dedup.RethinkDedupDb(servers, db)
def fin():
if not captures_db:
logging.info('dropping rethinkdb database {}'.format(db))
with ddb._random_server_connection() as conn:
result = r.db_drop(db).run(conn)
logging.info("result=%s", result)
request.addfinalizer(fin)
return ddb
@pytest.fixture(scope="module")
def dedup_db(request, rethink_dedup_db):
dedup_db_file = None
ddb = rethink_dedup_db
if not ddb:
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False) f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
f.close() f.close()
dedup_db_file = f.name dedup_db_file = f.name
ddb = warcprox.dedup.DedupDb(dedup_db_file) ddb = warcprox.dedup.DedupDb(dedup_db_file)
def fin(): def fin():
if rethinkdb_servers: if dedup_db_file:
logging.info('dropping rethinkdb database {}'.format(db))
with ddb._random_server_connection() as conn:
result = r.db_drop(db).run(conn)
logging.info("result=%s", result)
else:
logging.info('deleting file {}'.format(dedup_db_file)) logging.info('deleting file {}'.format(dedup_db_file))
os.unlink(dedup_db_file) os.unlink(dedup_db_file)
request.addfinalizer(fin) request.addfinalizer(fin)
@ -182,7 +219,7 @@ def stats_db(request, rethinkdb_servers):
return sdb return sdb
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def warcprox_(request, dedup_db, stats_db): def warcprox_(request, captures_db, dedup_db, stats_db):
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True) f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True)
f.close() # delete it, or CertificateAuthority will try to read it f.close() # delete it, or CertificateAuthority will try to read it
ca_file = f.name ca_file = f.name
@ -208,7 +245,7 @@ def warcprox_(request, dedup_db, stats_db):
writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer) writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer)
warc_writer_thread = warcprox.writerthread.WarcWriterThread( warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool, recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=[dedup_db, playback_index_db, stats_db]) dedup_db=dedup_db, listeners=[captures_db or dedup_db, playback_index_db, stats_db])
warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
logging.info('starting warcprox') logging.info('starting warcprox')