mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
more cleanly separate trough client code from the rest of TroughDedup
This commit is contained in:
parent
43c36cae10
commit
895683e062
@ -253,13 +253,42 @@ class TroughClient(object):
|
|||||||
self.rr = doublethink.Rethinker(
|
self.rr = doublethink.Rethinker(
|
||||||
servers=parsed.hosts, db=parsed.database)
|
servers=parsed.hosts, db=parsed.database)
|
||||||
self.svcreg = doublethink.ServiceRegistry(self.rr)
|
self.svcreg = doublethink.ServiceRegistry(self.rr)
|
||||||
|
self._write_url_cache = {}
|
||||||
|
self._read_url_cache = {}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def sql_value(x):
|
||||||
|
if x is None:
|
||||||
|
return 'null'
|
||||||
|
elif isinstance(x, datetime.datetime):
|
||||||
|
return 'datetime(%r)' % x.isoformat()
|
||||||
|
elif isinstance(x, bool):
|
||||||
|
return int(x)
|
||||||
|
elif isinstance(x, str) or isinstance(x, bytes):
|
||||||
|
# py3: repr(u'abc') => 'abc'
|
||||||
|
# repr(b'abc') => b'abc'
|
||||||
|
# py2: repr(u'abc') => u'abc'
|
||||||
|
# repr(b'abc') => 'abc'
|
||||||
|
# Repr gives us a prefix we don't want in different situations
|
||||||
|
# depending on whether this is py2 or py3. Chop it off either way.
|
||||||
|
r = repr(x)
|
||||||
|
if r[:1] == "'":
|
||||||
|
return r
|
||||||
|
else:
|
||||||
|
return r[1:]
|
||||||
|
elif isinstance(x, (int, float)):
|
||||||
|
return x
|
||||||
|
else:
|
||||||
|
raise Exception(
|
||||||
|
"don't know how to make an sql value from %r (%r)" % (
|
||||||
|
x, type(x)))
|
||||||
|
|
||||||
def segment_manager_url(self):
|
def segment_manager_url(self):
|
||||||
master_node = self.svcreg.unique_service('trough-sync-master')
|
master_node = self.svcreg.unique_service('trough-sync-master')
|
||||||
assert master_node
|
assert master_node
|
||||||
return master_node['url']
|
return master_node['url']
|
||||||
|
|
||||||
def write_url(self, segment_id, schema_id='default'):
|
def write_url_nocache(self, segment_id, schema_id='default'):
|
||||||
provision_url = os.path.join(self.segment_manager_url(), 'provision')
|
provision_url = os.path.join(self.segment_manager_url(), 'provision')
|
||||||
payload_dict = {'segment': segment_id, 'schema': schema_id}
|
payload_dict = {'segment': segment_id, 'schema': schema_id}
|
||||||
response = requests.post(provision_url, json=payload_dict)
|
response = requests.post(provision_url, json=payload_dict)
|
||||||
@ -272,7 +301,7 @@ class TroughClient(object):
|
|||||||
# assert result_dict['schema'] == schema_id # previously provisioned?
|
# assert result_dict['schema'] == schema_id # previously provisioned?
|
||||||
return result_dict['write_url']
|
return result_dict['write_url']
|
||||||
|
|
||||||
def read_url(self, segment_id):
|
def read_url_nocache(self, segment_id):
|
||||||
reql = self.rr.table('services').get_all(
|
reql = self.rr.table('services').get_all(
|
||||||
segment_id, index='segment').filter(
|
segment_id, index='segment').filter(
|
||||||
{'role':'trough-read'}).filter(
|
{'role':'trough-read'}).filter(
|
||||||
@ -286,6 +315,69 @@ class TroughClient(object):
|
|||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def write_url(self, segment_id, schema_id='default'):
|
||||||
|
if not segment_id in self._write_url_cache:
|
||||||
|
self._write_url_cache[segment_id] = self.write_url_nocache(
|
||||||
|
segment_id, schema_id)
|
||||||
|
self.logger.info(
|
||||||
|
'segment %r write url is %r', segment_id,
|
||||||
|
self._write_url_cache[segment_id])
|
||||||
|
return self._write_url_cache[segment_id]
|
||||||
|
|
||||||
|
def read_url(self, segment_id):
|
||||||
|
if not self._read_url_cache.get(segment_id):
|
||||||
|
self._read_url_cache[segment_id] = self.read_url_nocache(segment_id)
|
||||||
|
self.logger.info(
|
||||||
|
'segment %r read url is %r', segment_id,
|
||||||
|
self._read_url_cache[segment_id])
|
||||||
|
return self._read_url_cache[segment_id]
|
||||||
|
|
||||||
|
def write(self, segment_id, sql_tmpl, values, schema_id='default'):
|
||||||
|
write_url = self.write_url(segment_id, schema_id)
|
||||||
|
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = requests.post(write_url, sql)
|
||||||
|
except:
|
||||||
|
del self._write_url_cache[segment_id]
|
||||||
|
self.logger.error(
|
||||||
|
'problem with trough write url %r', write_url,
|
||||||
|
exc_info=True)
|
||||||
|
return
|
||||||
|
if response.status_code != 200:
|
||||||
|
del self._write_url_cache[segment_id]
|
||||||
|
self.logger.warn(
|
||||||
|
'unexpected response %r %r %r from %r to sql=%r',
|
||||||
|
response.status_code, response.reason, response.text,
|
||||||
|
write_url, sql)
|
||||||
|
return
|
||||||
|
self.logger.debug('posted %r to %s', sql, write_url)
|
||||||
|
|
||||||
|
def read(self, segment_id, sql_tmpl, values):
|
||||||
|
read_url = self.read_url(segment_id)
|
||||||
|
if not read_url:
|
||||||
|
return None
|
||||||
|
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
|
||||||
|
try:
|
||||||
|
response = requests.post(read_url, sql)
|
||||||
|
except:
|
||||||
|
del self._read_url_cache[segment_id]
|
||||||
|
self.logger.error(
|
||||||
|
'problem with trough read url %r', read_url, exc_info=True)
|
||||||
|
return None
|
||||||
|
if response.status_code != 200:
|
||||||
|
del self._read_url_cache[segment_id]
|
||||||
|
self.logger.warn(
|
||||||
|
'unexpected response %r %r %r from %r to sql=%r',
|
||||||
|
response.status_code, response.reason, response.text,
|
||||||
|
read_url, sql)
|
||||||
|
return None
|
||||||
|
self.logger.trace(
|
||||||
|
'got %r from posting query %r to %r', response.text, sql,
|
||||||
|
read_url)
|
||||||
|
results = json.loads(response.text)
|
||||||
|
return results
|
||||||
|
|
||||||
def schema_exists(self, schema_id):
|
def schema_exists(self, schema_id):
|
||||||
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
|
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
|
||||||
response = requests.get(url)
|
response = requests.get(url)
|
||||||
@ -316,104 +408,30 @@ class TroughDedupDb(object):
|
|||||||
' url varchar(2100) not null,\n'
|
' url varchar(2100) not null,\n'
|
||||||
' date datetime not null,\n'
|
' date datetime not null,\n'
|
||||||
' id varchar(100));\n') # warc record id
|
' id varchar(100));\n') # warc record id
|
||||||
|
WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) '
|
||||||
|
'values (%s, %s, %s, %s);')
|
||||||
|
|
||||||
def __init__(self, options=warcprox.Options()):
|
def __init__(self, options=warcprox.Options()):
|
||||||
self.options = options
|
self.options = options
|
||||||
self._trough_cli = TroughClient(options.rethinkdb_trough_db_url)
|
self._trough_cli = TroughClient(options.rethinkdb_trough_db_url)
|
||||||
self._write_url_cache = {}
|
|
||||||
self._read_url_cache = {}
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
|
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
|
||||||
|
|
||||||
def _write_url(self, bucket):
|
|
||||||
if not bucket in self._write_url_cache:
|
|
||||||
self._write_url_cache[bucket] = self._trough_cli.write_url(
|
|
||||||
bucket, self.SCHEMA_ID)
|
|
||||||
self.logger.info(
|
|
||||||
'trough dedup bucket %r write url is %r', bucket,
|
|
||||||
self._write_url_cache[bucket])
|
|
||||||
return self._write_url_cache[bucket]
|
|
||||||
|
|
||||||
def _read_url(self, bucket):
|
|
||||||
if not self._read_url_cache.get(bucket):
|
|
||||||
self._read_url_cache[bucket] = self._trough_cli.read_url(bucket)
|
|
||||||
self.logger.info(
|
|
||||||
'trough dedup bucket %r read url is %r', bucket,
|
|
||||||
self._read_url_cache[bucket])
|
|
||||||
return self._read_url_cache[bucket]
|
|
||||||
|
|
||||||
def sql_value(self, x):
|
|
||||||
if x is None:
|
|
||||||
return 'null'
|
|
||||||
elif isinstance(x, datetime.datetime):
|
|
||||||
return 'datetime(%r)' % x.isoformat()
|
|
||||||
elif isinstance(x, bool):
|
|
||||||
return int(x)
|
|
||||||
elif isinstance(x, str) or isinstance(x, bytes):
|
|
||||||
# py3: repr(u'abc') => 'abc'
|
|
||||||
# repr(b'abc') => b'abc'
|
|
||||||
# py2: repr(u'abc') => u'abc'
|
|
||||||
# repr(b'abc') => 'abc'
|
|
||||||
# Repr gives us a prefix we don't want in different situations
|
|
||||||
# depending on whether this is py2 or py3. Chop it off either way.
|
|
||||||
r = repr(x)
|
|
||||||
if r[:1] == "'":
|
|
||||||
return r
|
|
||||||
else:
|
|
||||||
return r[1:]
|
|
||||||
else:
|
|
||||||
raise Exception("don't know how to make an sql value from %r" % x)
|
|
||||||
|
|
||||||
def save(self, digest_key, response_record, bucket='__unspecified__'):
|
def save(self, digest_key, response_record, bucket='__unspecified__'):
|
||||||
write_url = self._write_url(bucket)
|
|
||||||
record_id = response_record.get_header(warctools.WarcRecord.ID)
|
record_id = response_record.get_header(warctools.WarcRecord.ID)
|
||||||
url = response_record.get_header(warctools.WarcRecord.URL)
|
url = response_record.get_header(warctools.WarcRecord.URL)
|
||||||
warc_date = response_record.get_header(warctools.WarcRecord.DATE)
|
warc_date = response_record.get_header(warctools.WarcRecord.DATE)
|
||||||
|
self._trough_cli.write(
|
||||||
sql = ('insert into dedup (digest_key, url, date, id) '
|
bucket, self.WRITE_SQL_TMPL,
|
||||||
'values (%s, %s, %s, %s);') % (
|
(digest_key, url, warc_date, record_id), self.SCHEMA_ID)
|
||||||
self.sql_value(digest_key), self.sql_value(url),
|
|
||||||
self.sql_value(warc_date), self.sql_value(record_id))
|
|
||||||
try:
|
|
||||||
response = requests.post(write_url, sql)
|
|
||||||
except:
|
|
||||||
self.logger.error(
|
|
||||||
'problem with trough write url %r', write_url,
|
|
||||||
exc_info=True)
|
|
||||||
del self._write_url_cache[bucket]
|
|
||||||
return
|
|
||||||
if response.status_code != 200:
|
|
||||||
del self._write_url_cache[bucket]
|
|
||||||
self.logger.warn(
|
|
||||||
'unexpected response %r %r %r to sql=%r',
|
|
||||||
response.status_code, response.reason, response.text, sql)
|
|
||||||
else:
|
|
||||||
self.logger.debug('posted %r to %s', sql, write_url)
|
|
||||||
|
|
||||||
def lookup(self, digest_key, bucket='__unspecified__', url=None):
|
def lookup(self, digest_key, bucket='__unspecified__', url=None):
|
||||||
read_url = self._read_url(bucket)
|
results = self._trough_cli.read(
|
||||||
if not read_url:
|
bucket, 'select * from dedup where digest_key=%s;',
|
||||||
return None
|
(digest_key,))
|
||||||
sql = 'select * from dedup where digest_key=%s;' % (
|
|
||||||
self.sql_value(digest_key))
|
|
||||||
try:
|
|
||||||
response = requests.post(read_url, sql)
|
|
||||||
except:
|
|
||||||
self.logger.error(
|
|
||||||
'problem with trough read url %r', read_url, exc_info=True)
|
|
||||||
del self._read_url_cache[bucket]
|
|
||||||
return None
|
|
||||||
if response.status_code != 200:
|
|
||||||
del self._read_url_cache[bucket]
|
|
||||||
self.logger.warn(
|
|
||||||
'unexpected response %r %r %r to sql=%r',
|
|
||||||
response.status_code, response.reason, response.text, sql)
|
|
||||||
return None
|
|
||||||
self.logger.trace('got %r from query %r', response.text, sql)
|
|
||||||
results = json.loads(response.text)
|
|
||||||
assert len(results) <= 1 # sanity check (digest_key is primary key)
|
|
||||||
if results:
|
if results:
|
||||||
|
assert len(results) == 1 # sanity check (digest_key is primary key)
|
||||||
result = results[0]
|
result = results[0]
|
||||||
result['id'] = result['id'].encode('ascii')
|
result['id'] = result['id'].encode('ascii')
|
||||||
result['url'] = result['url'].encode('ascii')
|
result['url'] = result['url'].encode('ascii')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user