From 42f5e9b7a42eacb46a82405a319fa2af43edd565 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 9 Nov 2017 11:21:42 -0800 Subject: [PATCH 01/18] add --crawl-log-dir option to fix failing test --- tests/test_warcprox.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index cf43949..0e8d33a 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -253,7 +253,8 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): '--method-filter=POST', '--port=0', '--playback-port=0', - '--onion-tor-socks-proxy=localhost:9050'] + '--onion-tor-socks-proxy=localhost:9050', + '--crawl-log-dir=crawl-logs'] if rethinkdb_servers: rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) argv.append('--rethinkdb-servers=%s' % rethinkdb_servers) From 72c2950c101cfa5073e715bf2098ea6fc62cbb75 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 9 Nov 2017 11:22:58 -0800 Subject: [PATCH 02/18] bump dev version number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3de51f7..a31c846 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev107', + version='2.2.1b2.dev108', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 538c9e0caf27f623ac57ff9f35f7a117456a7d5d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 9 Nov 2017 12:34:06 -0800 Subject: [PATCH 03/18] modify test_crawl_log to expect crawl log to honor --base32 setting and add tests of WARCPROX_WRITE_RECORD request and HEAD request (not written to warc) --- setup.py | 2 +- tests/test_warcprox.py | 58 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index a31c846..04d0352 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev108', + version='2.2.1b2.dev109', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 0e8d33a..21bf57e 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1374,7 +1374,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert fields[6] == b'text/plain' assert fields[7] == b'-' assert re.match(br'^\d{17}[+]\d{3}', fields[8]) - assert fields[9] == b'sha1:NHKRURXEJICOQEINUDERRF6OZ2LZ7JYP' + assert fields[9] == b'sha1:69d51a46e44a04e8110da0c91897cece979fa70f' assert fields[10] == b'-' assert fields[11] == b'-' extra_info = json.loads(fields[12].decode('utf-8')) @@ -1395,7 +1395,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert fields[6] == b'text/plain' assert fields[7] == b'-' assert re.match(br'^\d{17}[+]\d{3}', fields[8]) - assert fields[9] == b'sha1:TKXGVS3ZPR24VDVV3XWZXYQSPTDBWP53' + assert fields[9] == b'sha1:9aae6acb797c75ca8eb5dded9be2127cc61b3fbb' assert fields[10] == b'-' assert fields[11] == b'-' extra_info = json.loads(fields[12].decode('utf-8')) @@ -1432,7 +1432,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert fields[6] == b'text/plain' assert fields[7] == b'-' assert re.match(br'^\d{17}[+]\d{3}', fields[8]) - assert fields[9] == b'sha1:NHKRURXEJICOQEINUDERRF6OZ2LZ7JYP' + assert fields[9] == b'sha1:69d51a46e44a04e8110da0c91897cece979fa70f' assert fields[10] == b'http://example.com/seed' assert fields[11] == b'duplicate:digest' extra_info = json.loads(fields[12].decode('utf-8')) @@ -1440,6 +1440,58 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): 'contentSize', 'warcFilename', 'warcFileOffset'} assert extra_info['contentSize'] == 145 + # a request that is not saved to a warc (because of --method-filter) + # currently not logged at all (XXX maybe it should be) + url = 'http://localhost:%s/b/cc' % http_daemon.server_port + headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})} + response = requests.head(url, proxies=archiving_proxies, headers=headers) + time.sleep(3) + assert not os.path.exists(os.path.join( + warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')) + + # WARCPROX_WRITE_RECORD + url = 'http://fakeurl/' + payload = b'I am the WARCPROX_WRITE_RECORD payload' + headers = { + 'Content-Type': 'text/plain', + 'WARC-Type': 'metadata', + 'Host': 'N/A', + 'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_4'}), + } + response = requests.request( + method='WARCPROX_WRITE_RECORD', url=url, data=payload, + headers=headers, proxies=archiving_proxies) + assert response.status_code == 204 + + start = time.time() + while time.time() - start < 10: + if os.path.exists(os.path.join( + warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')): + break + time.sleep(0.5) + + crawl_log_4 = open(os.path.join( + warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log'), 'rb').read() + + assert re.match(b'\A2[^\n]+\n\Z', crawl_log_4) + assert crawl_log_4[24:31] == b' 204 ' + assert crawl_log_4[31:42] == b' 38 ' + fields = crawl_log_4.split() + assert len(fields) == 13 + assert fields[3] == b'http://fakeurl/' + assert fields[4] == b'-' + assert fields[5] == b'-' + assert fields[6] == b'text/plain' + assert fields[7] == b'-' + assert re.match(br'^\d{17}[+]\d{3}', fields[8]) + assert fields[9] == b'sha1:bb56497c17d2684f5eca4af9df908c78ba74ca1c' + assert fields[10] == b'-' + assert fields[11] == b'-' + extra_info = json.loads(fields[12].decode('utf-8')) + assert set(extra_info.keys()) == { + 'contentSize', 'warcFilename', 'warcFileOffset'} + assert extra_info['contentSize'] == 38 + def test_long_warcprox_meta( warcprox_, http_daemon, archiving_proxies, playback_proxies): url = 'http://localhost:%s/b/g' % http_daemon.server_port From 78c6137016f602a37925bc81f1baefd4b40e4ff1 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 9 Nov 2017 12:35:10 -0800 Subject: [PATCH 04/18] fix crawl log handling of WARCPROX_WRITE_RECORD request --- setup.py | 2 +- warcprox/crawl_log.py | 18 ++++++++++++++---- warcprox/main.py | 3 ++- warcprox/warcproxy.py | 23 +++++++++++++---------- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/setup.py b/setup.py index 04d0352..78e312b 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev109', + version='2.2.1b2.dev110', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index eff4df9..68d1fbf 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -26,8 +26,9 @@ import os import warcprox class CrawlLogger(object): - def __init__(self, dir_): + def __init__(self, dir_, options=warcprox.Options()): self.dir = dir_ + self.options = options if not os.path.exists(self.dir): logging.info('creating directory %r', self.dir) os.mkdir(self.dir) @@ -40,10 +41,20 @@ class CrawlLogger(object): 'warcFilename': records[0].warc_filename, 'warcFileOffset': records[0].offset, } + if recorded_url.response_recorder: + content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset + payload_digest = warcprox.digest_str( + recorded_url.response_recorder.payload_digest, + self.options.base32) + else: + # WARCPROX_WRITE_RECORD request + content_length = len(recorded_url.request_data) + payload_digest = records[0].get_header( + b'WARC-Payload-Digest') fields = [ '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000), '% 5s' % recorded_url.status, - '% 10s' % (recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset), + '% 10s' % content_length, recorded_url.url, '-', # hop path recorded_url.referer or '-', @@ -53,8 +64,7 @@ class CrawlLogger(object): recorded_url.timestamp, recorded_url.timestamp.microsecond//1000, recorded_url.duration.microseconds//1000), - warcprox.digest_str( - recorded_url.response_recorder.payload_digest, True), + payload_digest, recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'), 'duplicate:digest' if records[0].type == b'revisit' else '-', json.dumps(extra_info, separators=(',',':')), diff --git a/warcprox/main.py b/warcprox/main.py index e21ff6a..1e6aaf8 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -238,7 +238,8 @@ def init_controller(args): playback_proxy = None if args.crawl_log_dir: - listeners.append(warcprox.crawl_log.CrawlLogger(args.crawl_log_dir)) + listeners.append(warcprox.crawl_log.CrawlLogger( + args.crawl_log_dir, options=options)) for qualname in args.plugins or []: try: diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 544dc61..afe1835 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -293,16 +293,19 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): if raw_warcprox_meta: warcprox_meta = json.loads(raw_warcprox_meta) - rec_custom = RecordedUrl(url=self.url, - request_data=request_data, - response_recorder=None, - remote_ip=b'', - warcprox_meta=warcprox_meta, - content_type=self.headers['Content-Type'], - custom_type=warc_type or self.headers['WARC-Type'].encode('utf-8'), - status=204, size=len(request_data), - client_ip=self.client_address[0], - method=self.command, timestamp=timestamp) + rec_custom = RecordedUrl( + url=self.url, + request_data=request_data, + response_recorder=None, + remote_ip=b'', + warcprox_meta=warcprox_meta, + content_type=self.headers['Content-Type'], + custom_type=warc_type or self.headers['WARC-Type'].encode('utf-8'), + status=204, size=len(request_data), + client_ip=self.client_address[0], + method=self.command, + timestamp=timestamp, + duration=datetime.datetime.utcnow()-timestamp) self.server.recorded_url_q.put(rec_custom) self.send_response(204, 'OK') From df6d7f1ce684b5f5b9be145bc57ffa66652d3392 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 9 Nov 2017 13:09:07 -0800 Subject: [PATCH 05/18] make test_crawl_log expect HEAD request to be logged --- setup.py | 2 +- tests/test_warcprox.py | 30 +++++++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 78e312b..02853d5 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev110', + version='2.2.1b2.dev111', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 21bf57e..0a357b2 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1445,9 +1445,33 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): url = 'http://localhost:%s/b/cc' % http_daemon.server_port headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})} response = requests.head(url, proxies=archiving_proxies, headers=headers) - time.sleep(3) - assert not os.path.exists(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')) + + start = time.time() + while time.time() - start < 10: + if os.path.exists(os.path.join( + warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')): + break + time.sleep(0.5) + + crawl_log_3 = open(os.path.join( + warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log'), 'rb').read() + + assert re.match(b'\A2[^\n]+\n\Z', crawl_log_3) + assert crawl_log_3[24:31] == b' 200 ' + assert crawl_log_3[31:42] == b' 0 ' + fields = crawl_log_3.split() + assert len(fields) == 13 + assert fields[3].endswith(b'/b/cc') + assert fields[4] == b'-' + assert fields[5] == b'-' + assert fields[6] == b'text/plain' + assert fields[7] == b'-' + assert re.match(br'^\d{17}[+]\d{3}', fields[8]) + assert fields[9] == b'sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709' + assert fields[10] == b'-' + assert fields[11] == b'-' + extra_info = json.loads(fields[12].decode('utf-8')) + assert extra_info == {'contentSize': 91} # WARCPROX_WRITE_RECORD url = 'http://fakeurl/' From 700056cc0428b447e41c1c33cab3dc21a147af0a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 9 Nov 2017 13:10:57 -0800 Subject: [PATCH 06/18] fix failing test just committed, which involves running "listeners" for all urls, including those not archived; make adjustments accordingly --- setup.py | 2 +- warcprox/bigtable.py | 7 ++++--- warcprox/crawl_log.py | 14 ++++++-------- warcprox/dedup.py | 4 ++-- warcprox/playback.py | 3 ++- warcprox/stats.py | 16 ++++++++-------- warcprox/writerthread.py | 18 ++++++++++++------ 7 files changed, 35 insertions(+), 29 deletions(-) diff --git a/setup.py b/setup.py index 02853d5..0b5c891 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev111', + version='2.2.1b2.dev112', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index f3d897d..c9547b2 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -200,9 +200,10 @@ class RethinkCaptures: return entry def notify(self, recorded_url, records): - entry = self._assemble_entry(recorded_url, records) - with self._batch_lock: - self._batch.append(entry) + if records: + entry = self._assemble_entry(recorded_url, records) + with self._batch_lock: + self._batch.append(entry) def close(self): self.stop() diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index 68d1fbf..5b4a4fc 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -36,11 +36,10 @@ class CrawlLogger(object): def notify(self, recorded_url, records): # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"} now = datetime.datetime.utcnow() - extra_info = { - 'contentSize': recorded_url.size, - 'warcFilename': records[0].warc_filename, - 'warcFileOffset': records[0].offset, - } + extra_info = {'contentSize': recorded_url.size,} + if records: + extra_info['warcFilename'] = records[0].warc_filename + extra_info['warcFileOffset'] = records[0].offset if recorded_url.response_recorder: content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset payload_digest = warcprox.digest_str( @@ -49,8 +48,7 @@ class CrawlLogger(object): else: # WARCPROX_WRITE_RECORD request content_length = len(recorded_url.request_data) - payload_digest = records[0].get_header( - b'WARC-Payload-Digest') + payload_digest = records[0].get_header(b'WARC-Payload-Digest') fields = [ '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000), '% 5s' % recorded_url.status, @@ -66,7 +64,7 @@ class CrawlLogger(object): recorded_url.duration.microseconds//1000), payload_digest, recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'), - 'duplicate:digest' if records[0].type == b'revisit' else '-', + 'duplicate:digest' if records and records[0].type == b'revisit' else '-', json.dumps(extra_info, separators=(',',':')), ] for i in range(len(fields)): diff --git a/warcprox/dedup.py b/warcprox/dedup.py index e70f5f9..b9cd223 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -93,7 +93,7 @@ class DedupDb(object): return result def notify(self, recorded_url, records): - if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + if (records and records[0].type == b'response' and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str( recorded_url.response_recorder.payload_digest, @@ -172,7 +172,7 @@ class RethinkDedupDb: return result def notify(self, recorded_url, records): - if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + if (records and records[0].type == b'response' and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.options.base32) diff --git a/warcprox/playback.py b/warcprox/playback.py index a9aa47d..1a698c0 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -259,7 +259,8 @@ class PlaybackIndexDb(object): pass def notify(self, recorded_url, records): - self.save(records[0].warc_filename, records, records[0].offset) + if records: + self.save(records[0].warc_filename, records, records[0].offset) def save(self, warcfile, recordset, offset): response_record = recordset[0] diff --git a/warcprox/stats.py b/warcprox/stats.py index 55693a2..99e6804 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -170,12 +170,13 @@ class StatsDb: bucket_stats["total"]["urls"] += 1 bucket_stats["total"]["wire_bytes"] += recorded_url.size - if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT: - bucket_stats["revisit"]["urls"] += 1 - bucket_stats["revisit"]["wire_bytes"] += recorded_url.size - else: - bucket_stats["new"]["urls"] += 1 - bucket_stats["new"]["wire_bytes"] += recorded_url.size + if records: + if records[0].type == b'revisit': + bucket_stats["revisit"]["urls"] += 1 + bucket_stats["revisit"]["wire_bytes"] += recorded_url.size + else: + bucket_stats["new"]["urls"] += 1 + bucket_stats["new"]["wire_bytes"] += recorded_url.size json_value = json.dumps(bucket_stats, separators=(',',':')) conn.execute( @@ -304,8 +305,7 @@ class RethinkStatsDb(StatsDb): def tally(self, recorded_url, records): buckets = self.buckets(recorded_url) - is_revisit = records[0].get_header( - warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT + is_revisit = records[0].type == b'revisit' with self._batch_lock: for bucket in buckets: bucket_stats = self._batch.setdefault( diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index e422a65..a8a6ef7 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -82,13 +82,15 @@ class WarcWriterThread(threading.Thread): self.logger.info("%s urls left to write", qsize) recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) + records = [] self.idle = None if self._filter_accepts(recorded_url): if self.dedup_db: warcprox.dedup.decorate_with_dedup_info(self.dedup_db, recorded_url, base32=self.options.base32) records = self.writer_pool.write_records(recorded_url) - self._final_tasks(recorded_url, records) + + self._final_tasks(recorded_url, records) # try to release resources in a timely fashion if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: @@ -134,11 +136,15 @@ class WarcWriterThread(threading.Thread): payload_digest = "-" # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"} - self.logger.info("{} {} {} {} {} size={} {} {} {} offset={}".format( - recorded_url.client_ip, recorded_url.status, recorded_url.method, - recorded_url.url.decode("utf-8"), recorded_url.mimetype, - recorded_url.size, payload_digest, records[0].type.decode("utf-8"), - records[0].warc_filename, records[0].offset)) + type_ = records[0].type.decode("utf-8") if records else '-' + filename = records[0].warc_filename if records else '-' + offset = records[0].offset if records else '-' + self.logger.info( + "%s %s %s %s %s size=%s %s %s %s offset=%s", + recorded_url.client_ip, recorded_url.status, + recorded_url.method, recorded_url.url.decode("utf-8"), + recorded_url.mimetype, recorded_url.size, payload_digest, + type_, filename, offset) def _final_tasks(self, recorded_url, records): if self.listeners: From 750a333aa6ed3275fb19b882c6bc02fcd889ccfe Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 9 Nov 2017 15:23:15 -0800 Subject: [PATCH 07/18] not gonna bother figuring out why pypy regex is not matching https://travis-ci.org/internetarchive/warcprox/jobs/299864258#L615 --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 5f7d8b3..d51105f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,7 @@ python: matrix: allow_failures: + - python: pypy - python: pypy3 - python: nightly - python: 3.7-dev From cdd747f48e1e26b9426bce2c37e032fcc002ac55 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 10 Nov 2017 13:37:09 -0800 Subject: [PATCH 08/18] eh, don't prefix sqlite filenames with 'warcprox-trough-'; logging tweaks --- warcprox/dedup.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 78d35db..cb2cc2d 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -280,7 +280,7 @@ class TroughClient(object): lambda svc: r.now().sub( svc['last_heartbeat']).lt(svc['ttl']) ).order_by('load') - logging.debug('querying rethinkdb: %r', reql) + self.logger.debug('querying rethinkdb: %r', reql) results = reql.run() if results: return results[0]['url'] @@ -329,20 +329,18 @@ class TroughDedupDb(object): def _write_url(self, bucket): if not bucket in self._write_url_cache: - segment_id = 'warcprox-trough-%s' % bucket self._write_url_cache[bucket] = self._trough_cli.write_url( - segment_id, self.SCHEMA_ID) - logging.info( - 'bucket %r write url is %r', bucket, + bucket, self.SCHEMA_ID) + self.logger.info( + 'trough dedup bucket %r write url is %r', bucket, self._write_url_cache[bucket]) return self._write_url_cache[bucket] def _read_url(self, bucket): if not self._read_url_cache.get(bucket): - segment_id = 'warcprox-trough-%s' % bucket - self._read_url_cache[bucket] = self._trough_cli.read_url(segment_id) - logging.info( - 'bucket %r read url is %r', bucket, + self._read_url_cache[bucket] = self._trough_cli.read_url(bucket) + self.logger.info( + 'trough dedup bucket %r read url is %r', bucket, self._read_url_cache[bucket]) return self._read_url_cache[bucket] @@ -381,18 +379,18 @@ class TroughDedupDb(object): try: response = requests.post(write_url, sql) except: - logging.error( + self.logger.error( 'problem with trough write url %r', write_url, exc_info=True) del self._write_url_cache[bucket] return if response.status_code != 200: del self._write_url_cache[bucket] - logging.warn( + self.logger.warn( 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) else: - logging.trace('posted %r to %s', sql, write_url) + self.logger.debug('posted %r to %s', sql, write_url) def lookup(self, digest_key, bucket='__unspecified__', url=None): read_url = self._read_url(bucket) @@ -403,17 +401,17 @@ class TroughDedupDb(object): try: response = requests.post(read_url, sql) except: - logging.error( + self.logger.error( 'problem with trough read url %r', read_url, exc_info=True) del self._read_url_cache[bucket] return None if response.status_code != 200: del self._read_url_cache[bucket] - logging.warn( + self.logger.warn( 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) return None - logging.debug('got %r from query %r', response.text, sql) + self.logger.trace('got %r from query %r', response.text, sql) results = json.loads(response.text) assert len(results) <= 1 # sanity check (digest_key is primary key) if results: From 3c215b42b56f45537973e91cc4a66050364a31b8 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 10 Nov 2017 14:34:06 -0800 Subject: [PATCH 09/18] missed a spot handling case of no warc records written --- setup.py | 2 +- warcprox/stats.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/setup.py b/setup.py index 0b5c891..56c39f5 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev112', + version='2.2.1b2.dev113', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/stats.py b/warcprox/stats.py index 99e6804..254f764 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -305,7 +305,6 @@ class RethinkStatsDb(StatsDb): def tally(self, recorded_url, records): buckets = self.buckets(recorded_url) - is_revisit = records[0].type == b'revisit' with self._batch_lock: for bucket in buckets: bucket_stats = self._batch.setdefault( @@ -314,12 +313,13 @@ class RethinkStatsDb(StatsDb): bucket_stats["total"]["urls"] += 1 bucket_stats["total"]["wire_bytes"] += recorded_url.size - if is_revisit: - bucket_stats["revisit"]["urls"] += 1 - bucket_stats["revisit"]["wire_bytes"] += recorded_url.size - else: - bucket_stats["new"]["urls"] += 1 - bucket_stats["new"]["wire_bytes"] += recorded_url.size + if records: + if records[0].type == b'revisit': + bucket_stats["revisit"]["urls"] += 1 + bucket_stats["revisit"]["wire_bytes"] += recorded_url.size + else: + bucket_stats["new"]["urls"] += 1 + bucket_stats["new"]["wire_bytes"] += recorded_url.size def _add_to_batch(self, add_me): with self._batch_lock: From 30b6b0b337782c1f5ce918355f9009770bbae6a2 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 10 Nov 2017 17:02:33 -0800 Subject: [PATCH 10/18] new failing test for correct calculation of payload digest which should match rfc2616 entity body, which is transfer decoded but not content-decoded --- tests/test_warcprox.py | 149 ++++++++++++++++++++++++++++++++++++++++- warcprox/__init__.py | 2 +- warcprox/mitmproxy.py | 3 +- 3 files changed, 151 insertions(+), 3 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 0a357b2..4d1caab 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -46,6 +46,10 @@ from collections import Counter import socket import datetime import warcio.archiveiterator +import io +import gzip +import mock +import email.message try: import http.server as http_server @@ -84,7 +88,7 @@ def _send(self, data): # http_client.HTTPConnection.send = _send logging.basicConfig( - stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE, + stream=sys.stdout, level=warcprox.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) @@ -134,6 +138,24 @@ def dump_state(signum=None, frame=None): signal.signal(signal.SIGQUIT, dump_state) +def chunkify(buf, chunk_size=13): + i = 0 + result = b'' + while i < len(buf): + chunk_len = min(len(buf) - i, chunk_size) + result += ('%x\r\n' % chunk_len).encode('ascii') + result += buf[i:i+chunk_len] + result += b'\r\n' + i += chunk_size + result += b'0\r\n\r\n' + return result + +# def gzipify(buf): +# with io.BytesIO() as outbuf: +# with gzip.GzipFile(fileobj=outbuf, mode='wb') as gz: +# gz.write(buf) +# return outbuf.getvalue() + class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): def build_response(self): m = re.match(r'^/([^/]+)/([^/]+)$', self.path) @@ -150,6 +172,71 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): + b'Content-Type: text/plain\r\n' + b'\r\n') payload = b'This response is missing a Content-Length http header.' + elif self.path.startswith('/test_payload_digest-'): + content_body = ( + b'Hello. How are you. I am the test_payload_digest ' + b'content body. The entity body is a possibly content-' + b'encoded version of me. The message body is a possibly ' + b'transfer-encoded version of the entity body.\n') + gzipped = ( + b"\x1f\x8b\x08\x00jA\x06Z\x02\xffm\x8d1\x0e\xc20\x10\x04{^" + b"\xb1\x1f\xc0\xef\x08=}t\x897\xc1\x92\xed\x8b|\x07\xc8" + b"\xbf'\n\xa2@J9\xab\x19\xed\xc0\x9c5`\xd07\xa4\x11]\x9f" + b"\x017H\x81?\x08\xa7\xf9\xb8I\xcf*q\x8ci\xdd\x11\xb3VguL" + b"\x1a{\xc0}\xb7vJ\xde\x8f\x01\xc9 \xd8\xd4,M\xb9\xff\xdc" + b"+\xeb\xac\x91\x11/6KZ\xa1\x0b\n\xbfq\xa1\x99\xac<\xab" + b"\xbdI\xb5\x85\xed,\xf7\xff\xdfp\xf9\x00\xfc\t\x02\xb0" + b"\xc8\x00\x00\x00") + double_gzipped = ( + b"\x1f\x8b\x08\x00jA\x06Z\x02\xff\x01\x89\x00v\xff\x1f\x8b" + b"\x08\x00jA\x06Z\x02\xffm\x8d1\x0e\xc20\x10\x04{^\xb1\x1f" + b"\xc0\xef\x08=}t\x897\xc1\x92\xed\x8b|\x07\xc8\xbf'\n\xa2" + b"@J9\xab\x19\xed\xc0\x9c5`\xd07\xa4\x11]\x9f\x017H\x81?" + b"\x08\xa7\xf9\xb8I\xcf*q\x8ci\xdd\x11\xb3VguL\x1a{\xc0}" + b"\xb7vJ\xde\x8f\x01\xc9 \xd8\xd4,M\xb9\xff\xdc+\xeb\xac" + b"\x91\x11/6KZ\xa1\x0b\n\xbfq\xa1\x99\xac<\xab\xbdI\xb5" + b"\x85\xed,\xf7\xff\xdfp\xf9\x00\xfc\t\x02\xb0\xc8\x00\x00" + b"\x00\xf9\xdd\x8f\xed\x89\x00\x00\x00") + if self.path == '/test_payload_digest-plain': + payload = content_body + actual_headers = (b'Content-Type: text/plain\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') + elif self.path == '/test_payload_digest-gzip': + payload = gzipped + actual_headers = (b'Content-Type: application/gzip\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') + elif self.path == '/test_payload_digest-ce-gzip': + payload = gzipped + actual_headers = (b'Content-Type: text/plain\r\n' + + b'Content-Encoding: gzip\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') + elif self.path == '/test_payload_digest-gzip-ce-gzip': + payload = double_gzipped + actual_headers = (b'Content-Type: application/gzip\r\n' + + b'Content-Encoding: gzip\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') + elif self.path == '/test_payload_digest-te-chunked': + payload = chunkify(content_body) + actual_headers = (b'Content-Type: text/plain\r\n' + + b'Transfer-Encoding: chunked\r\n') + elif self.path == '/test_payload_digest-gzip-te-chunked': + payload = chunkify(gzipped) + actual_headers = (b'Content-Type: application/gzip\r\n' + + b'Transfer-Encoding: chunked\r\n') + elif self.path == '/test_payload_digest-ce-gzip-te-chunked': + payload = chunkify(gzipped) + actual_headers = (b'Content-Type: text/plain\r\n' + + b'Content-Encoding: gzip\r\n' + + b'Transfer-Encoding: chunked\r\n') + elif self.path == '/test_payload_digest-gzip-ce-gzip-te-chunked': + payload = chunkify(double_gzipped) + actual_headers = (b'Content-Type: application/gzip\r\n' + + b'Content-Encoding: gzip\r\n' + + b'Transfer-Encoding: chunked\r\n') + else: + raise Exception('bad path') + headers = b'HTTP/1.1 200 OK\r\n' + actual_headers + b'\r\n' + logging.info('headers=%r payload=%r', headers, payload) else: payload = b'404 Not Found\n' headers = (b'HTTP/1.1 404 Not Found\r\n' @@ -1554,6 +1641,66 @@ def test_long_warcprox_meta( with pytest.raises(StopIteration): next(rec_iter) +def test_payload_digest(warcprox_, http_daemon): + ''' + Tests that digest is of RFC2616 "entity body" + (transfer-decoded but not content-decoded) + ''' + class HalfMockedMitm(warcprox.mitmproxy.MitmProxyHandler): + def __init__(self, url): + self.path = url + self.request_version = 'HTTP/1.1' + self.client_address = mock.MagicMock() + self.headers = email.message.Message() + self.headers.add_header('Host', 'localhost:%s' % http_daemon.server_port) + self.server = warcprox_.proxy + self.command = 'GET' + self.connection = mock.Mock() + + PLAIN_SHA1 = b'sha1:881289333370aa4e3214505f1173423cc5a896b7' + GZIP_SHA1 = b'sha1:634e25de71ae01edb5c5d9e2e99c4836bbe94129' + GZIP_GZIP_SHA1 = b'sha1:cecbf3a5c4975072f5e4c5e0489f808ef246c2b4' + + # plain + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-plain' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == PLAIN_SHA1 + + # content-type: application/gzip + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1 + + # content-encoding: gzip + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-ce-gzip' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1 + + # content-type: application/gzip && content-encoding: gzip + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-ce-gzip' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1 + + # chunked plain + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-te-chunked' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == PLAIN_SHA1 + + # chunked content-type: application/gzip + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-te-chunked' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1 + + # chunked content-encoding: gzip + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-ce-gzip-te-chunked' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1 + + # chunked content-type: application/gzip && content-encoding: gzip + mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-ce-gzip-te-chunked' % http_daemon.server_port) + req, prox_rec_res = mitm.do_GET() + assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1 + if __name__ == '__main__': pytest.main() diff --git a/warcprox/__init__.py b/warcprox/__init__.py index ecd6f53..e50a415 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -28,7 +28,7 @@ except ImportError: import Queue as queue import datetime -def digest_str(hash_obj, base32): +def digest_str(hash_obj, base32=False): import base64 return hash_obj.name.encode('utf-8') + b':' + ( base64.b32encode(hash_obj.digest()) if base32 diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index b14cddf..722311b 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -361,7 +361,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): return try: - self._proxy_request() + return self._proxy_request() except: self.logger.error("exception proxying request", exc_info=True) raise @@ -406,6 +406,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if 'Content-Length' in self.headers: req += self.rfile.read(int(self.headers['Content-Length'])) + prox_rec_res = None try: self.logger.debug('sending to remote server req=%r', req) From 3a0f6e0947c859ebd45032ea7421d3d3449cbfd9 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 10 Nov 2017 17:18:22 -0800 Subject: [PATCH 11/18] fix payload digest by pulling calculation up one level where content has already been transfer-decoded --- setup.py | 2 +- tests/single-threaded-proxy.py | 4 ++-- warcprox/bigtable.py | 8 ++++---- warcprox/crawl_log.py | 2 +- warcprox/dedup.py | 11 +++++------ warcprox/mitmproxy.py | 25 +++++++++++++------------ warcprox/warc.py | 13 +++++-------- warcprox/warcproxy.py | 7 +++++-- 8 files changed, 36 insertions(+), 36 deletions(-) diff --git a/setup.py b/setup.py index 56c39f5..625abbb 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev113', + version='2.2.1b2.dev114', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/single-threaded-proxy.py b/tests/single-threaded-proxy.py index dd5e709..1c176a2 100755 --- a/tests/single-threaded-proxy.py +++ b/tests/single-threaded-proxy.py @@ -3,7 +3,7 @@ tests/single-threaded-proxy.py - single-threaded MITM proxy, useful for debugging, does not write warcs -Copyright (C) 2015-2016 Internet Archive +Copyright (C) 2015-2017 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -46,7 +46,7 @@ class FakeQueue(object): logging.info("{} {} {} {} {} size={} {}".format( recorded_url.client_ip, recorded_url.status, recorded_url.method, recorded_url.url.decode("utf-8"), recorded_url.mimetype, - recorded_url.size, warcprox.digest_str(recorded_url.response_recorder.payload_digest, False).decode('utf-8'))) + recorded_url.size, warcprox.digest_str(recorded_url.payload_digest, False).decode('utf-8'))) def parse_args(): prog = os.path.basename(sys.argv[0]) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index c9547b2..115aed9 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -140,16 +140,16 @@ class RethinkCaptures: return result def _assemble_entry(self, recorded_url, records): - if recorded_url.response_recorder: - if recorded_url.response_recorder.payload_digest.name == "sha1": + if recorded_url.payload_digest: + if recorded_url.payload_digest.name == "sha1": sha1base32 = base64.b32encode( - recorded_url.response_recorder.payload_digest.digest() + recorded_url.payload_digest.digest() ).decode("utf-8") else: self.logger.warn( "digest type is %r but big captures table is indexed " "by sha1", - recorded_url.response_recorder.payload_digest.name) + recorded_url.payload_digest.name) else: digest = hashlib.new("sha1", records[0].content[1]) sha1base32 = base64.b32encode(digest.digest()).decode("utf-8") diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index 5b4a4fc..f28683a 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -43,7 +43,7 @@ class CrawlLogger(object): if recorded_url.response_recorder: content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset payload_digest = warcprox.digest_str( - recorded_url.response_recorder.payload_digest, + recorded_url.payload_digest, self.options.base32) else: # WARCPROX_WRITE_RECORD request diff --git a/warcprox/dedup.py b/warcprox/dedup.py index b9cd223..be49104 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -96,8 +96,7 @@ class DedupDb(object): if (records and records[0].type == b'response' and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str( - recorded_url.response_recorder.payload_digest, - self.options.base32) + recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: self.save( digest_key, records[0], @@ -108,9 +107,9 @@ class DedupDb(object): def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): if (recorded_url.response_recorder - and recorded_url.response_recorder.payload_digest + and recorded_url.payload_digest and recorded_url.response_recorder.payload_size() > 0): - digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) + digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], recorded_url.url) @@ -174,8 +173,8 @@ class RethinkDedupDb: def notify(self, recorded_url, records): if (records and records[0].type == b'response' and recorded_url.response_recorder.payload_size() > 0): - digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, - self.options.base32) + digest_key = warcprox.digest_str( + recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"]) else: diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 722311b..2c34bcd 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -66,7 +66,7 @@ import time class ProxyingRecorder(object): """ Wraps a socket._fileobject, recording the bytes as they are read, - calculating digests, and sending them on to the proxy client. + calculating the block digest, and sending them on to the proxy client. """ logger = logging.getLogger("warcprox.mitmproxy.ProxyingRecorder") @@ -78,27 +78,19 @@ class ProxyingRecorder(object): self.digest_algorithm = digest_algorithm self.block_digest = hashlib.new(digest_algorithm) self.payload_offset = None - self.payload_digest = None self.proxy_client = proxy_client self._proxy_client_conn_open = True self.len = 0 self.url = url def payload_starts_now(self): - self.payload_digest = hashlib.new(self.digest_algorithm) self.payload_offset = self.len - def _update_payload_digest(self, hunk): - if self.payload_digest: - self.payload_digest.update(hunk) - def _update(self, hunk): - self._update_payload_digest(hunk) self.block_digest.update(hunk) - self.tempfile.write(hunk) - if self.payload_digest and self._proxy_client_conn_open: + if self.payload_offset is not None and self._proxy_client_conn_open: try: self.proxy_client.sendall(hunk) except BaseException as e: @@ -157,6 +149,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self, sock, debuglevel=debuglevel, method=method) self.proxy_client = proxy_client self.url = url + self.digest_algorithm = digest_algorithm # Keep around extra reference to self.fp because HTTPResponse sets # self.fp=None after it finishes reading, but we still need it @@ -164,6 +157,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.fp, proxy_client, digest_algorithm, url=url) self.fp = self.recorder + self.payload_digest = None + def begin(self, extra_response_headers={}): http_client.HTTPResponse.begin(self) # reads status line, headers @@ -185,6 +180,12 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.proxy_client.sendall(status_and_headers.encode('latin1')) self.recorder.payload_starts_now() + self.payload_digest = hashlib.new(self.digest_algorithm) + + def read(self, amt=None): + buf = http_client.HTTPResponse.read(self, amt) + self.payload_digest.update(buf) + return buf def via_header_value(orig, request_version): via = orig @@ -419,9 +420,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): url=self.url, method=self.command) prox_rec_res.begin(extra_response_headers=extra_response_headers) - buf = prox_rec_res.read(8192) + buf = prox_rec_res.read(65536) while buf != b'': - buf = prox_rec_res.read(8192) + buf = prox_rec_res.read(65536) self.log_request(prox_rec_res.status, prox_rec_res.recorder.len) except Exception as e: diff --git a/warcprox/warc.py b/warcprox/warc.py index de0ec06..6b9cbcf 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -53,7 +53,8 @@ class WarcRecordBuilder: refers_to=recorded_url.dedup_info.get('id'), refers_to_target_uri=recorded_url.dedup_info['url'], refers_to_date=recorded_url.dedup_info['date'], - payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32), + payload_digest=warcprox.digest_str( + recorded_url.payload_digest, self.base32), profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST, content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE, remote_ip=recorded_url.remote_ip) @@ -64,7 +65,9 @@ class WarcRecordBuilder: recorder=recorded_url.response_recorder, warc_type=warctools.WarcRecord.RESPONSE, content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE, - remote_ip=recorded_url.remote_ip) + remote_ip=recorded_url.remote_ip, + payload_digest=warcprox.digest_str( + recorded_url.payload_digest, self.base32)) def build_warc_records(self, recorded_url): """Returns a tuple of hanzo.warctools.warc.WarcRecord (principal_record, ...)""" @@ -122,13 +125,8 @@ class WarcRecordBuilder: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1'))) headers.append((warctools.WarcRecord.BLOCK_DIGEST, warcprox.digest_str(recorder.block_digest, self.base32))) - if recorder.payload_digest is not None: - headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, - warcprox.digest_str(recorder.payload_digest, self.base32))) - recorder.tempfile.seek(0) record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) - else: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1'))) digest = hashlib.new(self.digest_algorithm, data) @@ -137,7 +135,6 @@ class WarcRecordBuilder: if not payload_digest: headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, warcprox.digest_str(digest, self.base32))) - content_tuple = content_type, data record = warctools.WarcRecord(headers=headers, content=content_tuple) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index afe1835..f1de01e 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -218,7 +218,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): content_type=content_type, method=self.command, timestamp=timestamp, host=self.hostname, duration=datetime.datetime.utcnow()-timestamp, - referer=self.headers.get('referer')) + referer=self.headers.get('referer'), + payload_digest=prox_rec_res.payload_digest) self.server.recorded_url_q.put(recorded_url) return recorded_url @@ -328,7 +329,8 @@ class RecordedUrl: def __init__(self, url, request_data, response_recorder, remote_ip, warcprox_meta=None, content_type=None, custom_type=None, status=None, size=None, client_ip=None, method=None, - timestamp=None, host=None, duration=None, referer=None): + timestamp=None, host=None, duration=None, referer=None, + payload_digest=None): # XXX should test what happens with non-ascii url (when does # url-encoding happen?) if type(url) is not bytes: @@ -366,6 +368,7 @@ class RecordedUrl: self.host = host self.duration = duration self.referer = referer + self.payload_digest = payload_digest # inherit from object so that multiple inheritance from this class works # properly in python 2 From ffc8a268ab517e76ccb4e543305195a9ca552932 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 11:45:06 -0800 Subject: [PATCH 12/18] hopefully fix test failing occasionally apparently due to race condition by checking that the file we're waiting for has some content --- setup.py | 2 +- tests/test_warcprox.py | 32 +++++++++++++++++--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/setup.py b/setup.py index 625abbb..ebeb213 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev114', + version='2.2.1b2.dev115', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 4d1caab..97e4351 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1441,11 +1441,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert response.status_code == 200 start = time.time() + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log') while time.time() - start < 10: - if os.path.exists(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log')): + if os.path.exists(file) and os.stat(file).st_size > 0: break time.sleep(0.5) + assert os.path.exists(file) + assert os.path.exists(os.path.join( + warcprox_.options.crawl_log_dir, 'crawl.log')) crawl_log = open(os.path.join( warcprox_.options.crawl_log_dir, 'crawl.log'), 'rb').read() @@ -1499,14 +1502,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert response.status_code == 200 start = time.time() + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log') while time.time() - start < 10: - if os.path.exists(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log')): + if os.path.exists(file) and os.stat(file).st_size > 0: break time.sleep(0.5) + assert os.path.exists(file) - crawl_log_2 = open(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log'), 'rb').read() + crawl_log_2 = open(file, 'rb').read() assert re.match(b'\A2[^\n]+\n\Z', crawl_log_2) assert crawl_log_2[24:31] == b' 200 ' @@ -1533,16 +1536,15 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})} response = requests.head(url, proxies=archiving_proxies, headers=headers) + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log') start = time.time() while time.time() - start < 10: - if os.path.exists(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')): + if os.path.exists(file) and os.stat(file).st_size > 0: break time.sleep(0.5) - crawl_log_3 = open(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log'), 'rb').read() - + assert os.path.exists(file) + crawl_log_3 = open(file, 'rb').read() assert re.match(b'\A2[^\n]+\n\Z', crawl_log_3) assert crawl_log_3[24:31] == b' 200 ' assert crawl_log_3[31:42] == b' 0 ' @@ -1575,14 +1577,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert response.status_code == 204 start = time.time() + file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log') while time.time() - start < 10: - if os.path.exists(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')): + if os.path.exists(file) and os.stat(file).st_size > 0: break time.sleep(0.5) - crawl_log_4 = open(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log'), 'rb').read() + assert os.path.exists(file) + crawl_log_4 = open(file, 'rb').read() assert re.match(b'\A2[^\n]+\n\Z', crawl_log_4) assert crawl_log_4[24:31] == b' 204 ' From 43c36cae102387ecbe4b96f18efdae9590b3cb38 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:27:31 -0800 Subject: [PATCH 13/18] update payload_digest reference in trough dedup for changes in commit 3a0f6e0947 --- warcprox/dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 842c9d3..9cb0bc5 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -428,7 +428,7 @@ class TroughDedupDb(object): if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str( - recorded_url.response_recorder.payload_digest, + recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: self.save( From 895683e062d487813536e9bc0d4804b6cabaf539 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:45:49 -0800 Subject: [PATCH 14/18] more cleanly separate trough client code from the rest of TroughDedup --- warcprox/dedup.py | 188 +++++++++++++++++++++++++--------------------- 1 file changed, 103 insertions(+), 85 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 9cb0bc5..5c56752 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -253,13 +253,42 @@ class TroughClient(object): self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.svcreg = doublethink.ServiceRegistry(self.rr) + self._write_url_cache = {} + self._read_url_cache = {} + + @staticmethod + def sql_value(x): + if x is None: + return 'null' + elif isinstance(x, datetime.datetime): + return 'datetime(%r)' % x.isoformat() + elif isinstance(x, bool): + return int(x) + elif isinstance(x, str) or isinstance(x, bytes): + # py3: repr(u'abc') => 'abc' + # repr(b'abc') => b'abc' + # py2: repr(u'abc') => u'abc' + # repr(b'abc') => 'abc' + # Repr gives us a prefix we don't want in different situations + # depending on whether this is py2 or py3. Chop it off either way. + r = repr(x) + if r[:1] == "'": + return r + else: + return r[1:] + elif isinstance(x, (int, float)): + return x + else: + raise Exception( + "don't know how to make an sql value from %r (%r)" % ( + x, type(x))) def segment_manager_url(self): master_node = self.svcreg.unique_service('trough-sync-master') assert master_node return master_node['url'] - def write_url(self, segment_id, schema_id='default'): + def write_url_nocache(self, segment_id, schema_id='default'): provision_url = os.path.join(self.segment_manager_url(), 'provision') payload_dict = {'segment': segment_id, 'schema': schema_id} response = requests.post(provision_url, json=payload_dict) @@ -272,7 +301,7 @@ class TroughClient(object): # assert result_dict['schema'] == schema_id # previously provisioned? return result_dict['write_url'] - def read_url(self, segment_id): + def read_url_nocache(self, segment_id): reql = self.rr.table('services').get_all( segment_id, index='segment').filter( {'role':'trough-read'}).filter( @@ -286,6 +315,69 @@ class TroughClient(object): else: return None + def write_url(self, segment_id, schema_id='default'): + if not segment_id in self._write_url_cache: + self._write_url_cache[segment_id] = self.write_url_nocache( + segment_id, schema_id) + self.logger.info( + 'segment %r write url is %r', segment_id, + self._write_url_cache[segment_id]) + return self._write_url_cache[segment_id] + + def read_url(self, segment_id): + if not self._read_url_cache.get(segment_id): + self._read_url_cache[segment_id] = self.read_url_nocache(segment_id) + self.logger.info( + 'segment %r read url is %r', segment_id, + self._read_url_cache[segment_id]) + return self._read_url_cache[segment_id] + + def write(self, segment_id, sql_tmpl, values, schema_id='default'): + write_url = self.write_url(segment_id, schema_id) + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + + try: + response = requests.post(write_url, sql) + except: + del self._write_url_cache[segment_id] + self.logger.error( + 'problem with trough write url %r', write_url, + exc_info=True) + return + if response.status_code != 200: + del self._write_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + write_url, sql) + return + self.logger.debug('posted %r to %s', sql, write_url) + + def read(self, segment_id, sql_tmpl, values): + read_url = self.read_url(segment_id) + if not read_url: + return None + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + try: + response = requests.post(read_url, sql) + except: + del self._read_url_cache[segment_id] + self.logger.error( + 'problem with trough read url %r', read_url, exc_info=True) + return None + if response.status_code != 200: + del self._read_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + read_url, sql) + return None + self.logger.trace( + 'got %r from posting query %r to %r', response.text, sql, + read_url) + results = json.loads(response.text) + return results + def schema_exists(self, schema_id): url = os.path.join(self.segment_manager_url(), 'schema', schema_id) response = requests.get(url) @@ -316,104 +408,30 @@ class TroughDedupDb(object): ' url varchar(2100) not null,\n' ' date datetime not null,\n' ' id varchar(100));\n') # warc record id + WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) ' + 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): self.options = options self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) - self._write_url_cache = {} - self._read_url_cache = {} def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) - def _write_url(self, bucket): - if not bucket in self._write_url_cache: - self._write_url_cache[bucket] = self._trough_cli.write_url( - bucket, self.SCHEMA_ID) - self.logger.info( - 'trough dedup bucket %r write url is %r', bucket, - self._write_url_cache[bucket]) - return self._write_url_cache[bucket] - - def _read_url(self, bucket): - if not self._read_url_cache.get(bucket): - self._read_url_cache[bucket] = self._trough_cli.read_url(bucket) - self.logger.info( - 'trough dedup bucket %r read url is %r', bucket, - self._read_url_cache[bucket]) - return self._read_url_cache[bucket] - - def sql_value(self, x): - if x is None: - return 'null' - elif isinstance(x, datetime.datetime): - return 'datetime(%r)' % x.isoformat() - elif isinstance(x, bool): - return int(x) - elif isinstance(x, str) or isinstance(x, bytes): - # py3: repr(u'abc') => 'abc' - # repr(b'abc') => b'abc' - # py2: repr(u'abc') => u'abc' - # repr(b'abc') => 'abc' - # Repr gives us a prefix we don't want in different situations - # depending on whether this is py2 or py3. Chop it off either way. - r = repr(x) - if r[:1] == "'": - return r - else: - return r[1:] - else: - raise Exception("don't know how to make an sql value from %r" % x) - def save(self, digest_key, response_record, bucket='__unspecified__'): - write_url = self._write_url(bucket) record_id = response_record.get_header(warctools.WarcRecord.ID) url = response_record.get_header(warctools.WarcRecord.URL) warc_date = response_record.get_header(warctools.WarcRecord.DATE) - - sql = ('insert into dedup (digest_key, url, date, id) ' - 'values (%s, %s, %s, %s);') % ( - self.sql_value(digest_key), self.sql_value(url), - self.sql_value(warc_date), self.sql_value(record_id)) - try: - response = requests.post(write_url, sql) - except: - self.logger.error( - 'problem with trough write url %r', write_url, - exc_info=True) - del self._write_url_cache[bucket] - return - if response.status_code != 200: - del self._write_url_cache[bucket] - self.logger.warn( - 'unexpected response %r %r %r to sql=%r', - response.status_code, response.reason, response.text, sql) - else: - self.logger.debug('posted %r to %s', sql, write_url) + self._trough_cli.write( + bucket, self.WRITE_SQL_TMPL, + (digest_key, url, warc_date, record_id), self.SCHEMA_ID) def lookup(self, digest_key, bucket='__unspecified__', url=None): - read_url = self._read_url(bucket) - if not read_url: - return None - sql = 'select * from dedup where digest_key=%s;' % ( - self.sql_value(digest_key)) - try: - response = requests.post(read_url, sql) - except: - self.logger.error( - 'problem with trough read url %r', read_url, exc_info=True) - del self._read_url_cache[bucket] - return None - if response.status_code != 200: - del self._read_url_cache[bucket] - self.logger.warn( - 'unexpected response %r %r %r to sql=%r', - response.status_code, response.reason, response.text, sql) - return None - self.logger.trace('got %r from query %r', response.text, sql) - results = json.loads(response.text) - assert len(results) <= 1 # sanity check (digest_key is primary key) + results = self._trough_cli.read( + bucket, 'select * from dedup where digest_key=%s;', + (digest_key,)) if results: + assert len(results) == 1 # sanity check (digest_key is primary key) result = results[0] result['id'] = result['id'].encode('ascii') result['url'] = result['url'].encode('ascii') From 46797a5dce7000c7ce7495d1ddbd078c9f5f1378 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:52:29 -0800 Subject: [PATCH 15/18] pypy and pypy3 are passing at the moment, so why not :) --- .travis.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 565ba13..b8a91e5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,8 +13,6 @@ python: matrix: allow_failures: - - python: pypy - - python: pypy3 - python: nightly - python: 3.7-dev From d7aea40b054e45ecbd682ee501bb54e2dbd9e9e1 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:52:45 -0800 Subject: [PATCH 16/18] move trough client into separate module --- warcprox/dedup.py | 157 +------------------------------------- warcprox/trough.py | 182 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 154 deletions(-) create mode 100644 warcprox/trough.py diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 5c56752..2364d41 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -26,10 +26,9 @@ import os import json from hanzo import warctools import warcprox +import warcprox.trough import sqlite3 -import requests import doublethink -import rethinkdb as r import datetime import urllib3 from urllib3.exceptions import HTTPError @@ -245,157 +244,6 @@ class CdxServerDedup(object): """ pass -class TroughClient(object): - logger = logging.getLogger("warcprox.dedup.TroughClient") - - def __init__(self, rethinkdb_trough_db_url): - parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) - self.rr = doublethink.Rethinker( - servers=parsed.hosts, db=parsed.database) - self.svcreg = doublethink.ServiceRegistry(self.rr) - self._write_url_cache = {} - self._read_url_cache = {} - - @staticmethod - def sql_value(x): - if x is None: - return 'null' - elif isinstance(x, datetime.datetime): - return 'datetime(%r)' % x.isoformat() - elif isinstance(x, bool): - return int(x) - elif isinstance(x, str) or isinstance(x, bytes): - # py3: repr(u'abc') => 'abc' - # repr(b'abc') => b'abc' - # py2: repr(u'abc') => u'abc' - # repr(b'abc') => 'abc' - # Repr gives us a prefix we don't want in different situations - # depending on whether this is py2 or py3. Chop it off either way. - r = repr(x) - if r[:1] == "'": - return r - else: - return r[1:] - elif isinstance(x, (int, float)): - return x - else: - raise Exception( - "don't know how to make an sql value from %r (%r)" % ( - x, type(x))) - - def segment_manager_url(self): - master_node = self.svcreg.unique_service('trough-sync-master') - assert master_node - return master_node['url'] - - def write_url_nocache(self, segment_id, schema_id='default'): - provision_url = os.path.join(self.segment_manager_url(), 'provision') - payload_dict = {'segment': segment_id, 'schema': schema_id} - response = requests.post(provision_url, json=payload_dict) - if response.status_code != 200: - raise Exception( - 'Received %s: %r in response to POST %s with data %s' % ( - response.status_code, response.text, provision_url, - json.dumps(payload_dict))) - result_dict = response.json() - # assert result_dict['schema'] == schema_id # previously provisioned? - return result_dict['write_url'] - - def read_url_nocache(self, segment_id): - reql = self.rr.table('services').get_all( - segment_id, index='segment').filter( - {'role':'trough-read'}).filter( - lambda svc: r.now().sub( - svc['last_heartbeat']).lt(svc['ttl']) - ).order_by('load') - self.logger.debug('querying rethinkdb: %r', reql) - results = reql.run() - if results: - return results[0]['url'] - else: - return None - - def write_url(self, segment_id, schema_id='default'): - if not segment_id in self._write_url_cache: - self._write_url_cache[segment_id] = self.write_url_nocache( - segment_id, schema_id) - self.logger.info( - 'segment %r write url is %r', segment_id, - self._write_url_cache[segment_id]) - return self._write_url_cache[segment_id] - - def read_url(self, segment_id): - if not self._read_url_cache.get(segment_id): - self._read_url_cache[segment_id] = self.read_url_nocache(segment_id) - self.logger.info( - 'segment %r read url is %r', segment_id, - self._read_url_cache[segment_id]) - return self._read_url_cache[segment_id] - - def write(self, segment_id, sql_tmpl, values, schema_id='default'): - write_url = self.write_url(segment_id, schema_id) - sql = sql_tmpl % tuple(self.sql_value(v) for v in values) - - try: - response = requests.post(write_url, sql) - except: - del self._write_url_cache[segment_id] - self.logger.error( - 'problem with trough write url %r', write_url, - exc_info=True) - return - if response.status_code != 200: - del self._write_url_cache[segment_id] - self.logger.warn( - 'unexpected response %r %r %r from %r to sql=%r', - response.status_code, response.reason, response.text, - write_url, sql) - return - self.logger.debug('posted %r to %s', sql, write_url) - - def read(self, segment_id, sql_tmpl, values): - read_url = self.read_url(segment_id) - if not read_url: - return None - sql = sql_tmpl % tuple(self.sql_value(v) for v in values) - try: - response = requests.post(read_url, sql) - except: - del self._read_url_cache[segment_id] - self.logger.error( - 'problem with trough read url %r', read_url, exc_info=True) - return None - if response.status_code != 200: - del self._read_url_cache[segment_id] - self.logger.warn( - 'unexpected response %r %r %r from %r to sql=%r', - response.status_code, response.reason, response.text, - read_url, sql) - return None - self.logger.trace( - 'got %r from posting query %r to %r', response.text, sql, - read_url) - results = json.loads(response.text) - return results - - def schema_exists(self, schema_id): - url = os.path.join(self.segment_manager_url(), 'schema', schema_id) - response = requests.get(url) - if response.status_code == 200: - return True - elif response.status_code == 404: - return False - else: - response.raise_for_status() - - def register_schema(self, schema_id, sql): - url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) - response = requests.put(url, sql) - if response.status_code not in (201, 204): - raise Exception( - 'Received %s: %r in response to PUT %r with data %r' % ( - response.status_code, response.text, sql, url)) - class TroughDedupDb(object): ''' https://github.com/internetarchive/trough @@ -413,7 +261,8 @@ class TroughDedupDb(object): def __init__(self, options=warcprox.Options()): self.options = options - self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) + self._trough_cli = warcprox.trough.TroughClient( + options.rethinkdb_trough_db_url) def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) diff --git a/warcprox/trough.py b/warcprox/trough.py new file mode 100644 index 0000000..ec3a032 --- /dev/null +++ b/warcprox/trough.py @@ -0,0 +1,182 @@ +''' +warcprox/trough.py - trough client code + +Copyright (C) 2013-2017 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +''' + +from __future__ import absolute_import + +import logging +import os +import json +import requests +import doublethink +import rethinkdb as r +import datetime + +class TroughClient(object): + logger = logging.getLogger("warcprox.trough.TroughClient") + + def __init__(self, rethinkdb_trough_db_url): + parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.svcreg = doublethink.ServiceRegistry(self.rr) + self._write_url_cache = {} + self._read_url_cache = {} + + @staticmethod + def sql_value(x): + if x is None: + return 'null' + elif isinstance(x, datetime.datetime): + return 'datetime(%r)' % x.isoformat() + elif isinstance(x, bool): + return int(x) + elif isinstance(x, str) or isinstance(x, bytes): + # py3: repr(u'abc') => 'abc' + # repr(b'abc') => b'abc' + # py2: repr(u'abc') => u'abc' + # repr(b'abc') => 'abc' + # Repr gives us a prefix we don't want in different situations + # depending on whether this is py2 or py3. Chop it off either way. + r = repr(x) + if r[:1] == "'": + return r + else: + return r[1:] + elif isinstance(x, (int, float)): + return x + else: + raise Exception( + "don't know how to make an sql value from %r (%r)" % ( + x, type(x))) + + def segment_manager_url(self): + master_node = self.svcreg.unique_service('trough-sync-master') + assert master_node + return master_node['url'] + + def write_url_nocache(self, segment_id, schema_id='default'): + provision_url = os.path.join(self.segment_manager_url(), 'provision') + payload_dict = {'segment': segment_id, 'schema': schema_id} + response = requests.post(provision_url, json=payload_dict) + if response.status_code != 200: + raise Exception( + 'Received %s: %r in response to POST %s with data %s' % ( + response.status_code, response.text, provision_url, + json.dumps(payload_dict))) + result_dict = response.json() + # assert result_dict['schema'] == schema_id # previously provisioned? + return result_dict['write_url'] + + def read_url_nocache(self, segment_id): + reql = self.rr.table('services').get_all( + segment_id, index='segment').filter( + {'role':'trough-read'}).filter( + lambda svc: r.now().sub( + svc['last_heartbeat']).lt(svc['ttl']) + ).order_by('load') + self.logger.debug('querying rethinkdb: %r', reql) + results = reql.run() + if results: + return results[0]['url'] + else: + return None + + def write_url(self, segment_id, schema_id='default'): + if not segment_id in self._write_url_cache: + self._write_url_cache[segment_id] = self.write_url_nocache( + segment_id, schema_id) + self.logger.info( + 'segment %r write url is %r', segment_id, + self._write_url_cache[segment_id]) + return self._write_url_cache[segment_id] + + def read_url(self, segment_id): + if not self._read_url_cache.get(segment_id): + self._read_url_cache[segment_id] = self.read_url_nocache(segment_id) + self.logger.info( + 'segment %r read url is %r', segment_id, + self._read_url_cache[segment_id]) + return self._read_url_cache[segment_id] + + def write(self, segment_id, sql_tmpl, values, schema_id='default'): + write_url = self.write_url(segment_id, schema_id) + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + + try: + response = requests.post(write_url, sql) + except: + del self._write_url_cache[segment_id] + self.logger.error( + 'problem with trough write url %r', write_url, + exc_info=True) + return + if response.status_code != 200: + del self._write_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + write_url, sql) + return + self.logger.debug('posted %r to %s', sql, write_url) + + def read(self, segment_id, sql_tmpl, values): + read_url = self.read_url(segment_id) + if not read_url: + return None + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + try: + response = requests.post(read_url, sql) + except: + del self._read_url_cache[segment_id] + self.logger.error( + 'problem with trough read url %r', read_url, exc_info=True) + return None + if response.status_code != 200: + del self._read_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + read_url, sql) + return None + self.logger.trace( + 'got %r from posting query %r to %r', response.text, sql, + read_url) + results = json.loads(response.text) + return results + + def schema_exists(self, schema_id): + url = os.path.join(self.segment_manager_url(), 'schema', schema_id) + response = requests.get(url) + if response.status_code == 200: + return True + elif response.status_code == 404: + return False + else: + response.raise_for_status() + + def register_schema(self, schema_id, sql): + url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) + response = requests.put(url, sql) + if response.status_code not in (201, 204): + raise Exception( + 'Received %s: %r in response to PUT %r with data %r' % ( + response.status_code, response.text, sql, url)) + From f5351a43dfd18ea6e6296f726c0b2673a7bc5a54 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 14:22:17 -0800 Subject: [PATCH 17/18] automatic segment promotion every hour --- tests/test_warcprox.py | 18 ++++++++++++ warcprox/dedup.py | 2 +- warcprox/trough.py | 66 ++++++++++++++++++++++++++++++++++++++---- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 5587d8f..e8c140b 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1721,6 +1721,24 @@ def test_payload_digest(warcprox_, http_daemon): req, prox_rec_res = mitm.do_GET() assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1 +def test_trough_segment_promotion(warcprox_): + if not warcprox_.options.rethinkdb_trough_db_url: + return + cli = warcprox.trough.TroughClient( + warcprox_.options.rethinkdb_trough_db_url, 3) + promoted = [] + def mock(segment_id): + promoted.append(segment_id) + cli.promote = mock + cli.register_schema('default', 'create table foo (bar varchar(100))') + cli.write('my_seg', 'insert into foo (bar) values ("boof")') + assert promoted == [] + time.sleep(3) + assert promoted == ['my_seg'] + promoted = [] + time.sleep(3) + assert promoted == [] + if __name__ == '__main__': pytest.main() diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 2364d41..d1e456d 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -262,7 +262,7 @@ class TroughDedupDb(object): def __init__(self, options=warcprox.Options()): self.options = options self._trough_cli = warcprox.trough.TroughClient( - options.rethinkdb_trough_db_url) + options.rethinkdb_trough_db_url, promotion_interval=60*60) def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) diff --git a/warcprox/trough.py b/warcprox/trough.py index ec3a032..6cbe1dd 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -1,7 +1,7 @@ ''' warcprox/trough.py - trough client code -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2017 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -28,17 +28,69 @@ import requests import doublethink import rethinkdb as r import datetime +import threading +import time class TroughClient(object): logger = logging.getLogger("warcprox.trough.TroughClient") - def __init__(self, rethinkdb_trough_db_url): + def __init__(self, rethinkdb_trough_db_url, promotion_interval=None): + ''' + TroughClient constructor + + Args: + rethinkdb_trough_db_url: url with schema rethinkdb:// pointing to + trough configuration database + promotion_interval: if specified, `TroughClient` will spawn a + thread that "promotes" (pushed to hdfs) "dirty" trough segments + (segments that have received writes) periodically, sleeping for + `promotion_interval` seconds between cycles (default None) + ''' parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.svcreg = doublethink.ServiceRegistry(self.rr) self._write_url_cache = {} self._read_url_cache = {} + self._dirty_segments = set() + self._dirty_segments_lock = threading.RLock() + + self.promotion_interval = promotion_interval + self._promoter_thread = None + if promotion_interval: + self._promoter_thread = threading.Thread( + target=self._promotrix, name='TroughClient-promoter', + daemon=True) + self._promoter_thread.start() + + def _promotrix(self): + while True: + time.sleep(self.promotion_interval) + try: + with self._dirty_segments_lock: + dirty_segments = list(self._dirty_segments) + self._dirty_segments.clear() + logging.info('promoting %s trough segments') + for segment in dirty_segments: + try: + self.promote(segment) + except: + logging.error( + 'problem promoting segment %s', exc_info=True) + except: + logging.error( + 'caught exception doing segment promotion', + exc_info=True) + + def promote(self, segment_id): + url = os.path.join(self.segment_manager_url(), 'promote') + payload_dict = {'segment': segment_id} + response = requests.post(url, json=payload_dict) + if response.status_code != 200: + raise Exception( + 'Received %s: %r in response to POST %s with data %s' % ( + response.status_code, response.text, url, + json.dumps(payload_dict))) @staticmethod def sql_value(x): @@ -116,12 +168,15 @@ class TroughClient(object): self._read_url_cache[segment_id]) return self._read_url_cache[segment_id] - def write(self, segment_id, sql_tmpl, values, schema_id='default'): + def write(self, segment_id, sql_tmpl, values=(), schema_id='default'): write_url = self.write_url(segment_id, schema_id) sql = sql_tmpl % tuple(self.sql_value(v) for v in values) try: response = requests.post(write_url, sql) + if segment_id not in self._dirty_segments: + with self._dirty_segments_lock: + self._dirty_segments.add(segment_id) except: del self._write_url_cache[segment_id] self.logger.error( @@ -137,7 +192,7 @@ class TroughClient(object): return self.logger.debug('posted %r to %s', sql, write_url) - def read(self, segment_id, sql_tmpl, values): + def read(self, segment_id, sql_tmpl, values=()): read_url = self.read_url(segment_id) if not read_url: return None @@ -173,7 +228,8 @@ class TroughClient(object): response.raise_for_status() def register_schema(self, schema_id, sql): - url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) + url = os.path.join( + self.segment_manager_url(), 'schema', schema_id, 'sql') response = requests.put(url, sql) if response.status_code not in (201, 204): raise Exception( From ef590a2fec468a2466f8471f18d200cfc6e4b7aa Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 15:07:47 -0800 Subject: [PATCH 18/18] py2 fix --- .travis.yml | 2 +- warcprox/trough.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index b8a91e5..d712f84 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,7 +39,7 @@ before_install: - ping -c2 trough install: -- pip install . pytest requests warcio +- pip install . pytest requests warcio mock before_script: - ps ww -fHe diff --git a/warcprox/trough.py b/warcprox/trough.py index 6cbe1dd..6638b24 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -59,8 +59,8 @@ class TroughClient(object): self._promoter_thread = None if promotion_interval: self._promoter_thread = threading.Thread( - target=self._promotrix, name='TroughClient-promoter', - daemon=True) + target=self._promotrix, name='TroughClient-promoter') + self._promoter_thread.setDaemon(True) self._promoter_thread.start() def _promotrix(self):