2017-03-02 15:06:26 -08:00
|
|
|
'''
|
2017-05-24 13:57:09 -07:00
|
|
|
warcprox/dedup.py - identical payload digest deduplication using sqlite db
|
2017-03-02 15:06:26 -08:00
|
|
|
|
|
|
|
Copyright (C) 2013-2017 Internet Archive
|
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
|
|
modify it under the terms of the GNU General Public License
|
|
|
|
as published by the Free Software Foundation; either version 2
|
|
|
|
of the License, or (at your option) any later version.
|
|
|
|
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
GNU General Public License for more details.
|
|
|
|
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
|
|
along with this program; if not, write to the Free Software
|
|
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
|
|
|
|
USA.
|
|
|
|
'''
|
2016-04-06 19:37:55 -07:00
|
|
|
|
2015-03-18 16:29:44 -07:00
|
|
|
from __future__ import absolute_import
|
|
|
|
|
2014-11-15 03:20:05 -08:00
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import json
|
|
|
|
from hanzo import warctools
|
2015-07-30 00:12:59 +00:00
|
|
|
import warcprox
|
2017-05-24 13:57:09 -07:00
|
|
|
import sqlite3
|
2014-11-15 03:20:05 -08:00
|
|
|
|
|
|
|
class DedupDb(object):
    """
    Payload digest deduplication database stored in a sqlite file.

    Each row of the ``dedup`` table maps a key of the form
    ``<digest>|<bucket>`` to a compact JSON object holding the ``id``,
    ``url`` and ``date`` headers of a previously written response record.
    A new sqlite connection is opened and closed for every operation.
    """
    logger = logging.getLogger("warcprox.dedup.DedupDb")

    def __init__(self, file='./warcprox.sqlite', options=None):
        """
        Args:
            file: path of the sqlite database file.
            options: object whose ``base32`` attribute is read by
                ``notify()``; when omitted, a new ``warcprox.Options()``
                is created for this instance.
        """
        self.file = file
        # Build the default per instance rather than in the signature, so
        # instances never share one Options object created at import time.
        self.options = options if options is not None else warcprox.Options()

    @staticmethod
    def _key(digest_key, bucket):
        """Return the table key ``<digest>|<bucket>`` as text."""
        # Accept both bytes and str digests; bytes are decoded as utf-8.
        if isinstance(digest_key, bytes):
            digest_key = digest_key.decode('utf-8')
        return digest_key + "|" + bucket

    def start(self):
        """Create the ``dedup`` table in ``self.file`` if it is missing."""
        if os.path.exists(self.file):
            self.logger.info(
                    'opening existing deduplication database %s',
                    self.file)
        else:
            self.logger.info(
                    'creating new deduplication database %s', self.file)

        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                    'create table if not exists dedup ('
                    '  key varchar(300) primary key,'
                    '  value varchar(4000)'
                    ');')
            conn.commit()
        finally:
            # Close the connection even when the statement fails.
            conn.close()

    def save(self, digest_key, response_record, bucket=""):
        """
        Store (or overwrite) the dedup entry for ``digest_key`` in ``bucket``.

        Args:
            digest_key: payload digest, bytes (decoded as utf-8) or str.
            response_record: record whose ID, URL and DATE headers are
                read with ``get_header()`` and decoded as latin1.
            bucket: string appended to the digest to form the key.
        """
        record_id = response_record.get_header(
                warctools.WarcRecord.ID).decode('latin1')
        url = response_record.get_header(
                warctools.WarcRecord.URL).decode('latin1')
        date = response_record.get_header(
                warctools.WarcRecord.DATE).decode('latin1')

        key = self._key(digest_key, bucket)

        py_value = {'id':record_id, 'url':url, 'date':date}
        json_value = json.dumps(py_value, separators=(',',':'))

        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                    'insert or replace into dedup (key, value) values (?, ?)',
                    (key, json_value))
            conn.commit()
        finally:
            conn.close()
        self.logger.debug('dedup db saved %s:%s', key, json_value)

    def lookup(self, digest_key, bucket=""):
        """
        Return the stored entry for ``digest_key`` in ``bucket``, or None.

        The returned dict has the keys ``id``, ``url`` and ``date``, each
        re-encoded to bytes as latin1 (the inverse of what ``save()`` does).
        """
        result = None
        key = self._key(digest_key, bucket)
        conn = sqlite3.connect(self.file)
        try:
            cursor = conn.execute(
                    'select value from dedup where key = ?', (key,))
            result_tuple = cursor.fetchone()
        finally:
            conn.close()
        if result_tuple:
            result = json.loads(result_tuple[0])
            result['id'] = result['id'].encode('latin1')
            result['url'] = result['url'].encode('latin1')
            result['date'] = result['date'].encode('latin1')
        self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
        return result

    def notify(self, recorded_url, records):
        """
        Save a dedup entry for ``records[0]`` when it is a response record
        and the recorded payload is not empty.

        The entry goes into the bucket named by
        ``recorded_url.warcprox_meta["captures-bucket"]`` when that key is
        present, and into the default bucket ``""`` otherwise.
        """
        if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
                and recorded_url.response_recorder.payload_size() > 0):
            digest_key = warcprox.digest_str(
                    recorded_url.response_recorder.payload_digest,
                    self.options.base32)
            if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
                self.save(
                        digest_key, records[0],
                        bucket=recorded_url.warcprox_meta["captures-bucket"])
            else:
                self.save(digest_key, records[0])
|
2015-08-24 23:53:11 +00:00
|
|
|
|
|
|
|
|
2015-07-30 00:12:59 +00:00
|
|
|
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
    """
    Look up dedup info for ``recorded_url`` and attach it as
    ``recorded_url.dedup_info``.

    Nothing happens unless ``recorded_url.response_recorder`` is set, has a
    payload digest, and reports a payload size greater than zero. The
    lookup uses the bucket named by
    ``recorded_url.warcprox_meta["captures-bucket"]`` when that key is
    present, and the default bucket of ``dedup_db.lookup()`` otherwise.
    """
    recorder = recorded_url.response_recorder
    if not (recorder
            and recorder.payload_digest
            and recorder.payload_size() > 0):
        return

    digest_key = warcprox.digest_str(recorder.payload_digest, base32)
    meta = recorded_url.warcprox_meta
    if meta and "captures-bucket" in meta:
        recorded_url.dedup_info = dedup_db.lookup(
                digest_key, meta["captures-bucket"])
    else:
        recorded_url.dedup_info = dedup_db.lookup(digest_key)
|
2014-11-15 03:20:05 -08:00
|
|
|
|
2015-08-20 21:46:40 +00:00
|
|
|
class RethinkDedupDb:
    """
    Payload digest deduplication database stored in a rethinkdb table.

    Rows are keyed on ``key``, a string of the form ``<digest>|<bucket>``,
    and also carry the ``url``, ``date`` and ``id`` headers of a previously
    written response record.
    """
    logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")

    def __init__(self, rr, table="dedup", shards=None, replicas=None, options=None):
        """
        Args:
            rr: rethinkdb client object; this class uses its ``servers``
                and ``dbname`` attributes and its query-building methods.
            table: name of the table to use (created if missing).
            shards: shard count for table creation; defaults to
                ``len(rr.servers)``.
            replicas: replica count for table creation; defaults to
                ``min(3, len(rr.servers))``.
            options: object whose ``base32`` attribute is read by
                ``notify()``; when omitted, a new ``warcprox.Options()``
                is created for this instance.
        """
        self.rr = rr
        self.table = table
        self.shards = shards or len(rr.servers)
        self.replicas = replicas or min(3, len(rr.servers))
        self._ensure_db_table()
        # Build the default per instance rather than in the signature, so
        # instances never share one Options object created at import time.
        self.options = options if options is not None else warcprox.Options()

    def _ensure_db_table(self):
        """Create the database and the table if either does not exist."""
        dbs = self.rr.db_list().run()
        if self.rr.dbname not in dbs:
            self.logger.info("creating rethinkdb database %r", self.rr.dbname)
            self.rr.db_create(self.rr.dbname).run()
        tables = self.rr.table_list().run()
        if self.table not in tables:
            self.logger.info(
                    "creating rethinkdb table %r in database %r shards=%r "
                    "replicas=%r", self.table, self.rr.dbname, self.shards,
                    self.replicas)
            self.rr.table_create(
                    self.table, primary_key="key", shards=self.shards,
                    replicas=self.replicas).run()

    def start(self):
        """No-op; the table is already ensured in ``__init__``."""
        pass

    def save(self, digest_key, response_record, bucket=""):
        """
        Insert (or replace) the dedup row for ``digest_key`` in ``bucket``.

        Args:
            digest_key: payload digest, bytes (decoded as utf-8) or str.
            response_record: record whose ID, URL and DATE headers are
                read with ``get_header()`` and decoded as latin1.
            bucket: string appended to the digest to form the key.

        Raises:
            Exception: if the insert result fails the sanity check below.
        """
        k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
        k = "{}|{}".format(k, bucket)
        record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
        url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
        date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
        record = {'key':k,'url':url,'date':date,'id':record_id}
        result = self.rr.table(self.table).insert(
                record, conflict="replace").run()
        # The check itself is kept exactly as it was: complain only when the
        # counters are not "five zeros and a one" AND one of deleted /
        # skipped / errors is non-zero.
        if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
            # Exception does not interpolate logging-style arguments, so
            # format the message explicitly.
            raise Exception(
                    "unexpected result %s saving %s" % (result, record))
        self.logger.debug('dedup db saved %s:%s', k, record)

    def lookup(self, digest_key, bucket=""):
        """
        Return the stored row for ``digest_key`` in ``bucket``, or a falsy
        value (whatever the query returns) when there is none.

        Every value of a found row is encoded to bytes as utf-8.
        """
        k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
        k = "{}|{}".format(k, bucket)
        result = self.rr.table(self.table).get(k).run()
        if result:
            # NOTE(review): save() decodes headers as latin1 while this
            # encodes as utf-8; the two differ for non-ASCII header bytes.
            for x in result:
                result[x] = result[x].encode("utf-8")
        self.logger.debug('dedup db lookup of key=%s returning %s', k, result)
        return result

    def notify(self, recorded_url, records):
        """
        Save a dedup row for ``records[0]`` when it is a response record
        and the recorded payload is not empty.

        The row goes into the bucket named by
        ``recorded_url.warcprox_meta["captures-bucket"]`` when that key is
        present, and into the default bucket ``""`` otherwise.
        """
        if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
                and recorded_url.response_recorder.payload_size() > 0):
            digest_key = warcprox.digest_str(
                    recorded_url.response_recorder.payload_digest,
                    self.options.base32)
            if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
                self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
            else:
                self.save(digest_key, records[0])
|
2017-03-02 15:06:26 -08:00
|
|
|
|