2017-03-02 15:06:26 -08:00
|
|
|
'''
|
2017-05-24 13:57:09 -07:00
|
|
|
warcprox/dedup.py - identical payload digest deduplication (sqlite, rethinkdb, trough and CDX server backends)
|
2017-03-02 15:06:26 -08:00
|
|
|
|
|
|
|
Copyright (C) 2013-2017 Internet Archive
|
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
|
|
modify it under the terms of the GNU General Public License
|
|
|
|
as published by the Free Software Foundation; either version 2
|
|
|
|
of the License, or (at your option) any later version.
|
|
|
|
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
GNU General Public License for more details.
|
|
|
|
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
|
|
along with this program; if not, write to the Free Software
|
|
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
|
|
|
|
USA.
|
|
|
|
'''
|
2016-04-06 19:37:55 -07:00
|
|
|
|
2015-03-18 16:29:44 -07:00
|
|
|
from __future__ import absolute_import
|
|
|
|
|
2014-11-15 03:20:05 -08:00
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import json
|
|
|
|
from hanzo import warctools
|
2015-07-30 00:12:59 +00:00
|
|
|
import warcprox
|
2017-05-24 13:57:09 -07:00
|
|
|
import sqlite3
|
2017-10-03 12:41:04 -07:00
|
|
|
import requests
|
2017-10-11 12:06:19 -07:00
|
|
|
import doublethink
|
|
|
|
import rethinkdb as r
|
2017-10-13 15:54:05 -07:00
|
|
|
import datetime
|
2017-10-13 17:44:07 +00:00
|
|
|
import urllib3
|
2017-10-19 22:11:22 +00:00
|
|
|
from urllib3.exceptions import HTTPError
|
2017-10-13 17:44:07 +00:00
|
|
|
|
|
|
|
# Silence urllib3 warnings for this process at import time.
# NOTE(review): presumably meant to suppress InsecureRequestWarning for the
# HTTPS requests CdxServerDedup makes through its unconfigured PoolManager;
# this affects every urllib3 user in the process, not just this module -- confirm.
urllib3.disable_warnings()
|
2014-11-15 03:20:05 -08:00
|
|
|
|
|
|
|
class DedupDb(object):
    """Payload-digest deduplication store backed by a local sqlite file.

    Entries live in a single ``dedup`` table keyed by
    ``"<digest_key>|<bucket>"``. Each value is a compact JSON object holding
    the ``id``, ``url`` and ``date`` WARC headers of the saved response
    record.
    """
    logger = logging.getLogger("warcprox.dedup.DedupDb")

    def __init__(self, file='./warcprox.sqlite', options=None):
        """
        :param file: path of the sqlite database file
        :param options: warcprox options; when omitted a fresh
            ``warcprox.Options()`` is created per instance instead of sharing
            one default object between all instances
        """
        self.file = file
        self.options = options if options is not None else warcprox.Options()

    @staticmethod
    def _key(digest_key, bucket):
        """Build the table key ``"<digest_key>|<bucket>"``.

        ``digest_key`` may be utf-8 ``bytes`` or already-decoded text.
        """
        if isinstance(digest_key, bytes):
            digest_key = digest_key.decode('utf-8')
        return digest_key + '|' + bucket

    def start(self):
        """Create the database file and the ``dedup`` table if missing."""
        if os.path.exists(self.file):
            self.logger.info(
                    'opening existing deduplication database %s',
                    self.file)
        else:
            self.logger.info(
                    'creating new deduplication database %s', self.file)

        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                    'create table if not exists dedup ('
                    ' key varchar(300) primary key,'
                    ' value varchar(4000)'
                    ');')
            conn.commit()
        finally:
            # sqlite3 connections are not closed automatically on error
            conn.close()

    def save(self, digest_key, response_record, bucket=""):
        """Store the id/url/date headers of ``response_record`` under
        ``digest_key`` and ``bucket``, replacing any existing entry.

        Header bytes are decoded as latin1 so that ``lookup`` can restore
        the exact original bytes with ``encode('latin1')``.
        """
        record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
        url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
        date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')

        key = self._key(digest_key, bucket)

        py_value = {'id': record_id, 'url': url, 'date': date}
        json_value = json.dumps(py_value, separators=(',', ':'))

        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                    'insert or replace into dedup (key, value) values (?, ?)',
                    (key, json_value))
            conn.commit()
        finally:
            conn.close()
        self.logger.debug('dedup db saved %s:%s', key, json_value)

    def lookup(self, digest_key, bucket="", url=None):
        """Return the entry stored for ``digest_key`` and ``bucket``.

        :param url: unused; accepted so every dedup backend can be called
            the same way
        :returns: dict with ``id``, ``url`` and ``date`` as latin1-encoded
            bytes, or None if there is no entry
        """
        key = self._key(digest_key, bucket)
        conn = sqlite3.connect(self.file)
        try:
            row = conn.execute(
                    'select value from dedup where key = ?', (key,)).fetchone()
        finally:
            conn.close()

        result = None
        if row:
            result = json.loads(row[0])
            for field in ('id', 'url', 'date'):
                result[field] = result[field].encode('latin1')
        self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
        return result

    def notify(self, recorded_url, records):
        """Save the payload digest of the first record if it is a response
        with a non-empty payload.

        The entry is scoped to the request's warcprox-meta
        ``captures-bucket`` when one is given.
        """
        if (records and records[0].type == b'response'
                and recorded_url.response_recorder.payload_size() > 0):
            digest_key = warcprox.digest_str(
                    recorded_url.payload_digest, self.options.base32)
            if (recorded_url.warcprox_meta
                    and "captures-bucket" in recorded_url.warcprox_meta):
                self.save(
                        digest_key, records[0],
                        bucket=recorded_url.warcprox_meta["captures-bucket"])
            else:
                self.save(digest_key, records[0])
|
2015-08-24 23:53:11 +00:00
|
|
|
|
|
|
|
|
2015-07-30 00:12:59 +00:00
|
|
|
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
    """Look up the payload digest of ``recorded_url`` in ``dedup_db`` and
    store the result on ``recorded_url.dedup_info``.

    Nothing happens when there is no response recorder, no payload digest,
    or the payload is empty. When warcprox-meta names a ``captures-bucket``,
    the lookup is scoped to it. Otherwise the backend's own default bucket
    applies.

    Arguments are passed to ``lookup`` by keyword because the backends do
    not agree on parameter order: ``CdxServerDedup.lookup`` takes
    ``(digest_key, url)``. A positional ``(digest_key, bucket, url)`` call
    would therefore bind the bucket to the wrong parameter.
    """
    if not (recorded_url.response_recorder
            and recorded_url.payload_digest
            and recorded_url.response_recorder.payload_size() > 0):
        return

    digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
    lookup_kwargs = {'url': recorded_url.url}
    if (recorded_url.warcprox_meta
            and "captures-bucket" in recorded_url.warcprox_meta):
        lookup_kwargs['bucket'] = recorded_url.warcprox_meta["captures-bucket"]
    recorded_url.dedup_info = dedup_db.lookup(digest_key, **lookup_kwargs)
|
2014-11-15 03:20:05 -08:00
|
|
|
|
2015-08-20 21:46:40 +00:00
|
|
|
class RethinkDedupDb:
    """Payload-digest deduplication store backed by a rethinkdb table.

    Each document holds ``key`` (``"<digest_key>|<bucket>"``, the primary
    key) plus the ``id``, ``url`` and ``date`` WARC headers of the saved
    response record.
    """
    logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")

    def __init__(self, options=None):
        """
        :param options: warcprox options; ``options.rethinkdb_dedup_url``
            selects servers, database and table. When omitted a fresh
            ``warcprox.Options()`` is created per instance.
        """
        if options is None:
            options = warcprox.Options()
        parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url)
        self.rr = doublethink.Rethinker(
                servers=parsed.hosts, db=parsed.database)
        self.table = parsed.table
        self._ensure_db_table()
        self.options = options

    @staticmethod
    def _key(digest_key, bucket):
        """Build the primary key ``"<digest_key>|<bucket>"``; ``digest_key``
        may be utf-8 ``bytes`` or text."""
        if isinstance(digest_key, bytes):
            digest_key = digest_key.decode("utf-8")
        return "{}|{}".format(digest_key, bucket)

    def _ensure_db_table(self):
        """Create the database and the dedup table if they don't exist.

        The table uses ``key`` as primary key, one shard per configured
        server and ``min(3, number of servers)`` replicas.
        """
        dbs = self.rr.db_list().run()
        if self.rr.dbname not in dbs:
            self.logger.info("creating rethinkdb database %r", self.rr.dbname)
            self.rr.db_create(self.rr.dbname).run()
        tables = self.rr.table_list().run()
        if self.table not in tables:
            shards = len(self.rr.servers)
            replicas = min(3, shards)
            self.logger.info(
                    "creating rethinkdb table %r in database %r shards=%r "
                    "replicas=%r", self.table, self.rr.dbname,
                    shards, replicas)
            self.rr.table_create(
                    self.table, primary_key="key", shards=shards,
                    replicas=replicas).run()

    def start(self):
        pass

    def save(self, digest_key, response_record, bucket=""):
        """Upsert the id/url/date headers of ``response_record`` under
        ``digest_key`` and ``bucket``.

        Header bytes are decoded as latin1; ``lookup`` reverses this
        exactly.

        :raises Exception: if rethinkdb reports an error, a skipped write
            or a deletion for the insert
        """
        k = self._key(digest_key, bucket)
        record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
        url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
        date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
        record = {'key': k, 'url': url, 'date': date, 'id': record_id}
        result = self.rr.table(self.table).insert(
                record, conflict="replace").run()
        # Check the failure counters directly. Sorting all result values (as
        # before) let a single failed write (errors=1) pass, and on py3
        # breaks when the result carries the string ``first_error``.
        if (result.get("errors") or result.get("deleted")
                or result.get("skipped")):
            raise Exception(
                    "unexpected result %s saving %s" % (result, record))
        self.logger.debug('dedup db saved %s:%s', k, record)

    def lookup(self, digest_key, bucket="", url=None):
        """Return the document stored for ``digest_key`` and ``bucket``.

        :param url: unused; accepted so every dedup backend can be called
            the same way
        :returns: dict whose string fields are converted to bytes, or None
        """
        k = self._key(digest_key, bucket)
        result = self.rr.table(self.table).get(k).run()
        if result:
            for field, value in list(result.items()):
                # id/url/date were decoded as latin1 in save(), so latin1
                # restores the original header bytes (utf-8 would corrupt
                # non-ASCII bytes); the key came from a utf-8 digest.
                encoding = "utf-8" if field == "key" else "latin1"
                result[field] = value.encode(encoding)
        self.logger.debug('dedup db lookup of key=%s returning %s', k, result)
        return result

    def notify(self, recorded_url, records):
        """Save the payload digest of the first record if it is a response
        with a non-empty payload, scoped to the warcprox-meta
        ``captures-bucket`` when one is given."""
        if (records and records[0].type == b'response'
                and recorded_url.response_recorder.payload_size() > 0):
            digest_key = warcprox.digest_str(
                    recorded_url.payload_digest, self.options.base32)
            if (recorded_url.warcprox_meta
                    and "captures-bucket" in recorded_url.warcprox_meta):
                self.save(
                        digest_key, records[0],
                        bucket=recorded_url.warcprox_meta["captures-bucket"])
            else:
                self.save(digest_key, records[0])
|
2017-03-02 15:06:26 -08:00
|
|
|
|
2017-10-13 17:44:07 +00:00
|
|
|
class CdxServerDedup(object):
    """Query a CDX server to perform deduplication.

    This backend is read-only: ``save`` and ``notify`` do nothing, and
    ``lookup`` asks the CDX server about the most recent non-revisit
    capture of a URL.
    """
    logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
    http_pool = urllib3.PoolManager()

    def __init__(self, cdx_url="https://web.archive.org/cdx/search",
                 options=None):
        """
        :param cdx_url: CDX server search endpoint
        :param options: warcprox options; when omitted a fresh
            ``warcprox.Options()`` is created per instance
        """
        self.cdx_url = cdx_url
        self.options = options if options is not None else warcprox.Options()

    def start(self):
        pass

    def save(self, digest_key, response_record, bucket=""):
        """Does not apply to CDX server, as it is obviously read-only.
        """
        pass

    def lookup(self, digest_key, url, bucket=""):
        """Check whether the latest capture of ``url`` has digest
        ``digest_key``.

        The CDX server is asked for the timestamp and digest of the most
        recent capture of ``url`` that is not a ``warc/revisit`` record. The
        digest being compared must have been computed on the original
        content, after decoding Content-Encoding and Transfer-Encoding, if
        any.

        Get only the last item (limit=-1) because Wayback Machine has special
        performance optimisation to handle that. limit < 0 is very
        inefficient in general. Maybe it could be configurable in the future.

        :param digest_key: b'sha1:<KEY-VALUE>' or 'sha1:<KEY-VALUE>' (prefix
            is optional). Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
        :param url: Target URL, text or utf-8 bytes
        :param bucket: ignored, since the CDX server has no buckets. It is
            accepted so callers can pass it by keyword as they do for the
            other dedup backends.
        :returns: on a digest match ``{"url": <url as passed in>,
            "date": b"%Y-%m-%dT%H:%M:%SZ"}``; otherwise None, including
            when the request or the response parsing fails (logged)
        """
        u = url.decode("utf-8") if isinstance(url, bytes) else url
        try:
            result = self.http_pool.request('GET', self.cdx_url, fields=dict(
                url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
                limit=-1))
            # explicit check: an assert here would be stripped by python -O
            if result.status != 200:
                self.logger.error(
                        'CdxServerDedup request failed for url=%s %s',
                        url, 'HTTP status %s' % result.status)
                return None
            if isinstance(digest_key, bytes):
                dkey = digest_key
            else:
                dkey = digest_key.encode('utf-8')
            if dkey.startswith(b'sha1:'):
                dkey = dkey[5:]
            line = result.data.strip()
            if line:
                (cdx_ts, cdx_digest) = line.split(b' ')
                if cdx_digest == dkey:
                    dt = datetime.datetime.strptime(
                            cdx_ts.decode('ascii'), '%Y%m%d%H%M%S')
                    date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
                    return dict(url=url, date=date)
        except (HTTPError, ValueError) as exc:
            self.logger.error('CdxServerDedup request failed for url=%s %s',
                              url, exc)
        return None

    def notify(self, recorded_url, records):
        """Since we don't save anything to CDX server, this does not apply.
        """
        pass
|
2017-11-02 16:34:52 -07:00
|
|
|
|
2017-11-03 12:39:26 -07:00
|
|
|
class TroughClient(object):
    """HTTP client for a trough cluster.

    The segment manager is found through a doublethink ServiceRegistry, and
    read endpoints through the rethinkdb ``services`` table. Per-segment
    write and read URLs are cached. A URL is evicted from its cache when a
    request to it fails, so the next call looks it up again.
    """
    logger = logging.getLogger("warcprox.dedup.TroughClient")

    def __init__(self, rethinkdb_trough_db_url):
        """
        :param rethinkdb_trough_db_url: rethinkdb URL, parsed with
            ``doublethink.parse_rethinkdb_url``, of the database holding the
            trough service registry
        """
        parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
        self.rr = doublethink.Rethinker(
                servers=parsed.hosts, db=parsed.database)
        self.svcreg = doublethink.ServiceRegistry(self.rr)
        self._write_url_cache = {}
        self._read_url_cache = {}

    @staticmethod
    def sql_value(x):
        """Render ``x`` as an SQL literal for ``%`` interpolation into a
        query template.

        ``None`` becomes ``null`` and datetimes become
        ``datetime('<isoformat>')``. Bools become 0/1, and ints and floats
        are returned unchanged. Text and bytes become single-quoted SQL
        string literals with embedded single quotes doubled, which is the
        only escaping SQL string literals support. Bytes outside printable
        ASCII are written as ``\\xNN`` escape text.

        ``repr`` is deliberately not used. It emits Python escapes (e.g.
        ``\\'``) that SQL does not understand, and values such as URLs come
        from crawled content.

        :raises Exception: for any other type
        """
        if x is None:
            return 'null'
        elif isinstance(x, datetime.datetime):
            return 'datetime(%r)' % x.isoformat()
        elif isinstance(x, bool):
            return int(x)
        elif isinstance(x, bytes):
            # checked before str so that py2 byte strings land here too
            text = ''.join(
                    chr(b) if 0x20 <= b < 0x7f else '\\x%02x' % b
                    for b in bytearray(x))
            return "'%s'" % text.replace("'", "''")
        elif isinstance(x, str):
            return "'%s'" % x.replace("'", "''")
        elif isinstance(x, (int, float)):
            return x
        else:
            raise Exception(
                    "don't know how to make an sql value from %r (%r)" % (
                        x, type(x)))

    @staticmethod
    def _join_url(base, *parts):
        """Join ``base`` and path ``parts`` with single '/' separators.

        Used instead of os.path.join, which is meant for filesystem paths:
        it uses backslashes on Windows and discards everything before a
        part that starts with '/'.
        """
        return '/'.join(
                [base.rstrip('/')] + [part.strip('/') for part in parts])

    def segment_manager_url(self):
        """Return the URL of the unique 'trough-sync-master' service."""
        master_node = self.svcreg.unique_service('trough-sync-master')
        assert master_node
        return master_node['url']

    def write_url_nocache(self, segment_id, schema_id='default'):
        """Provision ``segment_id`` with ``schema_id`` via the segment
        manager and return its write URL, bypassing the cache.

        :raises Exception: if the provision request does not return 200
        """
        provision_url = self._join_url(self.segment_manager_url(), 'provision')
        payload_dict = {'segment': segment_id, 'schema': schema_id}
        response = requests.post(provision_url, json=payload_dict)
        if response.status_code != 200:
            raise Exception(
                    'Received %s: %r in response to POST %s with data %s' % (
                        response.status_code, response.text, provision_url,
                        json.dumps(payload_dict)))
        result_dict = response.json()
        # assert result_dict['schema'] == schema_id # previously provisioned?
        return result_dict['write_url']

    def read_url_nocache(self, segment_id):
        """Return the URL of the lowest-load live 'trough-read' service for
        ``segment_id``, or None if there is none. Bypasses the cache.

        A service counts as live when less than its ``ttl`` has elapsed
        since its ``last_heartbeat``.
        """
        reql = self.rr.table('services').get_all(
                segment_id, index='segment').filter(
                        {'role': 'trough-read'}).filter(
                                lambda svc: r.now().sub(
                                    svc['last_heartbeat']).lt(svc['ttl'])
                                ).order_by('load')
        self.logger.debug('querying rethinkdb: %r', reql)
        results = reql.run()
        if results:
            return results[0]['url']
        else:
            return None

    def write_url(self, segment_id, schema_id='default'):
        """Cached ``write_url_nocache``. The cache is keyed by segment only,
        so ``schema_id`` matters just on the first call for a segment."""
        if segment_id not in self._write_url_cache:
            self._write_url_cache[segment_id] = self.write_url_nocache(
                    segment_id, schema_id)
            self.logger.info(
                    'segment %r write url is %r', segment_id,
                    self._write_url_cache[segment_id])
        return self._write_url_cache[segment_id]

    def read_url(self, segment_id):
        """Cached ``read_url_nocache``; a cached None is looked up again."""
        if not self._read_url_cache.get(segment_id):
            self._read_url_cache[segment_id] = self.read_url_nocache(segment_id)
            self.logger.info(
                    'segment %r read url is %r', segment_id,
                    self._read_url_cache[segment_id])
        return self._read_url_cache[segment_id]

    def write(self, segment_id, sql_tmpl, values, schema_id='default'):
        """POST ``sql_tmpl % values`` (rendered with ``sql_value``) to the
        segment's write URL.

        Request failures and non-200 responses are logged, the cached write
        URL is evicted, and the method returns without raising.
        """
        write_url = self.write_url(segment_id, schema_id)
        sql = sql_tmpl % tuple(self.sql_value(v) for v in values)

        try:
            response = requests.post(write_url, sql)
        except Exception:
            self._write_url_cache.pop(segment_id, None)
            self.logger.error(
                    'problem with trough write url %r', write_url,
                    exc_info=True)
            return
        if response.status_code != 200:
            self._write_url_cache.pop(segment_id, None)
            self.logger.warning(
                    'unexpected response %r %r %r from %r to sql=%r',
                    response.status_code, response.reason, response.text,
                    write_url, sql)
            return
        self.logger.debug('posted %r to %s', sql, write_url)

    def read(self, segment_id, sql_tmpl, values):
        """POST ``sql_tmpl % values`` (rendered with ``sql_value``) to the
        segment's read URL and return the JSON-decoded response.

        Returns None when no read URL is available, or when the request
        fails or gets a non-200 response. In the failure cases the cached
        read URL is evicted.
        """
        read_url = self.read_url(segment_id)
        if not read_url:
            return None
        sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
        try:
            response = requests.post(read_url, sql)
        except Exception:
            self._read_url_cache.pop(segment_id, None)
            self.logger.error(
                    'problem with trough read url %r', read_url, exc_info=True)
            return None
        if response.status_code != 200:
            self._read_url_cache.pop(segment_id, None)
            self.logger.warning(
                    'unexpected response %r %r %r from %r to sql=%r',
                    response.status_code, response.reason, response.text,
                    read_url, sql)
            return None
        # NOTE(review): Logger.trace is not part of stdlib logging; warcprox
        # presumably installs it elsewhere. If it is missing, log at numeric
        # level 5 (below DEBUG) rather than failing with AttributeError.
        trace = getattr(self.logger, 'trace', None)
        if trace is not None:
            trace('got %r from posting query %r to %r', response.text, sql,
                  read_url)
        else:
            self.logger.log(
                    5, 'got %r from posting query %r to %r', response.text,
                    sql, read_url)
        results = json.loads(response.text)
        return results

    def schema_exists(self, schema_id):
        """Return True if the segment manager knows ``schema_id`` and False
        on 404. Other error statuses are raised via ``raise_for_status``."""
        url = self._join_url(self.segment_manager_url(), 'schema', schema_id)
        response = requests.get(url)
        if response.status_code == 200:
            return True
        elif response.status_code == 404:
            return False
        else:
            response.raise_for_status()

    def register_schema(self, schema_id, sql):
        """Create or update schema ``schema_id`` with DDL ``sql``.

        :raises Exception: unless the segment manager answers 201 or 204
        """
        url = self._join_url(
                self.segment_manager_url(), 'schema', schema_id, 'sql')
        response = requests.put(url, sql)
        if response.status_code not in (201, 204):
            raise Exception(
                    'Received %s: %r in response to PUT %r with data %r' % (
                        response.status_code, response.text, url, sql))
|
|
|
|
|
|
|
|
class TroughDedupDb(object):
    '''
    Payload-digest deduplication store backed by trough, with one trough
    segment per captures-bucket.

    https://github.com/internetarchive/trough
    '''
    logger = logging.getLogger("warcprox.dedup.TroughDedupDb")

    SCHEMA_ID = 'warcprox-dedup-v1'
    SCHEMA_SQL = ('create table dedup (\n'
                  ' digest_key varchar(100) primary key,\n'
                  ' url varchar(2100) not null,\n'
                  ' date datetime not null,\n'
                  ' id varchar(100));\n') # warc record id
    WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) '
                      'values (%s, %s, %s, %s);')

    def __init__(self, options=None):
        """
        :param options: warcprox options; ``options.rethinkdb_trough_db_url``
            locates the trough service registry. When omitted a fresh
            ``warcprox.Options()`` is created per instance.
        """
        if options is None:
            options = warcprox.Options()
        self.options = options
        self._trough_cli = TroughClient(options.rethinkdb_trough_db_url)

    def start(self):
        """Register (create or update) the dedup schema with trough."""
        self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)

    def save(self, digest_key, response_record, bucket='__unspecified__'):
        """Insert the raw id/url/date header values of ``response_record``
        under ``digest_key`` into the segment named ``bucket``. Error
        handling is that of TroughClient.write."""
        record_id = response_record.get_header(warctools.WarcRecord.ID)
        url = response_record.get_header(warctools.WarcRecord.URL)
        warc_date = response_record.get_header(warctools.WarcRecord.DATE)
        self._trough_cli.write(
                bucket, self.WRITE_SQL_TMPL,
                (digest_key, url, warc_date, record_id), self.SCHEMA_ID)

    def lookup(self, digest_key, bucket='__unspecified__', url=None):
        """Look up ``digest_key`` in the segment named ``bucket``.

        :param url: unused; accepted so every dedup backend can be called
            the same way
        :returns: the matching row with ``id``, ``url`` and ``date`` encoded
            to ascii bytes, or None if nothing matched or the read returned
            nothing
        """
        results = self._trough_cli.read(
                bucket, 'select * from dedup where digest_key=%s;',
                (digest_key,))
        if not results:
            return None
        assert len(results) == 1 # sanity check (digest_key is primary key)
        result = results[0]
        for field in ('id', 'url', 'date'):
            result[field] = result[field].encode('ascii')
        self.logger.debug(
                'trough lookup of key=%r returning %r', digest_key, result)
        return result

    def notify(self, recorded_url, records):
        """Save the payload digest of the first record if it is a response
        with a non-empty payload.

        The digest goes into the warcprox-meta ``captures-bucket`` segment
        when one is given, else the default bucket. Empty ``records`` is a
        no-op, like in the other dedup backends.
        """
        if (records and records[0].type == b'response'
                and recorded_url.response_recorder.payload_size() > 0):
            digest_key = warcprox.digest_str(
                    recorded_url.payload_digest,
                    self.options.base32)
            if (recorded_url.warcprox_meta
                    and 'captures-bucket' in recorded_url.warcprox_meta):
                self.save(
                        digest_key, records[0],
                        bucket=recorded_url.warcprox_meta['captures-bucket'])
            else:
                self.save(digest_key, records[0])
|