From 6d673ee35f1e7ae3ce00724fd8f1c07c524de848 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 25 Aug 2015 01:26:51 +0000 Subject: [PATCH] tests pass with big rethinkdb captures table --- warcprox/__init__.py | 1 + warcprox/bigtable.py | 93 ++++++++++++++++++++++++++------- warcprox/dedup.py | 3 +- warcprox/main.py | 2 +- warcprox/tests/conftest.py | 8 +++ warcprox/tests/test_warcprox.py | 61 ++++++++++++++++----- 6 files changed, 135 insertions(+), 33 deletions(-) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 4f8ad91..994b919 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -31,3 +31,4 @@ import warcprox.writer as writer import warcprox.warc as warc import warcprox.writerthread as writerthread import warcprox.stats as stats +import warcprox.bigtable as bigtable diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 787aa9b..f1494d6 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -7,16 +7,21 @@ from hanzo import warctools import rethinkdb r = rethinkdb import random +import warcprox +import base64 +import surt +import os class RethinkCaptures: - logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") + logger = logging.getLogger("warcprox.dedup.RethinkCaptures") - def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3): + def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3, options=warcprox.Options()): self.servers = servers self.db = db self.table = table self.shards = shards self.replicas = replicas + self.options = options self._ensure_db_table() # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py @@ -38,27 +43,79 @@ class RethinkCaptures: tables = r.db(self.db).table_list().run(conn) if not self.table in tables: self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db)) - r.db(db).table_create(table, shards=3, replicas=3).run(conn) - 
r.db(db).table(table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn) - r.db(db).table(table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn) - # r.db(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn) + r.db(self.db).table_create(self.table, shards=self.shards, replicas=self.replicas).run(conn) + r.db(self.db).table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn) + r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn) + # r.db(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn) # r.db(self.db).table(self.table).index_create("timestamp").run(conn) # r.db(self.db).table(self.table).index_create("sha1base32").run(conn) + def find_response_by_digest(self, algo, raw_digest): + if algo != "sha1": + raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo)) + sha1base32 = base64.b32encode(raw_digest).decode("utf-8") + with self._random_server_connection() as conn: + cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response"], index="sha1_warc_type").run(conn) + results = list(cursor) + if len(results) > 1: + raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32) + elif len(results) == 1: + result = results[0] + else: + result = None + self.logger.info("returning %s for sha1base32=%s", result, sha1base32) + return result + def notify(self, recorded_url, records): - canon_surt = surt.surt(recorded_url.url, trailing_comma=True, host_massage=False) + if recorded_url.response_recorder.payload_digest.name != "sha1": + self.logger.warn("digest type is %s but big capture table is indexed by sha1", recorded_url.response_recorder.payload_digest.name) + + 
canon_surt = surt.surt(recorded_url.url.decode("utf-8"), trailing_comma=True, host_massage=False) entry = { # id only specified for rethinkdb partitioning - "id": "{} {}".format(canon_surt[:20], record.id.decode("utf-8")[10:-1]), + "id": "{} {}".format(canon_surt[:20], records[0].id.decode("utf-8")[10:-1]), "abbr_canon_surt": canon_surt[:150], - "timestamp": re.sub(r"[^0-9]", "", record.date.decode("utf-8")), - "url": record.url.decode("utf-8"), - "offset": offset, - "filename": os.path.basename(warc_file), - "warc_type": record.type.decode("utf-8"), - "warc_id": record.id.decode("utf-8"), - "sha1base32": record.get_header(b'WARC-Payload-Digest').decode("utf-8")[5:], - # mimetype - # response_code - # http_method + # "timestamp": re.sub(r"[^0-9]", "", records[0].date.decode("utf-8")), + "timestamp": records[0].date.decode("utf-8"), + "url": recorded_url.url.decode("utf-8"), + "offset": records[0].offset, + "filename": os.path.basename(records[0].warc_filename), + "warc_type": records[0].type.decode("utf-8"), + "warc_id": records[0].id.decode("utf-8"), + "sha1base32": base64.b32encode(recorded_url.response_recorder.payload_digest.digest()).decode("utf-8"), + "content_type": recorded_url.content_type, + "response_code": recorded_url.status, + "http_method": recorded_url.method, } + + with self._random_server_connection() as conn: + result = r.db(self.db).table(self.table).insert(entry).run(conn) + if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: + raise Exception("unexpected result %s saving %s", result, entry) + self.logger.info('big capture table db saved %s', entry) + +class RethinkCapturesDedup: + logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") + + def __init__(self, captures_db, options=warcprox.Options()): + self.captures_db = captures_db + self.options = options + + def lookup(self, digest_key): + k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key + algo, value_str = k.split(":") + 
self.logger.info("(algo,value_str)=(%s,%s)", algo, value_str) + if self.options.base32: + raw_digest = base64.b32decode(value_str, casefold=True) + else: + raw_digest = base64.b16decode(value_str, casefold=True) + entry = self.captures_db.find_response_by_digest(algo, raw_digest) + if entry: + dedup_info = {"url":entry["url"].encode("utf-8"), "date":entry["timestamp"].encode("utf-8"), "id":entry["warc_id"].encode("utf-8")} + self.logger.info("returning %s for digest_key=%s", dedup_info, digest_key) + return dedup_info + else: + return None + + def close(self): + pass diff --git a/warcprox/dedup.py b/warcprox/dedup.py index adf2c44..44c5503 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -70,7 +70,6 @@ class DedupDb(object): self.save(key, records[0]) - def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest: key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) @@ -123,7 +122,7 @@ class RethinkDedupDb: record = {'key':k,'url':url,'date':date,'id':record_id} with self._random_server_connection() as conn: result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn) - if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: + if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', key, record) diff --git a/warcprox/main.py b/warcprox/main.py index 3a2d032..eb20db6 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -122,7 +122,7 @@ def main(argv=sys.argv): if args.rethinkdb_servers: if args.rethinkdb_big_table: captures_db = warcprox.bigtable.RethinkCaptures(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) - dedup_db = warcprox.bigtable.RethinkCapturesDedup(bigtable, 
options=options) + dedup_db = warcprox.bigtable.RethinkCapturesDedup(captures_db, options=options) listeners.append(captures_db) else: dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) diff --git a/warcprox/tests/conftest.py b/warcprox/tests/conftest.py index db27210..f417fed 100644 --- a/warcprox/tests/conftest.py +++ b/warcprox/tests/conftest.py @@ -4,8 +4,16 @@ import pytest def pytest_addoption(parser): parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers', help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') + parser.addoption('--rethinkdb-big-table', + dest='rethinkdb_big_table', action='store_true', default=False, + help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') @pytest.fixture(scope="module") def rethinkdb_servers(request): return request.config.getoption("--rethinkdb-servers") +@pytest.fixture(scope="module") +def rethinkdb_big_table(request): + return request.config.getoption("--rethinkdb-big-table") + + diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py index 7477d05..477ce6f 100755 --- a/warcprox/tests/test_warcprox.py +++ b/warcprox/tests/test_warcprox.py @@ -131,25 +131,62 @@ def https_daemon(request, cert): return https_daemon +# @pytest.fixture(scope="module") +# def options(request): +# return warcprox.Options(base32=True) + @pytest.fixture(scope="module") -def dedup_db(request, rethinkdb_servers): +def captures_db(request, rethinkdb_servers, rethinkdb_big_table): + captures_db = None if rethinkdb_servers: servers = rethinkdb_servers.split(",") - db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - ddb = warcprox.dedup.RethinkDedupDb(servers, db) - else: + if rethinkdb_big_table: + db = 'warcprox_test_captures_' + 
"".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) + captures_db = warcprox.bigtable.RethinkCaptures(servers, db) + + def fin(): + if captures_db: + logging.info('dropping rethinkdb database {}'.format(db)) + with captures_db._random_server_connection() as conn: + result = r.db_drop(db).run(conn) + logging.info("result=%s", result) + request.addfinalizer(fin) + + return captures_db + +@pytest.fixture(scope="module") +def rethink_dedup_db(request, rethinkdb_servers, captures_db): + ddb = None + if rethinkdb_servers: + if captures_db: + ddb = warcprox.bigtable.RethinkCapturesDedup(captures_db) + else: + servers = rethinkdb_servers.split(",") + db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) + ddb = warcprox.dedup.RethinkDedupDb(servers, db) + + def fin(): + if not captures_db: + logging.info('dropping rethinkdb database {}'.format(db)) + with ddb._random_server_connection() as conn: + result = r.db_drop(db).run(conn) + logging.info("result=%s", result) + request.addfinalizer(fin) + + return ddb + +@pytest.fixture(scope="module") +def dedup_db(request, rethink_dedup_db): + dedup_db_file = None + ddb = rethink_dedup_db + if not ddb: f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False) f.close() dedup_db_file = f.name ddb = warcprox.dedup.DedupDb(dedup_db_file) def fin(): - if rethinkdb_servers: - logging.info('dropping rethinkdb database {}'.format(db)) - with ddb._random_server_connection() as conn: - result = r.db_drop(db).run(conn) - logging.info("result=%s", result) - else: + if dedup_db_file: logging.info('deleting file {}'.format(dedup_db_file)) os.unlink(dedup_db_file) request.addfinalizer(fin) @@ -182,7 +219,7 @@ def stats_db(request, rethinkdb_servers): return sdb @pytest.fixture(scope="module") -def warcprox_(request, dedup_db, stats_db): +def warcprox_(request, captures_db, dedup_db, stats_db): f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', 
suffix='.pem', delete=True) f.close() # delete it, or CertificateAuthority will try to read it ca_file = f.name @@ -208,7 +245,7 @@ def warcprox_(request, dedup_db, stats_db): writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer) warc_writer_thread = warcprox.writerthread.WarcWriterThread( recorded_url_q=recorded_url_q, writer_pool=writer_pool, - dedup_db=dedup_db, listeners=[dedup_db, playback_index_db, stats_db]) + dedup_db=dedup_db, listeners=[captures_db or dedup_db, playback_index_db, stats_db]) warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) logging.info('starting warcprox')