diff --git a/.travis.yml b/.travis.yml index 19a3e67..c427b37 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,7 @@ before_install: - docker network create --driver=bridge trough - docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb - docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed -- docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh +- docker run --detach --network=trough --hostname=trough --name=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3.6 bash /run-trough.sh - cat /etc/hosts - echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄 - echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts @@ -44,7 +44,10 @@ install: - pip install . pytest requests warcio mock before_script: +- docker exec trough bash -c 'while ! test -e /tmp/trough-read.out ; do sleep 0.5 ; done' || true +- docker logs --timestamps --details trough - ps ww -fHe +- docker ps script: - py.test -v tests @@ -55,8 +58,8 @@ script: after_script: - ps ww -fHe - docker exec trough cat /tmp/trough-write.out -- docker exec trough cat /tmp/trough-write-provisioner-server.out -- docker exec trough cat /tmp/trough-write-provisioner-local.out +- docker exec trough cat /tmp/trough-segment-manager-server.out +- docker exec trough cat /tmp/trough-segment-manager-local.out - docker exec trough cat /tmp/trough-sync-server.out - docker exec trough cat /tmp/trough-sync-local.out - docker exec trough cat /tmp/trough-read.out diff --git a/readme.rst b/README.rst similarity index 100% rename from readme.rst rename to README.rst diff --git a/api.rst b/api.rst index bac642b..4605bd3 100644 --- a/api.rst +++ b/api.rst @@ -125,7 +125,7 @@ configuration information and metadata with each proxy request to warcprox. The value is a json blob. There are several fields understood by warcprox, and arbitrary additional fields can be included. If warcprox doesn't recognize a field it simply ignores it. Custom fields may be useful for custom warcprox -plugins (see ``_). +plugins (see ``_). Warcprox strips the ``warcprox-meta`` header out before sending the request to remote server, and does not write it in the warc request record. @@ -152,7 +152,7 @@ Example:: ``dedup-bucket`` (string) ~~~~~~~~~~~~~~~~~~~~~~~~~ Specifies the deduplication bucket. For more information about deduplication -see ``_. +see ``_. Example:: @@ -206,7 +206,7 @@ of the bucket. The other currently recognized key is ``tally-domains``, which if supplied should be a list of domains. This instructs warcprox to additionally tally substats of the given bucket by domain. -See ``_ for more information on statistics kept by +See ``_ for more information on statistics kept by warcprox. Examples:: @@ -223,7 +223,7 @@ limit on a domain specified in ``tally-domains``. ~~~~~~~~~~~~~~~~~~~~~~~ Specifies quantitative limits for warcprox to enforce. The structure of the dictionary is ``{stats_key: numerical_limit, ...}`` where stats key has the -format ``"bucket/sub-bucket/statistic"``. See `readme.rst#statistics`_ for +format ``"bucket/sub-bucket/statistic"``. See `README.rst#statistics`_ for further explanation of what "bucket", "sub-bucket", and "statistic" mean here. If processing a request would result in exceeding a limit, warcprox aborts diff --git a/setup.py b/setup.py index 6ac73a1..1707f1f 100755 --- a/setup.py +++ b/setup.py @@ -40,12 +40,12 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev176', + version='2.4b3.dev180', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', author_email='nlevitt@archive.org', - long_description=open('readme.rst').read(), + long_description=open('README.rst').read(), license='GPL', packages=['warcprox'], install_requires=deps, diff --git a/tests/run-trough.sh b/tests/run-trough.sh index 81e0e68..ce80488 100644 --- a/tests/run-trough.sh +++ b/tests/run-trough.sh @@ -3,6 +3,8 @@ # this is used by .travis.yml # +set -x + pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string pip install git+https://github.com/internetarchive/trough.git diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index c149caf..3511fe2 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -51,6 +51,7 @@ import gzip import mock import email.message import socketserver +from concurrent import futures try: import http.server as http_server @@ -886,6 +887,57 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, finally: fh.close() +def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + revisits_before = warcprox_.proxy.stats_db.value( + '__all__', 'revisit', 'urls') or 0 + + # fire off 20 initial requests simultaneously-ish + with futures.ThreadPoolExecutor(max_workers=20) as pool: + for i in range(20): + url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % ( + http_daemon.server_port, i) + headers = {"Warcprox-Meta": json.dumps({ + "warc-prefix":"test_dedup_buckets", + "dedup-bucket":"bucket_%s" % i})} + pool.submit( + requests.get, url, proxies=archiving_proxies, verify=False, + headers=headers) + + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20) + assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + + # fire off 20 requests to the same urls but different buckets + # none should be deduped + with futures.ThreadPoolExecutor(max_workers=20) as pool: + for i in range(20): + url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % ( + http_daemon.server_port, -i - 1) + headers = {"Warcprox-Meta": json.dumps({ + "warc-prefix":"test_dedup_buckets", + "dedup-bucket":"bucket_%s" % i})} + pool.submit( + requests.get, url, proxies=archiving_proxies, verify=False, + headers=headers) + + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 40) + assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + + # fire off 20 requests same as the initial requests, all should be deduped + with futures.ThreadPoolExecutor(max_workers=20) as pool: + for i in range(20): + url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % ( + http_daemon.server_port, i) + headers = {"Warcprox-Meta": json.dumps({ + "warc-prefix":"test_dedup_buckets", + "dedup-bucket":"bucket_%s" % i})} + pool.submit( + requests.get, url, proxies=archiving_proxies, verify=False, + headers=headers) + + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 60) + assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + 20 + def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): urls_before = warcprox_.proxy.running_stats.urls @@ -1800,7 +1852,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert fields[10] == b'-' assert fields[11] == b'-' extra_info = json.loads(fields[12].decode('utf-8')) - assert extra_info == {'contentSize': 91} + assert extra_info == {'contentSize': 91, 'method': 'HEAD'} # WARCPROX_WRITE_RECORD url = 'http://fakeurl/' @@ -1839,8 +1891,9 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert fields[11] == b'-' extra_info = json.loads(fields[12].decode('utf-8')) assert set(extra_info.keys()) == { - 'contentSize', 'warcFilename', 'warcFileOffset'} + 'contentSize', 'warcFilename', 'warcFileOffset', 'method'} assert extra_info['contentSize'] == 38 + assert extra_info['method'] == 'WARCPROX_WRITE_RECORD' def test_long_warcprox_meta( warcprox_, http_daemon, archiving_proxies, playback_proxies): diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 4825e29..2dcc838 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -148,6 +148,8 @@ class BasePostfetchProcessor(threading.Thread): raise Exception('not implemented') def _run(self): + threading.current_thread().name = '%s(tid=%s)' % ( + threading.current_thread().name, gettid()) self.logger.info('%s starting up', self) self._startup() while not self.stop.is_set(): diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index f28683a..a953402 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -40,6 +40,8 @@ class CrawlLogger(object): if records: extra_info['warcFilename'] = records[0].warc_filename extra_info['warcFileOffset'] = records[0].offset + if recorded_url.method != 'GET': + extra_info['method'] = recorded_url.method if recorded_url.response_recorder: content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset payload_digest = warcprox.digest_str( diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 81be2ea..5e26062 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -405,7 +405,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): recorded_url.payload_digest, self.options.base32) if recorded_url.payload_digest else 'n/a') self.logger.debug( - 'filtered out digests (not loading dedup): %r', discards) + 'len(batch)=%s len(discards)=%s buckets=%s', + len(batch), len(discards), + {bucket: len(buckets[bucket]) for bucket in buckets}) return buckets def _build_key_index(self, batch): @@ -432,11 +434,12 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): fs = {} with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool: # send off the trough requests in parallel + key_indexes = {} for bucket in buckets: - key_index = self._build_key_index(buckets[bucket]) + key_indexes[bucket] = self._build_key_index(buckets[bucket]) future = pool.submit( self.trough_dedup_db.batch_lookup, - key_index.keys(), bucket) + key_indexes[bucket].keys(), bucket) fs[future] = bucket # process results as they come back @@ -444,6 +447,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for future in futures.as_completed(fs, timeout=20): bucket = fs[future] try: + key_index = key_indexes[bucket] for entry in future.result(): for recorded_url in key_index[entry['digest_key']]: recorded_url.dedup_info = entry @@ -459,8 +463,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): novel = sorted([ k for k in key_index.keys() if k not in dups]) self.logger.debug( - 'bucket %s: dups=%r novel=%r', - bucket, dups, novel) + 'bucket %s: dups(%s)=%r novel(%s)=%r', + bucket, len(dups), dups, len(novel), novel) except futures.TimeoutError as e: # the remaining threads actually keep running in this case, diff --git a/warcprox/main.py b/warcprox/main.py index 5f45a13..6fb46ef 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -193,7 +193,7 @@ def _build_arg_parser(prog='warcprox'): action='append', help=( 'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". ' 'May be used multiple times to register multiple plugins. ' - 'See readme.rst for more information.')) + 'See README.rst for more information.')) arg_parser.add_argument('--version', action='version', version="warcprox {}".format(warcprox.__version__)) arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') diff --git a/warcprox/writer.py b/warcprox/writer.py index 9ab249c..ffdca8f 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -204,13 +204,14 @@ class WarcWriter: record.offset = offset record.length = warc.f.tell() - offset record.warc_filename = warc.finalname - self.logger.debug( + self.logger.trace( 'wrote warc record: warc_type=%s content_length=%s ' - 'url=%s warc=%s offset=%d', - record.get_header(warctools.WarcRecord.TYPE), + 'digest=%s offset=%d warc=%s url=%s', + record.type, record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(warctools.WarcRecord.URL), - warc.path, record.offset) + record.get_header(b'WARC-Payload-Digest'), + record.offset, warc.path, + record.get_header(warctools.WarcRecord.URL)) return records diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index ef0bd2d..9d85b6f 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -117,19 +117,18 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): and self._filter_accepts(recorded_url)) def _log(self, recorded_url, records): - try: - payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8") - except: - payload_digest = "-" - # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"} - type_ = records[0].type.decode("utf-8") if records else '-' + try: + payload_digest = records[0].get_header(b'WARC-Payload-Digest').decode('utf-8') + except: + payload_digest = '-' + type_ = records[0].type.decode('utf-8') if records else '-' filename = records[0].warc_filename if records else '-' offset = records[0].offset if records else '-' self.logger.info( - "%s %s %s %s %s size=%s %s %s %s offset=%s", + '%s %s %s %s %s size=%s %s %s %s offset=%s', recorded_url.client_ip, recorded_url.status, - recorded_url.method, recorded_url.url.decode("utf-8"), + recorded_url.method, recorded_url.url.decode('utf-8'), recorded_url.mimetype, recorded_url.size, payload_digest, type_, filename, offset)