From 1967b6aabf096b2702f577efe08524facb8cdf0f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 30 Oct 2013 00:54:35 -0700 Subject: [PATCH] persistent dedup database using anydbm --- Review note: the re-added profile header line uses warctools.WarcRecord.PROFILE; the line it replaces used WarcRecord.TYPE, which appended the profile value under the WARC-Type key a second time. warcprox.py | 52 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/warcprox.py b/warcprox.py index f985ef9..5f7f71f 100755 --- a/warcprox.py +++ b/warcprox.py @@ -23,6 +23,8 @@ import signal import time import tempfile import base64 +import anydbm +import json class CertificateAuthority(object): @@ -356,11 +358,20 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): logging.info('shutting down') BaseHTTPServer.HTTPServer.server_close(self) + class DedupDb: - def __init__(self): - # XXX in memory for the moment - self.db = {} + def __init__(self, dbm_file='./warcprox-dedup.db'): + if os.path.exists(dbm_file): + logging.info('opening existing deduplication database {}'.format(dbm_file)) + else: + logging.info('creating new deduplication database {}'.format(dbm_file)) + + self.db = anydbm.open(dbm_file, 'c') + + + def close(self): + self.db.close() def warc_record_written(self, record, warcfile, offset): @@ -368,20 +379,23 @@ class DedupDb: if warc_type != warctools.WarcRecord.RESPONSE: return - payload_digest = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST) - if payload_digest is None: + key = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST) + if key is None: return record_id = record.get_header(warctools.WarcRecord.ID) url = record.get_header(warctools.WarcRecord.URL) date = record.get_header(warctools.WarcRecord.DATE) - self.db[payload_digest] = {'i':record_id, 'u':url, 'd':date} + value = json.dumps({'i':record_id, 'u':url, 'd':date}) + self.db[key] = value def lookup(self, key): if key in self.db: - return self.db[key] + json_result = self.db[key] + result = json.loads(json_result) + return result else: return None @@ -398,6 +412,7 @@ class WarcRecordsetQueue(Queue.Queue): def
create_and_queue(self, url, request_data, response_recorder, remote_ip): warc_date = warctools.warc.warc_datetime_str(datetime.now()) + dedup_info = None if dedup_db is not None and response_recorder.payload_sha1 is not None: key = 'sha1:{}'.format(self.digest_str(response_recorder.payload_sha1)) dedup_info = dedup_db.lookup(key) @@ -459,18 +474,18 @@ class WarcRecordsetQueue(Queue.Queue): if warc_type is not None: headers.append((warctools.WarcRecord.TYPE, warc_type)) headers.append((warctools.WarcRecord.ID, record_id)) - if profile is not None: - headers.append((warctools.WarcRecord.TYPE, profile)) headers.append((warctools.WarcRecord.DATE, warc_date)) headers.append((warctools.WarcRecord.URL, url)) + if remote_ip is not None: + headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) + if profile is not None: + headers.append((warctools.WarcRecord.PROFILE, profile)) if refers_to is not None: headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) if refers_to_target_uri is not None: headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) if refers_to_date is not None: headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date)) - if remote_ip is not None: - headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) if concurrent_to is not None: headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) if content_type is not None: @@ -666,8 +681,8 @@ if __name__ == '__main__': help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") arg_parser.add_argument('--base32', dest='base32', action='store_true', default=False, help='write SHA1 digests in Base32 instead of hex') - # arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', - # default='./dedup.db', help='persistent deduplication database file') + arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', + default='./warcprox-dedup.db',
help='persistent deduplication database file; empty string or /dev/null disables deduplication') arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') # [--ispartof=warcinfo ispartof] @@ -686,7 +701,11 @@ if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=loglevel, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s') - dedup_db = DedupDb() + if args.dedup_db_file in (None, '', '/dev/null'): + logging.info('deduplication disabled') + dedup_db = None + else: + dedup_db = DedupDb(args.dedup_db_file) recordset_q = WarcRecordsetQueue(base32=args.base32, dedup_db=dedup_db) @@ -705,7 +724,8 @@ if __name__ == '__main__': record.content_file.close() warc_writer.register_listener(close_content_file) - warc_writer.register_listener(dedup_db.warc_record_written) + if dedup_db is not None: + warc_writer.register_listener(dedup_db.warc_record_written) proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread.start() @@ -723,4 +743,6 @@ if __name__ == '__main__': warc_writer.stop.set() proxy.shutdown() proxy.server_close() + if dedup_db is not None: + dedup_db.close()