From 4eda89f23264bcd0ab8040a1e408bf71c4825710 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 3 Oct 2017 12:41:04 -0700 Subject: [PATCH] trough for deduplication initial proof-of-concept-ish code --- warcprox/dedup.py | 105 ++++++++++++++++++++++++++++++++++++++++++++++ warcprox/main.py | 6 ++- 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index fd1ada4..41439c2 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -27,6 +27,7 @@ import json from hanzo import warctools import warcprox import sqlite3 +import requests class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -174,3 +175,107 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) +class TroughDedupDb(object): + ''' + https://github.com/jkafader/trough + ''' + logger = logging.getLogger("warcprox.dedup.TroughDedupDb") + + def __init__(self, options=warcprox.Options()): + self.options = options + + def start(self): + pass + + def stop(self): + pass + + def _write_url(self, bucket): + import doublethink + segment_id = 'warcprox-trough-%s' % bucket + rr = doublethink.Rethinker( + servers=['localhost'], db='trough_configuration') + services = doublethink.ServiceRegistry(rr) + master_node = services.unique_service('trough-sync-master') + response = requests.post(master_node['url'], segment_id) + response.raise_for_status() + write_url = response.text.strip() + return write_url + + def _read_url(self, bucket): + import doublethink + import rethinkdb as r + segment_id = 'warcprox-trough-%s' % bucket + rr = doublethink.Rethinker( + servers=['localhost'], db='trough_configuration') + reql = rr.table('services').get_all(segment_id, index='segment').filter( + {'role':'trough-read'}).filter( + lambda svc: r.now().sub( + svc['last_heartbeat']).lt(svc['ttl']) + ).order_by('load') + logging.debug('querying rethinkdb: %r', reql) + results = reql.run() + if results: + return results[0]['url'] + else: + return None + + def save(self, digest_key, response_record, bucket='__unspecified__'): + write_url = self._write_url(bucket) + record_id = response_record.get_header(warctools.WarcRecord.ID).decode('ascii') + url = response_record.get_header(warctools.WarcRecord.URL).decode('ascii') + warc_date = response_record.get_header(warctools.WarcRecord.DATE).decode('ascii') + + # XXX create table statement here is a temporary hack, + # see https://webarchive.jira.com/browse/AITFIVE-1465 + sql = ('create table if not exists dedup (\n' + ' digest_key varchar(100) primary key,\n' + ' url varchar(2100) not null,\n' + ' date datetime not null,\n' + ' id varchar(100));\n' # warc record id + 'insert into dedup (digest_key, url, date, id) ' + 'values (%r, %r, %r, %r);') % ( + digest_key.decode('ascii'), url, warc_date, record_id) + response = requests.post(write_url, sql) + if response.status_code != 200: + logging.warn( + 'unexpected response %r %r %r to sql=%r', + response.status_code, response.reason, response.text, sql) + + def lookup(self, digest_key, bucket='__unspecified__'): + read_url = self._read_url(bucket) + if not read_url: + return None + sql = 'select * from dedup where digest_key=%r;' % digest_key.decode('ascii') + response = requests.post(read_url, sql) + if response.status_code != 200: + logging.warn( + 'unexpected response %r %r %r to sql=%r', + response.status_code, response.reason, response.text, sql) + return None + logging.debug('got %r from query %r', response.text, sql) + results = json.loads(response.text) + assert len(results) <= 1 # sanity check (digest_key is primary key) + if results: + result = results[0] + result['id'] = result['id'].encode('ascii') + result['url'] = result['url'].encode('ascii') + result['date'] = result['date'].encode('ascii') + self.logger.debug( + 'trough lookup of key=%r returning %r', digest_key, result) + return result + else: + return None + + def notify(self, recorded_url, records): + if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + and recorded_url.response_recorder.payload_size() > 0): + digest_key = warcprox.digest_str( + recorded_url.response_recorder.payload_digest, + self.options.base32) + if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: + self.save( + digest_key, records[0], + bucket=recorded_url.warcprox_meta['captures-bucket']) + else: + self.save(digest_key, records[0]) diff --git a/warcprox/main.py b/warcprox/main.py index 7b7314b..f4ac391 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -108,6 +108,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') + group.add_argument('--trough', help='use trough for deduplication 🐷 🐷 🐷 🐷', action='store_true') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') arg_parser.add_argument('--rethinkdb-big-table', @@ -177,7 +178,10 @@ def init_controller(args): exit(1) listeners = [] - if args.rethinkdb_servers: + if args.trough: + dedup_db = warcprox.dedup.TroughDedupDb(options) + listeners.append(dedup_db) + elif args.rethinkdb_servers: rr = doublethink.Rethinker( args.rethinkdb_servers.split(","), args.rethinkdb_db) if args.rethinkdb_big_table: