persistent dedup database using anydbm

This commit is contained in:
Noah Levitt 2013-10-30 00:54:35 -07:00
parent 975657c74b
commit 1967b6aabf

View File

@ -23,6 +23,8 @@ import signal
import time
import tempfile
import base64
import anydbm
import json
class CertificateAuthority(object):
@ -356,11 +358,20 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
logging.info('shutting down')
BaseHTTPServer.HTTPServer.server_close(self)
class DedupDb:
def __init__(self, dbm_file='./warcprox-dedup.db'):
    """Open the persistent deduplication database, creating it if needed.

    dbm_file -- path handed to anydbm.open(); defaults to
                './warcprox-dedup.db'.
    """
    # The existence check only selects which message is logged.
    # NOTE(review): some dbm backends may keep their data under a suffixed
    # filename, in which case this could log "creating" for a database
    # that already exists -- confirm against the backend in use.
    if os.path.exists(dbm_file):
        logging.info('opening existing deduplication database {}'.format(dbm_file))
    else:
        logging.info('creating new deduplication database {}'.format(dbm_file))

    # 'c' opens the database read/write and creates it when it is missing.
    self.db = anydbm.open(dbm_file, 'c')
def close(self):
    """Close the underlying deduplication database handle."""
    handle = self.db
    handle.close()
def warc_record_written(self, record, warcfile, offset):
@ -368,20 +379,23 @@ class DedupDb:
if warc_type != warctools.WarcRecord.RESPONSE:
return
payload_digest = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST)
if payload_digest is None:
key = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST)
if key is None:
return
record_id = record.get_header(warctools.WarcRecord.ID)
url = record.get_header(warctools.WarcRecord.URL)
date = record.get_header(warctools.WarcRecord.DATE)
self.db[payload_digest] = {'i':record_id, 'u':url, 'd':date}
value = json.dumps({'i':record_id, 'u':url, 'd':date})
self.db[key] = value
def lookup(self, key):
    """Return the dedup info stored under key, or None if key is unknown.

    key -- database key; the caller visible in this file builds it as
           'sha1:<digest>'.

    Entries are stored as JSON text, so a hit is decoded back into a dict
    (keys 'i', 'u' and 'd') before it is returned.
    """
    # Index directly instead of testing membership first: one database
    # access instead of two, and a missing key is reported via KeyError.
    try:
        json_result = self.db[key]
    except KeyError:
        return None
    return json.loads(json_result)
@ -398,6 +412,7 @@ class WarcRecordsetQueue(Queue.Queue):
def create_and_queue(self, url, request_data, response_recorder, remote_ip):
warc_date = warctools.warc.warc_datetime_str(datetime.now())
dedup_info = None
if dedup_db is not None and response_recorder.payload_sha1 is not None:
key = 'sha1:{}'.format(self.digest_str(response_recorder.payload_sha1))
dedup_info = dedup_db.lookup(key)
@ -459,18 +474,18 @@ class WarcRecordsetQueue(Queue.Queue):
if warc_type is not None:
headers.append((warctools.WarcRecord.TYPE, warc_type))
headers.append((warctools.WarcRecord.ID, record_id))
if profile is not None:
headers.append((warctools.WarcRecord.TYPE, profile))
headers.append((warctools.WarcRecord.DATE, warc_date))
headers.append((warctools.WarcRecord.URL, url))
if remote_ip is not None:
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
if profile is not None:
headers.append((warctools.WarcRecord.TYPE, profile))
if refers_to is not None:
headers.append((warctools.WarcRecord.REFERS_TO, refers_to))
if refers_to_target_uri is not None:
headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri))
if refers_to_date is not None:
headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date))
if remote_ip is not None:
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
if concurrent_to is not None:
headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to))
if content_type is not None:
@ -666,8 +681,8 @@ if __name__ == '__main__':
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
arg_parser.add_argument('--base32', dest='base32', action='store_true',
default=False, help='write SHA1 digests in Base32 instead of hex')
# arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
# default='./dedup.db', help='persistent deduplication database file')
arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
# [--ispartof=warcinfo ispartof]
@ -686,7 +701,11 @@ if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=loglevel,
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s')
dedup_db = DedupDb()
if args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = DedupDb(args.dedup_db_file)
recordset_q = WarcRecordsetQueue(base32=args.base32, dedup_db=dedup_db)
@ -705,7 +724,8 @@ if __name__ == '__main__':
record.content_file.close()
warc_writer.register_listener(close_content_file)
warc_writer.register_listener(dedup_db.warc_record_written)
if dedup_db is not None:
warc_writer.register_listener(dedup_db.warc_record_written)
proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread')
proxy_thread.start()
@ -723,4 +743,6 @@ if __name__ == '__main__':
warc_writer.stop.set()
proxy.shutdown()
proxy.server_close()
if dedup_db is not None:
dedup_db.close()