From 03fe7179f86931f8a0f4af704fe3d7984527b8f9 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 30 Oct 2013 14:16:30 -0700 Subject: [PATCH] -g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM digest algorithm, one of md5, sha1, sha224, sha256, sha384, sha512 (default: sha1) --- README.md | 17 ++++++++--- warcprox.py | 82 ++++++++++++++++++++++++++++++++--------------------- 2 files changed, 62 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index f8d5094..3b77234 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,8 @@ incorporated into warctools mainline. usage: warcprox.py [-h] [-p PORT] [-b ADDRESS] [-c CACERT] [--certs-dir CERTS_DIR] [-d DIRECTORY] [-z] [-n PREFIX] [-s SIZE] [--rollover-idle-time ROLLOVER_IDLE_TIME] - [--base32] [-v] [-q] + [-g DIGEST_ALGORITHM] [--base32] [-j DEDUP_DB_FILE] [-v] + [-q] warcprox - WARC writing MITM HTTP/S proxy @@ -37,11 +38,12 @@ incorporated into warctools mainline. address to listen on (default: localhost) -c CACERT, --cacert CACERT CA certificate file; if file does not exist, it will - be created (default: ./desktop-nlevitt-warcprox- - ca.pem) + be created (default: ./Noah-Levitts-MacBook-Pro.local- + warcprox-ca.pem) --certs-dir CERTS_DIR where to store and load generated certificates - (default: ./desktop-nlevitt-warcprox-ca) + (default: ./Noah-Levitts-MacBook-Pro.local-warcprox- + ca) -d DIRECTORY, --dir DIRECTORY where to write warcs (default: ./warcs) -z, --gzip write gzip-compressed warc records (default: False) @@ -53,8 +55,15 @@ incorporated into warctools mainline. WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data) (default: None) + -g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM + digest algorithm, one of md5, sha1, sha224, sha256, + sha384, sha512 (default: sha1) --base32 write SHA1 digests in Base32 instead of hex (default: False) + -j DEDUP_DB_FILE, --dedup-db-file DEDUP_DB_FILE + persistent deduplication database file; empty string + or /dev/null disables deduplication (default: + ./warcprox-dedup.db) -v, --verbose -q, --quiet diff --git a/warcprox.py b/warcprox.py index 93ef4e1..5a878da 100755 --- a/warcprox.py +++ b/warcprox.py @@ -116,49 +116,50 @@ class UnsupportedSchemeException(Exception): # send the raw bytes on to the proxy destination. class ProxyingRecorder: - def __init__(self, fp, proxy_dest): + def __init__(self, fp, proxy_dest, digest_algorithm='sha1'): self.fp = fp # "The file has no name, and will cease to exist when it is closed." 
self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) - self.block_sha1 = hashlib.sha1() + self.digest_algorithm = digest_algorithm + self.block_digest = hashlib.new(digest_algorithm) self.payload_offset = None - self.payload_sha1 = None + self.payload_digest = None self.proxy_dest = proxy_dest self._prev_hunk_last_two_bytes = '' self.len = 0 def _update(self, hunk): - if self.payload_sha1 is None: + if self.payload_digest is None: # convoluted handling of two newlines crossing hunks # XXX write tests for this if self._prev_hunk_last_two_bytes.endswith('\n'): if hunk.startswith('\n'): - self.payload_sha1 = hashlib.sha1() - self.payload_sha1.update(hunk[1:]) + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[1:]) self.payload_offset = self.len + 1 elif hunk.startswith('\r\n'): - self.payload_sha1 = hashlib.sha1() - self.payload_sha1.update(hunk[2:]) + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[2:]) self.payload_offset = self.len + 2 elif self._prev_hunk_last_two_bytes == '\n\r': if hunk.startswith('\n'): - self.payload_sha1 = hashlib.sha1() - self.payload_sha1.update(hunk[1:]) + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[1:]) self.payload_offset = self.len + 1 else: m = re.search(r'\n\r?\n', hunk) if m is not None: - self.payload_sha1 = hashlib.sha1() - self.payload_sha1.update(hunk[m.end():]) + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[m.end():]) self.payload_offset = self.len + m.end() # if we still haven't found start of payload hold on to these bytes - if self.payload_sha1 is None: + if self.payload_digest is None: self._prev_hunk_last_two_bytes = hunk[-2:] else: - self.payload_sha1.update(hunk) + self.payload_digest.update(hunk) - self.block_sha1.update(hunk) + self.block_digest.update(hunk) self.tempfile.write(hunk) self.proxy_dest.sendall(hunk) @@ -192,12 +193,12 @@ class ProxyingRecorder: class ProxyingRecordingHTTPResponse(httplib.HTTPResponse): - def __init__(self, sock, debuglevel=0, strict=0, method=None, buffering=False, proxy_dest=None): + def __init__(self, sock, debuglevel=0, strict=0, method=None, buffering=False, proxy_dest=None, digest_algorithm='sha1'): httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, strict=strict, method=method, buffering=buffering) # Keep around extra reference to self.fp because HTTPResponse sets # self.fp=None after it finishes reading, but we still need it - self.recorder = ProxyingRecorder(self.fp, proxy_dest) + self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm) self.fp = self.recorder @@ -318,7 +319,9 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): # to the proxy client. 
# Proxy and record the response - h = ProxyingRecordingHTTPResponse(self._proxy_sock, proxy_dest=self.connection) + h = ProxyingRecordingHTTPResponse(self._proxy_sock, + proxy_dest=self.connection, + digest_algorithm=self.server.digest_algorithm) h.begin() buf = h.read(8192) @@ -361,10 +364,12 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): def __init__(self, server_address, req_handler_class=WarcProxyHandler, bind_and_activate=True, ca_file='./warcprox-ca.pem', - certs_dir='./warcprox-ca', recorded_url_q=None): + certs_dir='./warcprox-ca', recorded_url_q=None, + digest_algorithm='sha1'): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) self.ca = CertificateAuthority(ca_file, certs_dir) self.recorded_url_q = recorded_url_q + self.digest_algorithm = digest_algorithm def server_activate(self): BaseHTTPServer.HTTPServer.server_activate(self) @@ -416,7 +421,7 @@ class WarcWriterThread(threading.Thread): # port is only used for warc filename def __init__(self, recorded_url_q, directory, rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0, - base32=False, dedup_db=None): + digest_algorithm='sha1', base32=False, dedup_db=None): threading.Thread.__init__(self, name='WarcWriterThread') self.recorded_url_q = recorded_url_q @@ -425,6 +430,7 @@ class WarcWriterThread(threading.Thread): self.rollover_idle_time = rollover_idle_time self.gzip = gzip + self.digest_algorithm = digest_algorithm self.base32 = base32 self.dedup_db = dedup_db @@ -450,8 +456,8 @@ class WarcWriterThread(threading.Thread): warc_date = warctools.warc.warc_datetime_str(datetime.now()) dedup_info = None - if dedup_db is not None and recorded_url.response_recorder.payload_sha1 is not None: - key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1)) + if dedup_db is not None and recorded_url.response_recorder.payload_digest is not None: + key = self.digest_str(recorded_url.response_recorder.payload_digest) dedup_info = dedup_db.lookup(key) if dedup_info is not None: @@ -493,10 +499,8 @@ class WarcWriterThread(threading.Thread): def digest_str(self, hash_obj): - if self.base32: - return base64.b32encode(hash_obj.digest()) - else: - return hash_obj.hexdigest() + return '{}:{}'.format(hash_obj.name, + base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest()) def build_warc_record(self, url, warc_date=None, recorder=None, data=None, @@ -533,24 +537,26 @@ class WarcWriterThread(threading.Thread): if recorder is not None: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)))) headers.append((warctools.WarcRecord.BLOCK_DIGEST, - 'sha1:{}'.format(self.digest_str(recorder.block_sha1)))) - if recorder.payload_sha1 is not None: + self.digest_str(recorder.block_digest))) + if recorder.payload_digest is not None: headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, - 'sha1:{}'.format(self.digest_str(recorder.payload_sha1)))) + self.digest_str(recorder.payload_digest))) recorder.tempfile.seek(0) record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) else: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)))) + block_digest = hashlib.new(self.digest_algorithm, data) headers.append((warctools.WarcRecord.BLOCK_DIGEST, - 'sha1:{}'.format(self.digest_str(hashlib.sha1(data))))) + self.digest_str(block_digest))) content_tuple = content_type, data record = warctools.WarcRecord(headers=headers, content=content_tuple) return record, record_id + 
def timestamp17(self): now = datetime.now() return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) @@ -616,7 +622,7 @@ class WarcWriterThread(threading.Thread): if (self.dedup_db is not None and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE and recorded_url.response_recorder.payload_size() > 0): - key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1)) + key = self.digest_str(recorded_url.response_recorder.payload_digest) self.dedup_db.save(key, recordset[0], recordset_offset) recorded_url.response_recorder.tempfile.close() @@ -691,6 +697,8 @@ if __name__ == '__main__': arg_parser.add_argument('--rollover-idle-time', dest='rollover_idle_time', default=None, help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") + arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', + default='sha1', help='digest algorithm, one of {}'.format(', '.join(hashlib.algorithms))) arg_parser.add_argument('--base32', dest='base32', action='store_true', default=False, help='write SHA1 digests in Base32 instead of hex') arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', @@ -719,17 +727,25 @@ if __name__ == '__main__': else: dedup_db = DedupDb(args.dedup_db_file) + try: + hashlib.new(args.digest_algorithm) + except Exception as e: + logging.fatal(e) + exit(1) + recorded_url_q = Queue.Queue() proxy = WarcProxy(server_address=(args.address, int(args.port)), ca_file=args.cacert, certs_dir=args.certs_dir, - recorded_url_q=recorded_url_q) + recorded_url_q=recorded_url_q, + digest_algorithm=args.digest_algorithm) warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q, directory=args.directory, gzip=args.gzip, prefix=args.prefix, port=int(args.port), rollover_size=int(args.size), rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None, - base32=args.base32, dedup_db=dedup_db) + base32=args.base32, dedup_db=dedup_db, + digest_algorithm=args.digest_algorithm) proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread.start()
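
Not part of the patch, but a minimal sketch of the behaviour it introduces: WarcWriterThread.digest_str() now labels every digest with the algorithm name taken from the hashlib object, so WARC-Block-Digest / WARC-Payload-Digest values (and dedup-db keys) read "<algorithm>:<digest>" rather than a hard-coded "sha1:" prefix. The payload bytes and the stand-alone digest_str() helper below are illustrative only.

    import base64
    import hashlib

    def digest_str(hash_obj, base32=False):
        # Mirrors the patched WarcWriterThread.digest_str(): "<algorithm>:<digest>".
        # The patch targets Python 2; .decode('ascii') is added here so the sketch
        # also runs on Python 3, where b32encode() returns bytes.
        digest = (base64.b32encode(hash_obj.digest()).decode('ascii')
                  if base32 else hash_obj.hexdigest())
        return '{}:{}'.format(hash_obj.name, digest)

    payload = b'illustrative response body'    # hypothetical payload bytes
    for algorithm in ('sha1', 'sha256'):
        h = hashlib.new(algorithm, payload)    # same constructor the patch validates at startup
        print(digest_str(h))                   # hex form
        print(digest_str(h, base32=True))      # --base32 form

A run such as "warcprox.py -g sha256 --base32" would therefore record digests in that prefixed Base32 form, and an unrecognized algorithm name is rejected by the hashlib.new(args.digest_algorithm) check at startup, before the proxy is constructed.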