Merge branch 'master' into trough-dedup

* master:
  hopefully fix test failing occasionally apparently due to race condition by checking that the file we're waiting for has some content
  fix payload digest by pulling calculation up one level where content has already been transfer-decoded
  new failing test for correct calculation of payload digest
  missed a spot handling case of no warc records written
Noah Levitt 2017-11-13 11:47:04 -08:00
commit c40ad8391d
11 changed files with 210 additions and 60 deletions
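
Note: the payload digest fix below hinges on the distinction RFC 2616 draws between the message body (possibly transfer-encoded) and the entity body (transfer-decoded but not content-decoded). A minimal sketch of why digesting before transfer-decoding goes wrong, using a hypothetical dechunk() helper that is not part of warcprox:

import hashlib

def dechunk(buf):
    # decode a Transfer-Encoding: chunked message body
    body, i = b'', 0
    while True:
        j = buf.index(b'\r\n', i)
        chunk_len = int(buf[i:j], 16)
        if chunk_len == 0:
            return body
        body += buf[j+2:j+2+chunk_len]
        i = j + 2 + chunk_len + 2   # skip chunk data plus trailing CRLF

entity = b'hello world'
message = b'b\r\nhello world\r\n0\r\n\r\n'   # chunked encoding of entity
assert dechunk(message) == entity
# hashing the raw message body yields a different (wrong) payload digest:
assert hashlib.sha1(message).digest() != hashlib.sha1(entity).digest()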

View File

@@ -52,7 +52,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.2.1b2.dev112',
+        version='2.2.1b2.dev115',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',

View File

@@ -3,7 +3,7 @@
 tests/single-threaded-proxy.py - single-threaded MITM proxy, useful for
 debugging, does not write warcs
-Copyright (C) 2015-2016 Internet Archive
+Copyright (C) 2015-2017 Internet Archive
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -46,7 +46,7 @@ class FakeQueue(object):
         logging.info("{} {} {} {} {} size={} {}".format(
             recorded_url.client_ip, recorded_url.status, recorded_url.method,
             recorded_url.url.decode("utf-8"), recorded_url.mimetype,
-            recorded_url.size, warcprox.digest_str(recorded_url.response_recorder.payload_digest, False).decode('utf-8')))
+            recorded_url.size, warcprox.digest_str(recorded_url.payload_digest, False).decode('utf-8')))
 def parse_args():
     prog = os.path.basename(sys.argv[0])

View File

@@ -46,6 +46,10 @@ from collections import Counter
 import socket
 import datetime
 import warcio.archiveiterator
+import io
+import gzip
+import mock
+import email.message
 try:
     import http.server as http_server
@@ -135,6 +139,24 @@ def dump_state(signum=None, frame=None):
 signal.signal(signal.SIGQUIT, dump_state)
+def chunkify(buf, chunk_size=13):
+    i = 0
+    result = b''
+    while i < len(buf):
+        chunk_len = min(len(buf) - i, chunk_size)
+        result += ('%x\r\n' % chunk_len).encode('ascii')
+        result += buf[i:i+chunk_len]
+        result += b'\r\n'
+        i += chunk_size
+    result += b'0\r\n\r\n'
+    return result
+# def gzipify(buf):
+#     with io.BytesIO() as outbuf:
+#         with gzip.GzipFile(fileobj=outbuf, mode='wb') as gz:
+#             gz.write(buf)
+#         return outbuf.getvalue()
 class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
     def build_response(self):
         m = re.match(r'^/([^/]+)/([^/]+)$', self.path)
@@ -151,6 +173,71 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
                     + b'Content-Type: text/plain\r\n'
                     + b'\r\n')
             payload = b'This response is missing a Content-Length http header.'
+        elif self.path.startswith('/test_payload_digest-'):
+            content_body = (
+                b'Hello. How are you. I am the test_payload_digest '
+                b'content body. The entity body is a possibly content-'
+                b'encoded version of me. The message body is a possibly '
+                b'transfer-encoded version of the entity body.\n')
+            gzipped = (
+                b"\x1f\x8b\x08\x00jA\x06Z\x02\xffm\x8d1\x0e\xc20\x10\x04{^"
+                b"\xb1\x1f\xc0\xef\x08=}t\x897\xc1\x92\xed\x8b|\x07\xc8"
+                b"\xbf'\n\xa2@J9\xab\x19\xed\xc0\x9c5`\xd07\xa4\x11]\x9f"
+                b"\x017H\x81?\x08\xa7\xf9\xb8I\xcf*q\x8ci\xdd\x11\xb3VguL"
+                b"\x1a{\xc0}\xb7vJ\xde\x8f\x01\xc9 \xd8\xd4,M\xb9\xff\xdc"
+                b"+\xeb\xac\x91\x11/6KZ\xa1\x0b\n\xbfq\xa1\x99\xac<\xab"
+                b"\xbdI\xb5\x85\xed,\xf7\xff\xdfp\xf9\x00\xfc\t\x02\xb0"
+                b"\xc8\x00\x00\x00")
+            double_gzipped = (
+                b"\x1f\x8b\x08\x00jA\x06Z\x02\xff\x01\x89\x00v\xff\x1f\x8b"
+                b"\x08\x00jA\x06Z\x02\xffm\x8d1\x0e\xc20\x10\x04{^\xb1\x1f"
+                b"\xc0\xef\x08=}t\x897\xc1\x92\xed\x8b|\x07\xc8\xbf'\n\xa2"
+                b"@J9\xab\x19\xed\xc0\x9c5`\xd07\xa4\x11]\x9f\x017H\x81?"
+                b"\x08\xa7\xf9\xb8I\xcf*q\x8ci\xdd\x11\xb3VguL\x1a{\xc0}"
+                b"\xb7vJ\xde\x8f\x01\xc9 \xd8\xd4,M\xb9\xff\xdc+\xeb\xac"
+                b"\x91\x11/6KZ\xa1\x0b\n\xbfq\xa1\x99\xac<\xab\xbdI\xb5"
+                b"\x85\xed,\xf7\xff\xdfp\xf9\x00\xfc\t\x02\xb0\xc8\x00\x00"
+                b"\x00\xf9\xdd\x8f\xed\x89\x00\x00\x00")
+            if self.path == '/test_payload_digest-plain':
+                payload = content_body
+                actual_headers = (b'Content-Type: text/plain\r\n'
+                        + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
+            elif self.path == '/test_payload_digest-gzip':
+                payload = gzipped
+                actual_headers = (b'Content-Type: application/gzip\r\n'
+                        + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
+            elif self.path == '/test_payload_digest-ce-gzip':
+                payload = gzipped
+                actual_headers = (b'Content-Type: text/plain\r\n'
+                        + b'Content-Encoding: gzip\r\n'
+                        + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
+            elif self.path == '/test_payload_digest-gzip-ce-gzip':
+                payload = double_gzipped
+                actual_headers = (b'Content-Type: application/gzip\r\n'
+                        + b'Content-Encoding: gzip\r\n'
+                        + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
+            elif self.path == '/test_payload_digest-te-chunked':
+                payload = chunkify(content_body)
+                actual_headers = (b'Content-Type: text/plain\r\n'
+                        + b'Transfer-Encoding: chunked\r\n')
+            elif self.path == '/test_payload_digest-gzip-te-chunked':
+                payload = chunkify(gzipped)
+                actual_headers = (b'Content-Type: application/gzip\r\n'
+                        + b'Transfer-Encoding: chunked\r\n')
+            elif self.path == '/test_payload_digest-ce-gzip-te-chunked':
+                payload = chunkify(gzipped)
+                actual_headers = (b'Content-Type: text/plain\r\n'
+                        + b'Content-Encoding: gzip\r\n'
+                        + b'Transfer-Encoding: chunked\r\n')
+            elif self.path == '/test_payload_digest-gzip-ce-gzip-te-chunked':
+                payload = chunkify(double_gzipped)
+                actual_headers = (b'Content-Type: application/gzip\r\n'
+                        + b'Content-Encoding: gzip\r\n'
+                        + b'Transfer-Encoding: chunked\r\n')
+            else:
+                raise Exception('bad path')
+            headers = b'HTTP/1.1 200 OK\r\n' + actual_headers + b'\r\n'
+            logging.info('headers=%r payload=%r', headers, payload)
         else:
             payload = b'404 Not Found\n'
             headers = (b'HTTP/1.1 404 Not Found\r\n'
@@ -1372,11 +1459,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
     assert response.status_code == 200
     start = time.time()
+    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log')
     while time.time() - start < 10:
-        if os.path.exists(os.path.join(
-                warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log')):
+        if os.path.exists(file) and os.stat(file).st_size > 0:
             break
         time.sleep(0.5)
+    assert os.path.exists(file)
+    assert os.path.exists(os.path.join(
+        warcprox_.options.crawl_log_dir, 'crawl.log'))
     crawl_log = open(os.path.join(
         warcprox_.options.crawl_log_dir, 'crawl.log'), 'rb').read()
@@ -1430,14 +1520,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
     assert response.status_code == 200
     start = time.time()
+    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log')
     while time.time() - start < 10:
-        if os.path.exists(os.path.join(
-                warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log')):
+        if os.path.exists(file) and os.stat(file).st_size > 0:
             break
         time.sleep(0.5)
-    crawl_log_2 = open(os.path.join(
-        warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log'), 'rb').read()
+    assert os.path.exists(file)
+    crawl_log_2 = open(file, 'rb').read()
     assert re.match(b'\A2[^\n]+\n\Z', crawl_log_2)
     assert crawl_log_2[24:31] == b' 200 '
@@ -1464,16 +1554,15 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
     headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})}
     response = requests.head(url, proxies=archiving_proxies, headers=headers)
+    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')
     start = time.time()
     while time.time() - start < 10:
-        if os.path.exists(os.path.join(
-                warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')):
+        if os.path.exists(file) and os.stat(file).st_size > 0:
             break
         time.sleep(0.5)
-    crawl_log_3 = open(os.path.join(
-        warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log'), 'rb').read()
+    assert os.path.exists(file)
+    crawl_log_3 = open(file, 'rb').read()
     assert re.match(b'\A2[^\n]+\n\Z', crawl_log_3)
     assert crawl_log_3[24:31] == b' 200 '
     assert crawl_log_3[31:42] == b' 0 '
@@ -1506,14 +1595,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
     assert response.status_code == 204
     start = time.time()
+    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')
     while time.time() - start < 10:
-        if os.path.exists(os.path.join(
-                warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')):
+        if os.path.exists(file) and os.stat(file).st_size > 0:
             break
         time.sleep(0.5)
-    crawl_log_4 = open(os.path.join(
-        warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log'), 'rb').read()
+    assert os.path.exists(file)
+    crawl_log_4 = open(file, 'rb').read()
     assert re.match(b'\A2[^\n]+\n\Z', crawl_log_4)
     assert crawl_log_4[24:31] == b' 204 '
@@ -1572,6 +1661,66 @@ def test_long_warcprox_meta(
     with pytest.raises(StopIteration):
         next(rec_iter)
+def test_payload_digest(warcprox_, http_daemon):
+    '''
+    Tests that digest is of RFC2616 "entity body"
+    (transfer-decoded but not content-decoded)
+    '''
+    class HalfMockedMitm(warcprox.mitmproxy.MitmProxyHandler):
+        def __init__(self, url):
+            self.path = url
+            self.request_version = 'HTTP/1.1'
+            self.client_address = mock.MagicMock()
+            self.headers = email.message.Message()
+            self.headers.add_header('Host', 'localhost:%s' % http_daemon.server_port)
+            self.server = warcprox_.proxy
+            self.command = 'GET'
+            self.connection = mock.Mock()
+    PLAIN_SHA1 = b'sha1:881289333370aa4e3214505f1173423cc5a896b7'
+    GZIP_SHA1 = b'sha1:634e25de71ae01edb5c5d9e2e99c4836bbe94129'
+    GZIP_GZIP_SHA1 = b'sha1:cecbf3a5c4975072f5e4c5e0489f808ef246c2b4'
+    # plain
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-plain' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == PLAIN_SHA1
+    # content-type: application/gzip
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
+    # content-encoding: gzip
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-ce-gzip' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
+    # content-type: application/gzip && content-encoding: gzip
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-ce-gzip' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1
+    # chunked plain
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-te-chunked' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == PLAIN_SHA1
+    # chunked content-type: application/gzip
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-te-chunked' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
+    # chunked content-encoding: gzip
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-ce-gzip-te-chunked' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
+    # chunked content-type: application/gzip && content-encoding: gzip
+    mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-ce-gzip-te-chunked' % http_daemon.server_port)
+    req, prox_rec_res = mitm.do_GET()
+    assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1
 if __name__ == '__main__':
     pytest.main()
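
As a sanity check, PLAIN_SHA1 above should be reproducible by hashing the plain content body directly, since for the un-encoded case the entity body is the content body (sketch, not part of the test suite):

import hashlib
content_body = (
    b'Hello. How are you. I am the test_payload_digest '
    b'content body. The entity body is a possibly content-'
    b'encoded version of me. The message body is a possibly '
    b'transfer-encoded version of the entity body.\n')
print('sha1:' + hashlib.sha1(content_body).hexdigest())
# expected: sha1:881289333370aa4e3214505f1173423cc5a896b7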

View File

@@ -28,7 +28,7 @@ except ImportError:
     import Queue as queue
 import datetime
-def digest_str(hash_obj, base32):
+def digest_str(hash_obj, base32=False):
     import base64
     return hash_obj.name.encode('utf-8') + b':' + (
         base64.b32encode(hash_obj.digest()) if base32
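
With base32 now defaulting to False, both call styles work; a quick sketch, assuming the unchanged else branch returns the hex digest as elsewhere in warcprox:

import hashlib
import warcprox

h = hashlib.sha1(b'example')
print(warcprox.digest_str(h))                # hex form, b'sha1:...'
print(warcprox.digest_str(h, base32=True))   # base32 form used in WARC headers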

View File

@@ -141,16 +141,16 @@ class RethinkCaptures:
         return result
     def _assemble_entry(self, recorded_url, records):
-        if recorded_url.response_recorder:
-            if recorded_url.response_recorder.payload_digest.name == "sha1":
+        if recorded_url.payload_digest:
+            if recorded_url.payload_digest.name == "sha1":
                 sha1base32 = base64.b32encode(
-                    recorded_url.response_recorder.payload_digest.digest()
+                    recorded_url.payload_digest.digest()
                 ).decode("utf-8")
             else:
                 self.logger.warn(
                     "digest type is %r but big captures table is indexed "
                     "by sha1",
-                    recorded_url.response_recorder.payload_digest.name)
+                    recorded_url.payload_digest.name)
         else:
             digest = hashlib.new("sha1", records[0].content[1])
             sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")

View File

@@ -43,7 +43,7 @@ class CrawlLogger(object):
         if recorded_url.response_recorder:
             content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset
             payload_digest = warcprox.digest_str(
-                recorded_url.response_recorder.payload_digest,
+                recorded_url.payload_digest,
                 self.options.base32)
         else:
             # WARCPROX_WRITE_RECORD request

View File

@@ -99,8 +99,7 @@ class DedupDb(object):
         if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):
             digest_key = warcprox.digest_str(
-                recorded_url.response_recorder.payload_digest,
-                self.options.base32)
+                recorded_url.payload_digest, self.options.base32)
             if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
                 self.save(
                     digest_key, records[0],
@@ -111,9 +110,9 @@ class DedupDb(object):
 def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
     if (recorded_url.response_recorder
-            and recorded_url.response_recorder.payload_digest
+            and recorded_url.payload_digest
             and recorded_url.response_recorder.payload_size() > 0):
-        digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
+        digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
         if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
             recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
                 recorded_url.url)
@@ -176,8 +175,8 @@ class RethinkDedupDb:
     def notify(self, recorded_url, records):
         if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):
-            digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
-                self.options.base32)
+            digest_key = warcprox.digest_str(
+                recorded_url.payload_digest, self.options.base32)
             if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
                 self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
             else:
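
All three call sites above now key dedup on recorded_url.payload_digest. Roughly, with a hypothetical bucket name for illustration:

digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
entry = dedup_db.lookup(digest_key, 'example-bucket', recorded_url.url)
if entry:
    # a prior capture with an identical payload digest exists, so the
    # warc writer can emit a revisit record instead of a full response
    recorded_url.dedup_info = entry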

View File

@@ -66,7 +66,7 @@ import time
 class ProxyingRecorder(object):
     """
     Wraps a socket._fileobject, recording the bytes as they are read,
-    calculating digests, and sending them on to the proxy client.
+    calculating the block digest, and sending them on to the proxy client.
     """
     logger = logging.getLogger("warcprox.mitmproxy.ProxyingRecorder")
@@ -78,27 +78,19 @@ class ProxyingRecorder(object):
         self.digest_algorithm = digest_algorithm
         self.block_digest = hashlib.new(digest_algorithm)
         self.payload_offset = None
-        self.payload_digest = None
         self.proxy_client = proxy_client
         self._proxy_client_conn_open = True
         self.len = 0
         self.url = url
     def payload_starts_now(self):
-        self.payload_digest = hashlib.new(self.digest_algorithm)
         self.payload_offset = self.len
-    def _update_payload_digest(self, hunk):
-        if self.payload_digest:
-            self.payload_digest.update(hunk)
     def _update(self, hunk):
-        self._update_payload_digest(hunk)
         self.block_digest.update(hunk)
         self.tempfile.write(hunk)
-        if self.payload_digest and self._proxy_client_conn_open:
+        if self.payload_offset is not None and self._proxy_client_conn_open:
             try:
                 self.proxy_client.sendall(hunk)
             except BaseException as e:
@@ -157,6 +149,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
             self, sock, debuglevel=debuglevel, method=method)
         self.proxy_client = proxy_client
         self.url = url
+        self.digest_algorithm = digest_algorithm
         # Keep around extra reference to self.fp because HTTPResponse sets
         # self.fp=None after it finishes reading, but we still need it
@@ -164,6 +157,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
             self.fp, proxy_client, digest_algorithm, url=url)
         self.fp = self.recorder
+        self.payload_digest = None
     def begin(self, extra_response_headers={}):
         http_client.HTTPResponse.begin(self) # reads status line, headers
@@ -185,6 +180,12 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
             self.proxy_client.sendall(status_and_headers.encode('latin1'))
         self.recorder.payload_starts_now()
+        self.payload_digest = hashlib.new(self.digest_algorithm)
+    def read(self, amt=None):
+        buf = http_client.HTTPResponse.read(self, amt)
+        self.payload_digest.update(buf)
+        return buf
 def via_header_value(orig, request_version):
     via = orig
@@ -361,7 +362,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             return
         try:
-            self._proxy_request()
+            return self._proxy_request()
         except:
             self.logger.error("exception proxying request", exc_info=True)
             raise
@@ -406,6 +407,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
         if 'Content-Length' in self.headers:
             req += self.rfile.read(int(self.headers['Content-Length']))
+        prox_rec_res = None
         try:
             self.logger.debug('sending to remote server req=%r', req)
@@ -418,9 +420,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 url=self.url, method=self.command)
             prox_rec_res.begin(extra_response_headers=extra_response_headers)
-            buf = prox_rec_res.read(8192)
+            buf = prox_rec_res.read(65536)
             while buf != b'':
-                buf = prox_rec_res.read(8192)
+                buf = prox_rec_res.read(65536)
             self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
         except Exception as e:
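
The core of the fix is ProxyingRecordingHTTPResponse.read() above: http.client transfer-decodes (de-chunks) the body inside read(), so a hash updated there covers the RFC 2616 entity body. A self-contained sketch of the same pattern:

import hashlib
import http.client

class DigestingHTTPResponse(http.client.HTTPResponse):
    # sketch of the pattern used by ProxyingRecordingHTTPResponse
    def begin(self):
        http.client.HTTPResponse.begin(self)   # reads status line and headers
        self.payload_digest = hashlib.new('sha1')

    def read(self, amt=None):
        # http.client de-chunks Transfer-Encoding: chunked, so this hash
        # covers the entity body rather than the raw message body
        buf = http.client.HTTPResponse.read(self, amt)
        self.payload_digest.update(buf)
        return buf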

View File

@@ -307,7 +307,6 @@ class RethinkStatsDb(StatsDb):
     def tally(self, recorded_url, records):
         buckets = self.buckets(recorded_url)
-        is_revisit = records[0].type == b'revisit'
         with self._batch_lock:
             for bucket in buckets:
                 bucket_stats = self._batch.setdefault(
@@ -316,12 +315,13 @@ class RethinkStatsDb(StatsDb):
                 bucket_stats["total"]["urls"] += 1
                 bucket_stats["total"]["wire_bytes"] += recorded_url.size
-                if is_revisit:
-                    bucket_stats["revisit"]["urls"] += 1
-                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
-                else:
-                    bucket_stats["new"]["urls"] += 1
-                    bucket_stats["new"]["wire_bytes"] += recorded_url.size
+                if records:
+                    if records[0].type == b'revisit':
+                        bucket_stats["revisit"]["urls"] += 1
+                        bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
+                    else:
+                        bucket_stats["new"]["urls"] += 1
+                        bucket_stats["new"]["wire_bytes"] += recorded_url.size
     def _add_to_batch(self, add_me):
         with self._batch_lock:

View File

@@ -53,7 +53,8 @@ class WarcRecordBuilder:
                 refers_to=recorded_url.dedup_info.get('id'),
                 refers_to_target_uri=recorded_url.dedup_info['url'],
                 refers_to_date=recorded_url.dedup_info['date'],
-                payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
+                payload_digest=warcprox.digest_str(
+                    recorded_url.payload_digest, self.base32),
                 profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
                 content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
                 remote_ip=recorded_url.remote_ip)
@@ -64,7 +65,9 @@ class WarcRecordBuilder:
                 recorder=recorded_url.response_recorder,
                 warc_type=warctools.WarcRecord.RESPONSE,
                 content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
-                remote_ip=recorded_url.remote_ip)
+                remote_ip=recorded_url.remote_ip,
+                payload_digest=warcprox.digest_str(
+                    recorded_url.payload_digest, self.base32))
     def build_warc_records(self, recorded_url):
         """Returns a tuple of hanzo.warctools.warc.WarcRecord (principal_record, ...)"""
@@ -122,13 +125,8 @@ class WarcRecordBuilder:
             headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1')))
             headers.append((warctools.WarcRecord.BLOCK_DIGEST,
                 warcprox.digest_str(recorder.block_digest, self.base32)))
-            if recorder.payload_digest is not None:
-                headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
-                    warcprox.digest_str(recorder.payload_digest, self.base32)))
             recorder.tempfile.seek(0)
             record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
         else:
             headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1')))
             digest = hashlib.new(self.digest_algorithm, data)
@@ -137,7 +135,6 @@ class WarcRecordBuilder:
             if not payload_digest:
                 headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
                     warcprox.digest_str(digest, self.base32)))
             content_tuple = content_type, data
             record = warctools.WarcRecord(headers=headers, content=content_tuple)
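
The payload_digest argument threaded through above becomes the WARC-Payload-Digest header; its value has this shape, mirroring warcprox.digest_str(digest, base32=True):

import base64, hashlib
digest = hashlib.sha1(b'entity body bytes')
value = digest.name.encode('utf-8') + b':' + base64.b32encode(digest.digest())
# b'sha1:' followed by the 32-character base32 digest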

View File

@@ -218,7 +218,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                 content_type=content_type, method=self.command,
                 timestamp=timestamp, host=self.hostname,
                 duration=datetime.datetime.utcnow()-timestamp,
-                referer=self.headers.get('referer'))
+                referer=self.headers.get('referer'),
+                payload_digest=prox_rec_res.payload_digest)
         self.server.recorded_url_q.put(recorded_url)
         return recorded_url
@@ -328,7 +329,8 @@ class RecordedUrl:
     def __init__(self, url, request_data, response_recorder, remote_ip,
             warcprox_meta=None, content_type=None, custom_type=None,
             status=None, size=None, client_ip=None, method=None,
-            timestamp=None, host=None, duration=None, referer=None):
+            timestamp=None, host=None, duration=None, referer=None,
+            payload_digest=None):
         # XXX should test what happens with non-ascii url (when does
         # url-encoding happen?)
         if type(url) is not bytes:
@@ -366,6 +368,7 @@ class RecordedUrl:
         self.host = host
         self.duration = duration
         self.referer = referer
+        self.payload_digest = payload_digest
 # inherit from object so that multiple inheritance from this class works
 # properly in python 2