From 895683e062d487813536e9bc0d4804b6cabaf539 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:45:49 -0800 Subject: [PATCH] more cleanly separate trough client code from the rest of TroughDedup --- warcprox/dedup.py | 188 +++++++++++++++++++++++++--------------------- 1 file changed, 103 insertions(+), 85 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 9cb0bc5..5c56752 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -253,13 +253,42 @@ class TroughClient(object): self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.svcreg = doublethink.ServiceRegistry(self.rr) + self._write_url_cache = {} + self._read_url_cache = {} + + @staticmethod + def sql_value(x): + if x is None: + return 'null' + elif isinstance(x, datetime.datetime): + return 'datetime(%r)' % x.isoformat() + elif isinstance(x, bool): + return int(x) + elif isinstance(x, str) or isinstance(x, bytes): + # py3: repr(u'abc') => 'abc' + # repr(b'abc') => b'abc' + # py2: repr(u'abc') => u'abc' + # repr(b'abc') => 'abc' + # Repr gives us a prefix we don't want in different situations + # depending on whether this is py2 or py3. Chop it off either way. + r = repr(x) + if r[:1] == "'": + return r + else: + return r[1:] + elif isinstance(x, (int, float)): + return x + else: + raise Exception( + "don't know how to make an sql value from %r (%r)" % ( + x, type(x))) def segment_manager_url(self): master_node = self.svcreg.unique_service('trough-sync-master') assert master_node return master_node['url'] - def write_url(self, segment_id, schema_id='default'): + def write_url_nocache(self, segment_id, schema_id='default'): provision_url = os.path.join(self.segment_manager_url(), 'provision') payload_dict = {'segment': segment_id, 'schema': schema_id} response = requests.post(provision_url, json=payload_dict) @@ -272,7 +301,7 @@ class TroughClient(object): # assert result_dict['schema'] == schema_id # previously provisioned? return result_dict['write_url'] - def read_url(self, segment_id): + def read_url_nocache(self, segment_id): reql = self.rr.table('services').get_all( segment_id, index='segment').filter( {'role':'trough-read'}).filter( @@ -286,6 +315,69 @@ class TroughClient(object): else: return None + def write_url(self, segment_id, schema_id='default'): + if not segment_id in self._write_url_cache: + self._write_url_cache[segment_id] = self.write_url_nocache( + segment_id, schema_id) + self.logger.info( + 'segment %r write url is %r', segment_id, + self._write_url_cache[segment_id]) + return self._write_url_cache[segment_id] + + def read_url(self, segment_id): + if not self._read_url_cache.get(segment_id): + self._read_url_cache[segment_id] = self.read_url_nocache(segment_id) + self.logger.info( + 'segment %r read url is %r', segment_id, + self._read_url_cache[segment_id]) + return self._read_url_cache[segment_id] + + def write(self, segment_id, sql_tmpl, values, schema_id='default'): + write_url = self.write_url(segment_id, schema_id) + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + + try: + response = requests.post(write_url, sql) + except: + del self._write_url_cache[segment_id] + self.logger.error( + 'problem with trough write url %r', write_url, + exc_info=True) + return + if response.status_code != 200: + del self._write_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + write_url, sql) + return + self.logger.debug('posted %r to %s', sql, write_url) + + def read(self, segment_id, sql_tmpl, values): + read_url = self.read_url(segment_id) + if not read_url: + return None + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + try: + response = requests.post(read_url, sql) + except: + del self._read_url_cache[segment_id] + self.logger.error( + 'problem with trough read url %r', read_url, exc_info=True) + return None + if response.status_code != 200: + del self._read_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + read_url, sql) + return None + self.logger.trace( + 'got %r from posting query %r to %r', response.text, sql, + read_url) + results = json.loads(response.text) + return results + def schema_exists(self, schema_id): url = os.path.join(self.segment_manager_url(), 'schema', schema_id) response = requests.get(url) @@ -316,104 +408,30 @@ class TroughDedupDb(object): ' url varchar(2100) not null,\n' ' date datetime not null,\n' ' id varchar(100));\n') # warc record id + WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) ' + 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): self.options = options self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) - self._write_url_cache = {} - self._read_url_cache = {} def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) - def _write_url(self, bucket): - if not bucket in self._write_url_cache: - self._write_url_cache[bucket] = self._trough_cli.write_url( - bucket, self.SCHEMA_ID) - self.logger.info( - 'trough dedup bucket %r write url is %r', bucket, - self._write_url_cache[bucket]) - return self._write_url_cache[bucket] - - def _read_url(self, bucket): - if not self._read_url_cache.get(bucket): - self._read_url_cache[bucket] = self._trough_cli.read_url(bucket) - self.logger.info( - 'trough dedup bucket %r read url is %r', bucket, - self._read_url_cache[bucket]) - return self._read_url_cache[bucket] - - def sql_value(self, x): - if x is None: - return 'null' - elif isinstance(x, datetime.datetime): - return 'datetime(%r)' % x.isoformat() - elif isinstance(x, bool): - return int(x) - elif isinstance(x, str) or isinstance(x, bytes): - # py3: repr(u'abc') => 'abc' - # repr(b'abc') => b'abc' - # py2: repr(u'abc') => u'abc' - # repr(b'abc') => 'abc' - # Repr gives us a prefix we don't want in different situations - # depending on whether this is py2 or py3. Chop it off either way. - r = repr(x) - if r[:1] == "'": - return r - else: - return r[1:] - else: - raise Exception("don't know how to make an sql value from %r" % x) - def save(self, digest_key, response_record, bucket='__unspecified__'): - write_url = self._write_url(bucket) record_id = response_record.get_header(warctools.WarcRecord.ID) url = response_record.get_header(warctools.WarcRecord.URL) warc_date = response_record.get_header(warctools.WarcRecord.DATE) - - sql = ('insert into dedup (digest_key, url, date, id) ' - 'values (%s, %s, %s, %s);') % ( - self.sql_value(digest_key), self.sql_value(url), - self.sql_value(warc_date), self.sql_value(record_id)) - try: - response = requests.post(write_url, sql) - except: - self.logger.error( - 'problem with trough write url %r', write_url, - exc_info=True) - del self._write_url_cache[bucket] - return - if response.status_code != 200: - del self._write_url_cache[bucket] - self.logger.warn( - 'unexpected response %r %r %r to sql=%r', - response.status_code, response.reason, response.text, sql) - else: - self.logger.debug('posted %r to %s', sql, write_url) + self._trough_cli.write( + bucket, self.WRITE_SQL_TMPL, + (digest_key, url, warc_date, record_id), self.SCHEMA_ID) def lookup(self, digest_key, bucket='__unspecified__', url=None): - read_url = self._read_url(bucket) - if not read_url: - return None - sql = 'select * from dedup where digest_key=%s;' % ( - self.sql_value(digest_key)) - try: - response = requests.post(read_url, sql) - except: - self.logger.error( - 'problem with trough read url %r', read_url, exc_info=True) - del self._read_url_cache[bucket] - return None - if response.status_code != 200: - del self._read_url_cache[bucket] - self.logger.warn( - 'unexpected response %r %r %r to sql=%r', - response.status_code, response.reason, response.text, sql) - return None - self.logger.trace('got %r from query %r', response.text, sql) - results = json.loads(response.text) - assert len(results) <= 1 # sanity check (digest_key is primary key) + results = self._trough_cli.read( + bucket, 'select * from dedup where digest_key=%s;', + (digest_key,)) if results: + assert len(results) == 1 # sanity check (digest_key is primary key) result = results[0] result['id'] = result['id'].encode('ascii') result['url'] = result['url'].encode('ascii')