cache trough read and write urls

This commit is contained in:
Noah Levitt 2017-11-03 13:48:00 -07:00
parent ab99fe52b9
commit 147b097a53
2 changed files with 23 additions and 5 deletions

View File

@ -84,7 +84,8 @@ def _send(self, data):
# http_client.HTTPConnection.send = _send
logging.basicConfig(
stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
# stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
stream=sys.stdout, level=warcprox.TRACE,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
@ -424,6 +425,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup
assert dedup_lookup['url'] == url.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -497,6 +499,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup
assert dedup_lookup['url'] == url.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])

View File

@ -322,17 +322,30 @@ class TroughDedupDb(object):
def __init__(self, options=warcprox.Options()):
self.options = options
self._trough_cli = TroughClient(options.rethinkdb_trough_db_url)
self._write_url_cache = {}
self._read_url_cache = {}
def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
def _write_url(self, bucket):
segment_id = 'warcprox-trough-%s' % bucket
return self._trough_cli.write_url(segment_id, self.SCHEMA_ID)
if not bucket in self._write_url_cache:
segment_id = 'warcprox-trough-%s' % bucket
self._write_url_cache[bucket] = self._trough_cli.write_url(
segment_id, self.SCHEMA_ID)
logging.info(
'bucket %r write url is %r', bucket,
self._write_url_cache[bucket])
return self._write_url_cache[bucket]
def _read_url(self, bucket):
segment_id = 'warcprox-trough-%s' % bucket
return self._trough_cli.read_url(segment_id)
if not self._read_url_cache.get(bucket):
segment_id = 'warcprox-trough-%s' % bucket
self._read_url_cache[bucket] = self._trough_cli.read_url(segment_id)
logging.info(
'bucket %r read url is %r', bucket,
self._read_url_cache[bucket])
return self._read_url_cache[bucket]
def sql_value(self, x):
if x is None:
@ -371,6 +384,8 @@ class TroughDedupDb(object):
logging.warn(
'unexpected response %r %r %r to sql=%r',
response.status_code, response.reason, response.text, sql)
else:
logging.trace('posted %r to %s', sql, write_url)
def lookup(self, digest_key, bucket='__unspecified__', url=None):
read_url = self._read_url(bucket)