update trough dedup to use new segment manager api to register schema sql

This commit is contained in:
Noah Levitt 2017-11-03 12:39:26 -07:00
parent ed49eea4d5
commit ab99fe52b9

View File

@ -246,36 +246,35 @@ class CdxServerDedup(object):
"""
pass
class TroughDedupDb(object):
'''
https://github.com/jkafader/trough
'''
logger = logging.getLogger("warcprox.dedup.TroughDedupDb")
class TroughClient(object):
logger = logging.getLogger("warcprox.dedup.TroughClient")
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_trough_db_url)
def __init__(self, rethinkdb_trough_db_url):
parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.svcreg = doublethink.ServiceRegistry(self.rr)
self.options = options
def start(self):
pass
def stop(self):
pass
def _write_url(self, bucket):
segment_id = 'warcprox-trough-%s' % bucket
def segment_manager_url(self):
# XXX cache until expired (check last_heartbeat and ttl)
master_node = self.svcreg.unique_service('trough-sync-master')
response = requests.post(master_node['url'], segment_id)
response.raise_for_status()
write_url = response.text.strip()
return write_url
assert master_node
return master_node['url']
def _read_url(self, bucket):
segment_id = 'warcprox-trough-%s' % bucket
def write_url(self, segment_id, schema_id='default'):
provision_url = os.path.join(self.segment_manager_url(), 'provision')
payload_dict = {'segment': segment_id, 'schema': schema_id}
response = requests.post(provision_url, json=payload_dict)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
response.status_code, response.text, provision_url,
json.dumps(payload_dict)))
result_dict = response.json()
# assert result_dict['schema'] == schema_id # previously provisioned?
return result_dict['write_url']
def read_url(self, segment_id):
reql = self.rr.table('services').get_all(
segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
@ -289,6 +288,52 @@ class TroughDedupDb(object):
else:
return None
def schema_exists(self, schema_id):
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
response = requests.get(url)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
response.raise_for_status()
def register_schema(self, schema_id, sql):
url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id)
response = requests.put(url, sql)
if response.status_code not in (201, 204):
raise Exception(
'Received %s: %r in response to PUT %r with data %r' % (
response.status_code, response.text, sql, url))
class TroughDedupDb(object):
'''
https://github.com/internetarchive/trough
'''
logger = logging.getLogger("warcprox.dedup.TroughDedupDb")
SCHEMA_ID = 'warcprox-dedup-v1'
SCHEMA_SQL = ('create table dedup (\n'
' digest_key varchar(100) primary key,\n'
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n') # warc record id
def __init__(self, options=warcprox.Options()):
self.options = options
self._trough_cli = TroughClient(options.rethinkdb_trough_db_url)
def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
def _write_url(self, bucket):
segment_id = 'warcprox-trough-%s' % bucket
return self._trough_cli.write_url(segment_id, self.SCHEMA_ID)
def _read_url(self, bucket):
segment_id = 'warcprox-trough-%s' % bucket
return self._trough_cli.read_url(segment_id)
def sql_value(self, x):
if x is None:
return 'null'
@ -317,14 +362,7 @@ class TroughDedupDb(object):
url = response_record.get_header(warctools.WarcRecord.URL)
warc_date = response_record.get_header(warctools.WarcRecord.DATE)
# XXX create table statement here is a temporary hack,
# see https://webarchive.jira.com/browse/AITFIVE-1465
sql = ('create table if not exists dedup (\n'
' digest_key varchar(100) primary key,\n'
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n' # warc record id
'insert into dedup (digest_key, url, date, id) '
sql = ('insert into dedup (digest_key, url, date, id) '
'values (%s, %s, %s, %s);') % (
self.sql_value(digest_key), self.sql_value(url),
self.sql_value(warc_date), self.sql_value(record_id))