trough for deduplication initial proof-of-concept-ish code

This commit is contained in:
Noah Levitt 2017-10-03 12:41:04 -07:00
parent 9b8043d3a2
commit 4eda89f232
2 changed files with 110 additions and 1 deletions

View File

@ -27,6 +27,7 @@ import json
from hanzo import warctools
import warcprox
import sqlite3
import requests
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@ -174,3 +175,107 @@ class RethinkDedupDb:
else:
self.save(digest_key, records[0])
class TroughDedupDb(object):
'''
https://github.com/jkafader/trough
'''
logger = logging.getLogger("warcprox.dedup.TroughDedupDb")
def __init__(self, options=warcprox.Options()):
self.options = options
def start(self):
pass
def stop(self):
pass
def _write_url(self, bucket):
import doublethink
segment_id = 'warcprox-trough-%s' % bucket
rr = doublethink.Rethinker(
servers=['localhost'], db='trough_configuration')
services = doublethink.ServiceRegistry(rr)
master_node = services.unique_service('trough-sync-master')
response = requests.post(master_node['url'], segment_id)
response.raise_for_status()
write_url = response.text.strip()
return write_url
def _read_url(self, bucket):
import doublethink
import rethinkdb as r
segment_id = 'warcprox-trough-%s' % bucket
rr = doublethink.Rethinker(
servers=['localhost'], db='trough_configuration')
reql = rr.table('services').get_all(segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
lambda svc: r.now().sub(
svc['last_heartbeat']).lt(svc['ttl'])
).order_by('load')
logging.debug('querying rethinkdb: %r', reql)
results = reql.run()
if results:
return results[0]['url']
else:
return None
def save(self, digest_key, response_record, bucket='__unspecified__'):
write_url = self._write_url(bucket)
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('ascii')
url = response_record.get_header(warctools.WarcRecord.URL).decode('ascii')
warc_date = response_record.get_header(warctools.WarcRecord.DATE).decode('ascii')
# XXX create table statement here is a temporary hack,
# see https://webarchive.jira.com/browse/AITFIVE-1465
sql = ('create table if not exists dedup (\n'
' digest_key varchar(100) primary key,\n'
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n' # warc record id
'insert into dedup (digest_key, url, date, id) '
'values (%r, %r, %r, %r);') % (
digest_key.decode('ascii'), url, warc_date, record_id)
response = requests.post(write_url, sql)
if response.status_code != 200:
logging.warn(
'unexpected response %r %r %r to sql=%r',
response.status_code, response.reason, response.text, sql)
def lookup(self, digest_key, bucket='__unspecified__'):
read_url = self._read_url(bucket)
if not read_url:
return None
sql = 'select * from dedup where digest_key=%r;' % digest_key.decode('ascii')
response = requests.post(read_url, sql)
if response.status_code != 200:
logging.warn(
'unexpected response %r %r %r to sql=%r',
response.status_code, response.reason, response.text, sql)
return None
logging.debug('got %r from query %r', response.text, sql)
results = json.loads(response.text)
assert len(results) <= 1 # sanity check (digest_key is primary key)
if results:
result = results[0]
result['id'] = result['id'].encode('ascii')
result['url'] = result['url'].encode('ascii')
result['date'] = result['date'].encode('ascii')
self.logger.debug(
'trough lookup of key=%r returning %r', digest_key, result)
return result
else:
return None
def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(
recorded_url.response_recorder.payload_digest,
self.options.base32)
if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta['captures-bucket'])
else:
self.save(digest_key, records[0])

View File

@ -108,6 +108,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
group.add_argument('--trough', help='use trough for deduplication 🐷 🐷 🐷 🐷', action='store_true')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--rethinkdb-big-table',
@ -177,7 +178,10 @@ def init_controller(args):
exit(1)
listeners = []
if args.rethinkdb_servers:
if args.trough:
dedup_db = warcprox.dedup.TroughDedupDb(options)
listeners.append(dedup_db)
elif args.rethinkdb_servers:
rr = doublethink.Rethinker(
args.rethinkdb_servers.split(","), args.rethinkdb_db)
if args.rethinkdb_big_table: