diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 6c76ab0..4206f26 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -246,36 +246,35 @@ class CdxServerDedup(object): """ pass -class TroughDedupDb(object): - ''' - https://github.com/jkafader/trough - ''' - logger = logging.getLogger("warcprox.dedup.TroughDedupDb") +class TroughClient(object): + logger = logging.getLogger("warcprox.dedup.TroughClient") - def __init__(self, options=warcprox.Options()): - parsed = doublethink.parse_rethinkdb_url( - options.rethinkdb_trough_db_url) + def __init__(self, rethinkdb_trough_db_url): + parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.svcreg = doublethink.ServiceRegistry(self.rr) - self.options = options - def start(self): - pass - - def stop(self): - pass - - def _write_url(self, bucket): - segment_id = 'warcprox-trough-%s' % bucket + def segment_manager_url(self): + # XXX cache until expired (check last_heartbeat and ttl) master_node = self.svcreg.unique_service('trough-sync-master') - response = requests.post(master_node['url'], segment_id) - response.raise_for_status() - write_url = response.text.strip() - return write_url + assert master_node + return master_node['url'] - def _read_url(self, bucket): - segment_id = 'warcprox-trough-%s' % bucket + def write_url(self, segment_id, schema_id='default'): + provision_url = os.path.join(self.segment_manager_url(), 'provision') + payload_dict = {'segment': segment_id, 'schema': schema_id} + response = requests.post(provision_url, json=payload_dict) + if response.status_code != 200: + raise Exception( + 'Received %s: %r in response to POST %s with data %s' % ( + response.status_code, response.text, provision_url, + json.dumps(payload_dict))) + result_dict = response.json() + # assert result_dict['schema'] == schema_id # previously provisioned? + return result_dict['write_url'] + + def read_url(self, segment_id): reql = self.rr.table('services').get_all( segment_id, index='segment').filter( {'role':'trough-read'}).filter( @@ -289,6 +288,52 @@ class TroughDedupDb(object): else: return None + def schema_exists(self, schema_id): + url = os.path.join(self.segment_manager_url(), 'schema', schema_id) + response = requests.get(url) + if response.status_code == 200: + return True + elif response.status_code == 404: + return False + else: + response.raise_for_status() + + def register_schema(self, schema_id, sql): + url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) + response = requests.put(url, sql) + if response.status_code not in (201, 204): + raise Exception( + 'Received %s: %r in response to PUT %r with data %r' % ( + response.status_code, response.text, sql, url)) + +class TroughDedupDb(object): + ''' + https://github.com/internetarchive/trough + ''' + logger = logging.getLogger("warcprox.dedup.TroughDedupDb") + + SCHEMA_ID = 'warcprox-dedup-v1' + SCHEMA_SQL = ('create table dedup (\n' + ' digest_key varchar(100) primary key,\n' + ' url varchar(2100) not null,\n' + ' date datetime not null,\n' + ' id varchar(100));\n') # warc record id + + def __init__(self, options=warcprox.Options()): + self.options = options + self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) + + def start(self): + self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) + + def _write_url(self, bucket): + segment_id = 'warcprox-trough-%s' % bucket + return self._trough_cli.write_url(segment_id, self.SCHEMA_ID) + + def _read_url(self, bucket): + segment_id = 'warcprox-trough-%s' % bucket + return self._trough_cli.read_url(segment_id) + def sql_value(self, x): if x is None: return 'null' @@ -317,14 +362,7 @@ class TroughDedupDb(object): url = response_record.get_header(warctools.WarcRecord.URL) warc_date = response_record.get_header(warctools.WarcRecord.DATE) - # XXX create table statement here is a temporary hack, - # see https://webarchive.jira.com/browse/AITFIVE-1465 - sql = ('create table if not exists dedup (\n' - ' digest_key varchar(100) primary key,\n' - ' url varchar(2100) not null,\n' - ' date datetime not null,\n' - ' id varchar(100));\n' # warc record id - 'insert into dedup (digest_key, url, date, id) ' + sql = ('insert into dedup (digest_key, url, date, id) ' 'values (%s, %s, %s, %s);') % ( self.sql_value(digest_key), self.sql_value(url), self.sql_value(warc_date), self.sql_value(record_id))