From 8022257a57d3afda4fab66c9b36cc06d1222d389 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 17 Jul 2018 16:35:05 +0000 Subject: [PATCH 01/10] setuptools likes README.rst not readme.rst --- readme.rst => README.rst | 0 api.rst | 8 ++++---- setup.py | 4 ++-- warcprox/main.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) rename readme.rst => README.rst (100%) diff --git a/readme.rst b/README.rst similarity index 100% rename from readme.rst rename to README.rst diff --git a/api.rst b/api.rst index bac642b..4605bd3 100644 --- a/api.rst +++ b/api.rst @@ -125,7 +125,7 @@ configuration information and metadata with each proxy request to warcprox. The value is a json blob. There are several fields understood by warcprox, and arbitrary additional fields can be included. If warcprox doesn't recognize a field it simply ignores it. Custom fields may be useful for custom warcprox -plugins (see `<readme.rst#plugins>`_). +plugins (see `<README.rst#plugins>`_). Warcprox strips the ``warcprox-meta`` header out before sending the request to remote server, and does not write it in the warc request record. @@ -152,7 +152,7 @@ Example:: ``dedup-bucket`` (string) ~~~~~~~~~~~~~~~~~~~~~~~~~ Specifies the deduplication bucket. For more information about deduplication -see `<readme.rst#deduplication>`_. +see `<README.rst#deduplication>`_. Example:: @@ -206,7 +206,7 @@ of the bucket. The other currently recognized key is ``tally-domains``, which if supplied should be a list of domains. This instructs warcprox to additionally tally substats of the given bucket by domain. -See `<readme.rst#statistics>`_ for more information on statistics kept by +See `<README.rst#statistics>`_ for more information on statistics kept by warcprox. Examples:: @@ -223,7 +223,7 @@ limit on a domain specified in ``tally-domains``. ~~~~~~~~~~~~~~~~~~~~~~~ Specifies quantitative limits for warcprox to enforce. The structure of the dictionary is ``{stats_key: numerical_limit, ...}`` where stats key has the -format ``"bucket/sub-bucket/statistic"``. See `readme.rst#statistics`_ for +format ``"bucket/sub-bucket/statistic"``. 
See `README.rst#statistics`_ for further explanation of what "bucket", "sub-bucket", and "statistic" mean here. If processing a request would result in exceeding a limit, warcprox aborts diff --git a/setup.py b/setup.py index 6ac73a1..5a43ed9 100755 --- a/setup.py +++ b/setup.py @@ -40,12 +40,12 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev176', + version='2.4b2.dev177', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', author_email='nlevitt@archive.org', - long_description=open('readme.rst').read(), + long_description=open('README.rst').read(), license='GPL', packages=['warcprox'], install_requires=deps, diff --git a/warcprox/main.py b/warcprox/main.py index 5f45a13..6fb46ef 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -193,7 +193,7 @@ def _build_arg_parser(prog='warcprox'): action='append', help=( 'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". ' 'May be used multiple times to register multiple plugins. 
' - 'See readme.rst for more information.')) + 'See README.rst for more information.')) arg_parser.add_argument('--version', action='version', version="warcprox {}".format(warcprox.__version__)) arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') From 6786a668b135744ab06124d15e373db361590d04 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 17 Jul 2018 12:03:26 -0500 Subject: [PATCH 02/10] 2.4b2 for pypi --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 5a43ed9..7a65a0a 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev177', + version='2.4b2', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 8c22c559551b068b422c692a42b61cc64af044d4 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 17 Jul 2018 12:04:08 -0500 Subject: [PATCH 03/10] back to dev version number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7a65a0a..6233983 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2', + version='2.4b3.dev178', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 2df82bd4031e3e23aa49ded2416f9ea0a1b3751d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 17 Jul 2018 13:47:52 -0500 Subject: [PATCH 04/10] record request method in crawl log if not GET --- setup.py | 2 +- tests/test_warcprox.py | 5 +++-- warcprox/crawl_log.py | 2 ++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 6233983..221ff7b 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b3.dev178', + version='2.4b3.dev179', description='WARC writing MITM HTTP/S proxy', 
url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 13b6bad..0375ca1 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1799,7 +1799,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert fields[10] == b'-' assert fields[11] == b'-' extra_info = json.loads(fields[12].decode('utf-8')) - assert extra_info == {'contentSize': 91} + assert extra_info == {'contentSize': 91, 'method': 'HEAD'} # WARCPROX_WRITE_RECORD url = 'http://fakeurl/' @@ -1838,8 +1838,9 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert fields[11] == b'-' extra_info = json.loads(fields[12].decode('utf-8')) assert set(extra_info.keys()) == { - 'contentSize', 'warcFilename', 'warcFileOffset'} + 'contentSize', 'warcFilename', 'warcFileOffset', 'method'} assert extra_info['contentSize'] == 38 + assert extra_info['method'] == 'WARCPROX_WRITE_RECORD' def test_long_warcprox_meta( warcprox_, http_daemon, archiving_proxies, playback_proxies): diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index f28683a..a953402 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -40,6 +40,8 @@ class CrawlLogger(object): if records: extra_info['warcFilename'] = records[0].warc_filename extra_info['warcFileOffset'] = records[0].offset + if recorded_url.method != 'GET': + extra_info['method'] = recorded_url.method if recorded_url.response_recorder: content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset payload_digest = warcprox.digest_str( From 46d5b0e82cd9603e25e267bd77e1c59054eede0b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Jul 2018 16:09:42 -0500 Subject: [PATCH 05/10] run trough with python 3.6 plus travis cleanup docker image python:3 is now using 3.7 and building pyyaml < 3.13 fails yaml/pyyaml#126 also filed pull request to update trough's pyyaml dependency spec internetarchive/trough#20 --- 
.travis.yml | 9 ++++++--- tests/run-trough.sh | 2 ++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 19a3e67..c427b37 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,7 @@ before_install: - docker network create --driver=bridge trough - docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb - docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed -- docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh +- docker run --detach --network=trough --hostname=trough --name=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3.6 bash /run-trough.sh - cat /etc/hosts - echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄 - echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts @@ -44,7 +44,10 @@ install: - pip install . pytest requests warcio mock before_script: +- docker exec trough bash -c 'while ! 
test -e /tmp/trough-read.out ; do sleep 0.5 ; done' || true +- docker logs --timestamps --details trough - ps ww -fHe +- docker ps script: - py.test -v tests @@ -55,8 +58,8 @@ script: after_script: - ps ww -fHe - docker exec trough cat /tmp/trough-write.out -- docker exec trough cat /tmp/trough-write-provisioner-server.out -- docker exec trough cat /tmp/trough-write-provisioner-local.out +- docker exec trough cat /tmp/trough-segment-manager-server.out +- docker exec trough cat /tmp/trough-segment-manager-local.out - docker exec trough cat /tmp/trough-sync-server.out - docker exec trough cat /tmp/trough-sync-local.out - docker exec trough cat /tmp/trough-read.out diff --git a/tests/run-trough.sh b/tests/run-trough.sh index 81e0e68..ce80488 100644 --- a/tests/run-trough.sh +++ b/tests/run-trough.sh @@ -3,6 +3,8 @@ # this is used by .travis.yml # +set -x + pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string pip install git+https://github.com/internetarchive/trough.git From f4cf7829228edc607251cf2cb15d69fc16a54744 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Jul 2018 19:23:24 -0500 Subject: [PATCH 06/10] test should expose trough dedup concurrency bug --- tests/test_warcprox.py | 52 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 0375ca1..fad7130 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -51,6 +51,7 @@ import gzip import mock import email.message import socketserver +from concurrent import futures try: import http.server as http_server @@ -886,6 +887,57 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, finally: fh.close() +def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + revisits_before = warcprox_.proxy.stats_db.value( + '__all__', 'revisit', 'urls') or 0 + + # fire off 20 
initial requests simultaneously-ish + with futures.ThreadPoolExecutor(max_workers=20) as pool: + for i in range(20): + url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % ( + http_daemon.server_port, i) + headers = {"Warcprox-Meta": json.dumps({ + "warc-prefix":"test_dedup_buckets", + "dedup-bucket":"bucket_%s" % i})} + pool.submit( + requests.get, url, proxies=archiving_proxies, verify=False, + headers=headers) + + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20) + assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + + # fire off 20 requests to the same urls but different buckets + # none should be deduped + with futures.ThreadPoolExecutor(max_workers=20) as pool: + for i in range(20): + url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % ( + http_daemon.server_port, -i - 1) + headers = {"Warcprox-Meta": json.dumps({ + "warc-prefix":"test_dedup_buckets", + "dedup-bucket":"bucket_%s" % i})} + pool.submit( + requests.get, url, proxies=archiving_proxies, verify=False, + headers=headers) + + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 40) + assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + + # fire off 20 requests same as the initial requests, all should be deduped + with futures.ThreadPoolExecutor(max_workers=20) as pool: + for i in range(20): + url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % ( + http_daemon.server_port, i) + headers = {"Warcprox-Meta": json.dumps({ + "warc-prefix":"test_dedup_buckets", + "dedup-bucket":"bucket_%s" % i})} + pool.submit( + requests.get, url, proxies=archiving_proxies, verify=False, + headers=headers) + + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 60) + assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + 20 + def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): urls_before = 
warcprox_.proxy.running_stats.urls From b7e12a3ec21df793bb594a4e8eff3e599783d39b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Jul 2018 19:25:43 -0500 Subject: [PATCH 07/10] some logging improvements --- warcprox/__init__.py | 2 ++ warcprox/dedup.py | 8 +++++--- warcprox/writer.py | 11 ++++++----- warcprox/writerthread.py | 15 +++++++-------- 4 files changed, 20 insertions(+), 16 deletions(-) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 4825e29..2dcc838 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -148,6 +148,8 @@ class BasePostfetchProcessor(threading.Thread): raise Exception('not implemented') def _run(self): + threading.current_thread().name = '%s(tid=%s)' % ( + threading.current_thread().name, gettid()) self.logger.info('%s starting up', self) self._startup() while not self.stop.is_set(): diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 81be2ea..c0bc817 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -405,7 +405,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): recorded_url.payload_digest, self.options.base32) if recorded_url.payload_digest else 'n/a') self.logger.debug( - 'filtered out digests (not loading dedup): %r', discards) + 'len(batch)=%s len(discards)=%s buckets=%s', + len(batch), len(discards), + {bucket: len(buckets[bucket]) for bucket in buckets}) return buckets def _build_key_index(self, batch): @@ -459,8 +461,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): novel = sorted([ k for k in key_index.keys() if k not in dups]) self.logger.debug( - 'bucket %s: dups=%r novel=%r', - bucket, dups, novel) + 'bucket %s: dups(%s)=%r novel(%s)=%r', + bucket, len(dups), dups, len(novel), novel) except futures.TimeoutError as e: # the remaining threads actually keep running in this case, diff --git a/warcprox/writer.py b/warcprox/writer.py index 9ab249c..ffdca8f 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -204,13 +204,14 @@ class WarcWriter: 
record.offset = offset record.length = warc.f.tell() - offset record.warc_filename = warc.finalname - self.logger.debug( + self.logger.trace( 'wrote warc record: warc_type=%s content_length=%s ' - 'url=%s warc=%s offset=%d', - record.get_header(warctools.WarcRecord.TYPE), + 'digest=%s offset=%d warc=%s url=%s', + record.type, record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(warctools.WarcRecord.URL), - warc.path, record.offset) + record.get_header(b'WARC-Payload-Digest'), + record.offset, warc.path, + record.get_header(warctools.WarcRecord.URL)) return records diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index ef0bd2d..9d85b6f 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -117,19 +117,18 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): and self._filter_accepts(recorded_url)) def _log(self, recorded_url, records): - try: - payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8") - except: - payload_digest = "-" - # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"} - type_ = records[0].type.decode("utf-8") if records else '-' + try: + payload_digest = records[0].get_header(b'WARC-Payload-Digest').decode('utf-8') + except: + payload_digest = '-' + type_ = records[0].type.decode('utf-8') if records else '-' filename = records[0].warc_filename if records else '-' offset = records[0].offset if records else '-' self.logger.info( - "%s %s %s %s %s size=%s %s %s %s offset=%s", + '%s %s %s %s %s size=%s %s %s %s offset=%s', recorded_url.client_ip, recorded_url.status, - recorded_url.method, recorded_url.url.decode("utf-8"), + recorded_url.method, recorded_url.url.decode('utf-8'), recorded_url.mimetype, 
recorded_url.size, payload_digest, type_, filename, offset) From d3314d7904783100f3a4c83fca7a9c7c00aebd90 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Jul 2018 19:26:16 -0500 Subject: [PATCH 08/10] hopefully fix a trough dedup concurrency bug --- warcprox/dedup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index c0bc817..11fff7a 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -434,8 +434,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): fs = {} with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool: # send off the trough requests in parallel + key_indexes = {} for bucket in buckets: - key_index = self._build_key_index(buckets[bucket]) + key_indexes[bucket] = self._build_key_index(buckets[bucket]) future = pool.submit( self.trough_dedup_db.batch_lookup, key_index.keys(), bucket) @@ -446,6 +447,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for future in futures.as_completed(fs, timeout=20): bucket = fs[future] try: + key_index = key_indexes[bucket] for entry in future.result(): for recorded_url in key_index[entry['digest_key']]: recorded_url.dedup_info = entry From fde443070c116f994adcb5e0c322828f73afaa1a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Jul 2018 20:10:30 -0500 Subject: [PATCH 09/10] dumb mistake --- warcprox/dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 11fff7a..5e26062 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -439,7 +439,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): key_indexes[bucket] = self._build_key_index(buckets[bucket]) future = pool.submit( self.trough_dedup_db.batch_lookup, - key_index.keys(), bucket) + key_indexes[bucket].keys(), bucket) fs[future] = bucket # process results as they come back From fbce243787980871bc7e69a6a8d3c6fd1251de42 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: 
Thu, 19 Jul 2018 11:18:31 -0500 Subject: [PATCH 10/10] bump dev version after pull request --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 221ff7b..1707f1f 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b3.dev179', + version='2.4b3.dev180', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt',