-g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM digest algorithm, one of md5, sha1, sha224, sha256, sha384, sha512 (default: sha1)

This commit is contained in:
Noah Levitt 2013-10-30 14:16:30 -07:00
parent e370ec6fe2
commit 03fe7179f8
2 changed files with 62 additions and 37 deletions

View File

@@ -26,7 +26,8 @@ incorporated into warctools mainline.
usage: warcprox.py [-h] [-p PORT] [-b ADDRESS] [-c CACERT] usage: warcprox.py [-h] [-p PORT] [-b ADDRESS] [-c CACERT]
[--certs-dir CERTS_DIR] [-d DIRECTORY] [-z] [-n PREFIX] [--certs-dir CERTS_DIR] [-d DIRECTORY] [-z] [-n PREFIX]
[-s SIZE] [--rollover-idle-time ROLLOVER_IDLE_TIME] [-s SIZE] [--rollover-idle-time ROLLOVER_IDLE_TIME]
[--base32] [-v] [-q] [-g DIGEST_ALGORITHM] [--base32] [-j DEDUP_DB_FILE] [-v]
[-q]
warcprox - WARC writing MITM HTTP/S proxy warcprox - WARC writing MITM HTTP/S proxy
@@ -37,11 +38,12 @@ incorporated into warctools mainline.
address to listen on (default: localhost) address to listen on (default: localhost)
-c CACERT, --cacert CACERT -c CACERT, --cacert CACERT
CA certificate file; if file does not exist, it will CA certificate file; if file does not exist, it will
be created (default: ./desktop-nlevitt-warcprox- be created (default: ./Noah-Levitts-MacBook-Pro.local-
ca.pem) warcprox-ca.pem)
--certs-dir CERTS_DIR --certs-dir CERTS_DIR
where to store and load generated certificates where to store and load generated certificates
(default: ./desktop-nlevitt-warcprox-ca) (default: ./Noah-Levitts-MacBook-Pro.local-warcprox-
ca)
-d DIRECTORY, --dir DIRECTORY -d DIRECTORY, --dir DIRECTORY
where to write warcs (default: ./warcs) where to write warcs (default: ./warcs)
-z, --gzip write gzip-compressed warc records (default: False) -z, --gzip write gzip-compressed warc records (default: False)
@@ -53,8 +55,15 @@ incorporated into warctools mainline.
WARC file rollover idle time threshold in seconds (so WARC file rollover idle time threshold in seconds (so
that Friday's last open WARC doesn't sit there all that Friday's last open WARC doesn't sit there all
weekend waiting for more data) (default: None) weekend waiting for more data) (default: None)
-g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM
digest algorithm, one of md5, sha1, sha224, sha256,
sha384, sha512 (default: sha1)
--base32 write SHA1 digests in Base32 instead of hex (default: --base32 write SHA1 digests in Base32 instead of hex (default:
False) False)
-j DEDUP_DB_FILE, --dedup-db-file DEDUP_DB_FILE
persistent deduplication database file; empty string
or /dev/null disables deduplication (default:
./warcprox-dedup.db)
-v, --verbose -v, --verbose
-q, --quiet -q, --quiet

View File

@@ -116,49 +116,50 @@ class UnsupportedSchemeException(Exception):
# send the raw bytes on to the proxy destination. # send the raw bytes on to the proxy destination.
class ProxyingRecorder: class ProxyingRecorder:
def __init__(self, fp, proxy_dest): def __init__(self, fp, proxy_dest, digest_algorithm='sha1'):
self.fp = fp self.fp = fp
# "The file has no name, and will cease to exist when it is closed." # "The file has no name, and will cease to exist when it is closed."
self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024)
self.block_sha1 = hashlib.sha1() self.digest_algorithm = digest_algorithm
self.block_digest = hashlib.new(digest_algorithm)
self.payload_offset = None self.payload_offset = None
self.payload_sha1 = None self.payload_digest = None
self.proxy_dest = proxy_dest self.proxy_dest = proxy_dest
self._prev_hunk_last_two_bytes = '' self._prev_hunk_last_two_bytes = ''
self.len = 0 self.len = 0
def _update(self, hunk): def _update(self, hunk):
if self.payload_sha1 is None: if self.payload_digest is None:
# convoluted handling of two newlines crossing hunks # convoluted handling of two newlines crossing hunks
# XXX write tests for this # XXX write tests for this
if self._prev_hunk_last_two_bytes.endswith('\n'): if self._prev_hunk_last_two_bytes.endswith('\n'):
if hunk.startswith('\n'): if hunk.startswith('\n'):
self.payload_sha1 = hashlib.sha1() self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_sha1.update(hunk[1:]) self.payload_digest.update(hunk[1:])
self.payload_offset = self.len + 1 self.payload_offset = self.len + 1
elif hunk.startswith('\r\n'): elif hunk.startswith('\r\n'):
self.payload_sha1 = hashlib.sha1() self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_sha1.update(hunk[2:]) self.payload_digest.update(hunk[2:])
self.payload_offset = self.len + 2 self.payload_offset = self.len + 2
elif self._prev_hunk_last_two_bytes == '\n\r': elif self._prev_hunk_last_two_bytes == '\n\r':
if hunk.startswith('\n'): if hunk.startswith('\n'):
self.payload_sha1 = hashlib.sha1() self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_sha1.update(hunk[1:]) self.payload_digest.update(hunk[1:])
self.payload_offset = self.len + 1 self.payload_offset = self.len + 1
else: else:
m = re.search(r'\n\r?\n', hunk) m = re.search(r'\n\r?\n', hunk)
if m is not None: if m is not None:
self.payload_sha1 = hashlib.sha1() self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_sha1.update(hunk[m.end():]) self.payload_digest.update(hunk[m.end():])
self.payload_offset = self.len + m.end() self.payload_offset = self.len + m.end()
# if we still haven't found start of payload hold on to these bytes # if we still haven't found start of payload hold on to these bytes
if self.payload_sha1 is None: if self.payload_digest is None:
self._prev_hunk_last_two_bytes = hunk[-2:] self._prev_hunk_last_two_bytes = hunk[-2:]
else: else:
self.payload_sha1.update(hunk) self.payload_digest.update(hunk)
self.block_sha1.update(hunk) self.block_digest.update(hunk)
self.tempfile.write(hunk) self.tempfile.write(hunk)
self.proxy_dest.sendall(hunk) self.proxy_dest.sendall(hunk)
@@ -192,12 +193,12 @@ class ProxyingRecorder:
class ProxyingRecordingHTTPResponse(httplib.HTTPResponse): class ProxyingRecordingHTTPResponse(httplib.HTTPResponse):
def __init__(self, sock, debuglevel=0, strict=0, method=None, buffering=False, proxy_dest=None): def __init__(self, sock, debuglevel=0, strict=0, method=None, buffering=False, proxy_dest=None, digest_algorithm='sha1'):
httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, strict=strict, method=method, buffering=buffering) httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, strict=strict, method=method, buffering=buffering)
# Keep around extra reference to self.fp because HTTPResponse sets # Keep around extra reference to self.fp because HTTPResponse sets
# self.fp=None after it finishes reading, but we still need it # self.fp=None after it finishes reading, but we still need it
self.recorder = ProxyingRecorder(self.fp, proxy_dest) self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm)
self.fp = self.recorder self.fp = self.recorder
@@ -318,7 +319,9 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# to the proxy client. # to the proxy client.
# Proxy and record the response # Proxy and record the response
h = ProxyingRecordingHTTPResponse(self._proxy_sock, proxy_dest=self.connection) h = ProxyingRecordingHTTPResponse(self._proxy_sock,
proxy_dest=self.connection,
digest_algorithm=self.server.digest_algorithm)
h.begin() h.begin()
buf = h.read(8192) buf = h.read(8192)
@@ -361,10 +364,12 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, server_address, req_handler_class=WarcProxyHandler, def __init__(self, server_address, req_handler_class=WarcProxyHandler,
bind_and_activate=True, ca_file='./warcprox-ca.pem', bind_and_activate=True, ca_file='./warcprox-ca.pem',
certs_dir='./warcprox-ca', recorded_url_q=None): certs_dir='./warcprox-ca', recorded_url_q=None,
digest_algorithm='sha1'):
BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.ca = CertificateAuthority(ca_file, certs_dir) self.ca = CertificateAuthority(ca_file, certs_dir)
self.recorded_url_q = recorded_url_q self.recorded_url_q = recorded_url_q
self.digest_algorithm = digest_algorithm
def server_activate(self): def server_activate(self):
BaseHTTPServer.HTTPServer.server_activate(self) BaseHTTPServer.HTTPServer.server_activate(self)
@@ -416,7 +421,7 @@ class WarcWriterThread(threading.Thread):
# port is only used for warc filename # port is only used for warc filename
def __init__(self, recorded_url_q, directory, rollover_size=1000000000, def __init__(self, recorded_url_q, directory, rollover_size=1000000000,
rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0,
base32=False, dedup_db=None): digest_algorithm='sha1', base32=False, dedup_db=None):
threading.Thread.__init__(self, name='WarcWriterThread') threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q self.recorded_url_q = recorded_url_q
@@ -425,6 +430,7 @@ class WarcWriterThread(threading.Thread):
self.rollover_idle_time = rollover_idle_time self.rollover_idle_time = rollover_idle_time
self.gzip = gzip self.gzip = gzip
self.digest_algorithm = digest_algorithm
self.base32 = base32 self.base32 = base32
self.dedup_db = dedup_db self.dedup_db = dedup_db
@@ -450,8 +456,8 @@ class WarcWriterThread(threading.Thread):
warc_date = warctools.warc.warc_datetime_str(datetime.now()) warc_date = warctools.warc.warc_datetime_str(datetime.now())
dedup_info = None dedup_info = None
if dedup_db is not None and recorded_url.response_recorder.payload_sha1 is not None: if dedup_db is not None and recorded_url.response_recorder.payload_digest is not None:
key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1)) key = self.digest_str(recorded_url.response_recorder.payload_digest)
dedup_info = dedup_db.lookup(key) dedup_info = dedup_db.lookup(key)
if dedup_info is not None: if dedup_info is not None:
@@ -493,10 +499,8 @@ class WarcWriterThread(threading.Thread):
def digest_str(self, hash_obj): def digest_str(self, hash_obj):
if self.base32: return '{}:{}'.format(hash_obj.name,
return base64.b32encode(hash_obj.digest()) base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest())
else:
return hash_obj.hexdigest()
def build_warc_record(self, url, warc_date=None, recorder=None, data=None, def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
@@ -533,24 +537,26 @@ class WarcWriterThread(threading.Thread):
if recorder is not None: if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)))) headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder))))
headers.append((warctools.WarcRecord.BLOCK_DIGEST, headers.append((warctools.WarcRecord.BLOCK_DIGEST,
'sha1:{}'.format(self.digest_str(recorder.block_sha1)))) self.digest_str(recorder.block_digest)))
if recorder.payload_sha1 is not None: if recorder.payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
'sha1:{}'.format(self.digest_str(recorder.payload_sha1)))) self.digest_str(recorder.payload_digest)))
recorder.tempfile.seek(0) recorder.tempfile.seek(0)
record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
else: else:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)))) headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data))))
block_digest = hashlib.new(self.digest_algorithm, data)
headers.append((warctools.WarcRecord.BLOCK_DIGEST, headers.append((warctools.WarcRecord.BLOCK_DIGEST,
'sha1:{}'.format(self.digest_str(hashlib.sha1(data))))) self.digest_str(block_digest)))
content_tuple = content_type, data content_tuple = content_type, data
record = warctools.WarcRecord(headers=headers, content=content_tuple) record = warctools.WarcRecord(headers=headers, content=content_tuple)
return record, record_id return record, record_id
def timestamp17(self): def timestamp17(self):
now = datetime.now() now = datetime.now()
return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000)
@@ -616,7 +622,7 @@ class WarcWriterThread(threading.Thread):
if (self.dedup_db is not None if (self.dedup_db is not None
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0): and recorded_url.response_recorder.payload_size() > 0):
key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1)) key = self.digest_str(recorded_url.response_recorder.payload_digest)
self.dedup_db.save(key, recordset[0], recordset_offset) self.dedup_db.save(key, recordset[0], recordset_offset)
recorded_url.response_recorder.tempfile.close() recorded_url.response_recorder.tempfile.close()
@@ -691,6 +697,8 @@ if __name__ == '__main__':
arg_parser.add_argument('--rollover-idle-time', arg_parser.add_argument('--rollover-idle-time',
dest='rollover_idle_time', default=None, dest='rollover_idle_time', default=None,
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
default='sha1', help='digest algorithm, one of {}'.format(', '.join(hashlib.algorithms)))
arg_parser.add_argument('--base32', dest='base32', action='store_true', arg_parser.add_argument('--base32', dest='base32', action='store_true',
default=False, help='write SHA1 digests in Base32 instead of hex') default=False, help='write SHA1 digests in Base32 instead of hex')
arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
@@ -719,17 +727,25 @@ if __name__ == '__main__':
else: else:
dedup_db = DedupDb(args.dedup_db_file) dedup_db = DedupDb(args.dedup_db_file)
try:
hashlib.new(args.digest_algorithm)
except Exception as e:
logging.fatal(e)
exit(1)
recorded_url_q = Queue.Queue() recorded_url_q = Queue.Queue()
proxy = WarcProxy(server_address=(args.address, int(args.port)), proxy = WarcProxy(server_address=(args.address, int(args.port)),
ca_file=args.cacert, certs_dir=args.certs_dir, ca_file=args.cacert, certs_dir=args.certs_dir,
recorded_url_q=recorded_url_q) recorded_url_q=recorded_url_q,
digest_algorithm=args.digest_algorithm)
warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q, warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q,
directory=args.directory, gzip=args.gzip, prefix=args.prefix, directory=args.directory, gzip=args.gzip, prefix=args.prefix,
port=int(args.port), rollover_size=int(args.size), port=int(args.port), rollover_size=int(args.size),
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None, rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None,
base32=args.base32, dedup_db=dedup_db) base32=args.base32, dedup_db=dedup_db,
digest_algorithm=args.digest_algorithm)
proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread')
proxy_thread.start() proxy_thread.start()