From e66dc3a9fba672d13bbc8f0a58f124609dc34ff2 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Thu, 20 Aug 2015 21:46:40 +0000
Subject: [PATCH] rethinkdb dedup

---
# NOTE(review): line breaks and indentation in this copy were reconstructed
# from the hunk line counts; verify context-line whitespace against the
# target tree before applying.
#
# Review notes on the added code (no hunk content has been altered):
# - RethinkDedupDb.save: `raise Exception("unexpected result %s saving %s",
#   result, record)` passes the values as extra exception args, so the
#   %s placeholders are never interpolated into the message.
# - RethinkDedupDb.save decodes header values as latin1, but
#   RethinkDedupDb.lookup re-encodes every field (including 'key') as
#   utf-8, whereas DedupDb.lookup encodes as latin1; the two backends
#   return different bytes for non-ASCII values.
# - RethinkDedupDb.__init__ uses a list literal as the default for
#   `servers`; the visible code never mutates it.
# - _random_server_connection passes `port` as a str (result of split);
#   TODO confirm r.connect accepts that. Its `except ValueError` also wraps
#   the r.connect call, not just the tuple unpacking.
# - DedupDb now reads/writes 'id'/'url'/'date' instead of 'i'/'u'/'d';
#   entries written with the old keys would presumably raise KeyError in
#   lookup -- TODO confirm whether existing db files need migrating.
# - main() uses re.fullmatch, which needs Python 3.4+.

 setup.py                 |  2 +-
 warcprox/dedup.py        | 72 +++++++++++++++++++++++++++++++++++++---
 warcprox/main.py         | 18 ++++++++--
 warcprox/warc.py         |  6 ++--
 warcprox/writerthread.py |  2 +-
 5 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/setup.py b/setup.py
index 5bde72b..e5b71d5 100755
--- a/setup.py
+++ b/setup.py
@@ -47,7 +47,7 @@ setuptools.setup(name='warcprox',
         license='GPL',
         packages=['warcprox'],
         package_data={'warcprox':['version.txt']},
-        install_requires=['certauth>=1.1.0', 'warctools>=4.8.3'], # gdbm not in pip :(
+        install_requires=['certauth>=1.1.0', 'warctools>=4.8.3', 'rethinkdb'], # gdbm not in pip :(
         dependency_links=['git+https://github.com/internetarchive/warctools.git#egg=warctools-4.8.3'],
         tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
         cmdclass = {'test': PyTest},
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 2a99358..a715b01 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -15,6 +15,9 @@ import os
 import json
 from hanzo import warctools
 import warcprox
+import rethinkdb
+r = rethinkdb
+import random
 
 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -36,12 +39,12 @@ class DedupDb(object):
         except:
             pass
 
-    def save(self, key, response_record, offset):
+    def save(self, key, response_record):
         record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
         url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
         date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
 
-        py_value = {'i':record_id, 'u':url, 'd':date}
+        py_value = {'id':record_id, 'url':url, 'date':date}
         json_value = json.dumps(py_value, separators=(',',':'))
 
         self.db[key] = json_value.encode('utf-8')
@@ -52,9 +55,9 @@ class DedupDb(object):
         if key in self.db:
             json_result = self.db[key]
             result = json.loads(json_result.decode('utf-8'))
-            result['i'] = result['i'].encode('latin1')
-            result['u'] = result['u'].encode('latin1')
-            result['d'] = result['d'].encode('latin1')
+            result['id'] = result['id'].encode('latin1')
+            result['url'] = result['url'].encode('latin1')
+            result['date'] = result['date'].encode('latin1')
         self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
         return result
 
@@ -63,3 +66,62 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
         key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
         recorded_url.dedup_info = dedup_db.lookup(key)
 
+class RethinkDedupDb:
+    logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
+
+    def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3):
+        self.servers = servers
+        self.db = db
+        self.table = table
+        self.shards = shards
+        self.replicas = replicas
+        self._ensure_db_table()
+
+    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
+    # "Best practices: Managing connections: a connection per request"
+    def _random_server_connection(self):
+        server = random.choice(self.servers)
+        try:
+            host, port = server.split(":")
+            return r.connect(host=host, port=port)
+        except ValueError:
+            return r.connect(host=server)
+
+    def _ensure_db_table(self):
+        with self._random_server_connection() as conn:
+            dbs = r.db_list().run(conn)
+            if not self.db in dbs:
+                self.logger.info("creating rethinkdb database %s", repr(self.db))
+                r.db_create(self.db).run(conn)
+            tables = r.db(self.db).table_list().run(conn)
+            if not self.table in tables:
+                self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
+                r.db(self.db).table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run(conn)
+
+    def close(self):
+        pass
+
+    def sync(self):
+        pass
+
+    def save(self, key, response_record):
+        k = key.decode("utf-8") if isinstance(key, bytes) else key
+        record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
+        url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
+        date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
+        record = {'key':k,'url':url,'date':date,'id':record_id}
+        with self._random_server_connection() as conn:
+            result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn)
+            if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
+                raise Exception("unexpected result %s saving %s", result, record)
+            self.logger.debug('dedup db saved %s:%s', key, record)
+
+    def lookup(self, key):
+        k = key.decode("utf-8") if isinstance(key, bytes) else key
+        with self._random_server_connection() as conn:
+            result = r.db(self.db).table(self.table).get(k).run(conn)
+            if result:
+                for x in result:
+                    result[x] = result[x].encode("utf-8")
+            self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
+            return result
diff --git a/warcprox/main.py b/warcprox/main.py
index 52553f9..f6bf322 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -20,6 +20,7 @@ import signal
 import threading
 import certauth.certauth
 import warcprox
+import re
 
 def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     arg_parser = argparse.ArgumentParser(prog=prog,
@@ -55,7 +56,10 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
     arg_parser.add_argument('--base32', dest='base32', action='store_true',
             default=False, help='write digests in Base32 instead of hex')
-    arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
+    group = arg_parser.add_mutually_exclusive_group()
+    group.add_argument('--dedup-rethinkdb-url', dest='dedup_rethinkdb_url',
+            help='persistent deduplication rethink db url, e.g. rethinkdb://db0.foo.org,db0.foo.org:38015,db1.foo.org/warcprox/dedup')
+    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
             default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
     arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
             default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
@@ -108,7 +112,17 @@ def main(argv=sys.argv):
         logging.fatal(e)
         exit(1)
 
-    if args.dedup_db_file in (None, '', '/dev/null'):
+    if args.dedup_rethinkdb_url:
+        m = re.fullmatch(r"rethinkdb://([^/]+)/([^/]+)/([^/]+)", args.dedup_rethinkdb_url)
+        if m:
+            servers = m.group(1).split(",")
+            db = m.group(2)
+            table = m.group(3)
+            dedup_db = warcprox.dedup.RethinkDedupDb(servers, db, table)
+        else:
+            logging.fatal("failed to parse dedup rethinkdb url %s", args.dedup_rethinkdb_url)
+            exit(1)
+    elif args.dedup_db_file in (None, '', '/dev/null'):
         logging.info('deduplication disabled')
         dedup_db = None
     else:
diff --git a/warcprox/warc.py b/warcprox/warc.py
index 91843a7..eaeeedf 100644
--- a/warcprox/warc.py
+++ b/warcprox/warc.py
@@ -32,9 +32,9 @@ class WarcRecordBuilder:
                     url=recorded_url.url, warc_date=warc_date,
                     data=response_header_block,
                     warc_type=warctools.WarcRecord.REVISIT,
-                    refers_to=recorded_url.dedup_info['i'],
-                    refers_to_target_uri=recorded_url.dedup_info['u'],
-                    refers_to_date=recorded_url.dedup_info['d'],
+                    refers_to=recorded_url.dedup_info['id'],
+                    refers_to_target_uri=recorded_url.dedup_info['url'],
+                    refers_to_date=recorded_url.dedup_info['date'],
                     payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
                     profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
                     content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index 68c5676..df70d63 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -70,7 +70,7 @@ class WarcWriterThread(threading.Thread):
                 and recorded_url.response_recorder.payload_size() > 0):
             key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
                     self.writer_pool.default_warc_writer.record_builder.base32)
-            self.dedup_db.save(key, records[0], records[0].offset)
+            self.dedup_db.save(key, records[0])
 
     def _save_playback_info(self, recorded_url, records):
         if self.playback_index_db is not None: