Merge branch 'master' into qa

* master:
  bump dev version after pull request
  dumb mistake
  hopefully fix a trough dedup concurrency bug
  some logging improvements
  test should expose trough dedup concurrency bug
  run trough with python 3.6 plus travis cleanup
  record request method in crawl log if not GET
  back to dev version number
  2.4b2 for pypi
  setuptools likes README.rst not readme.rst
Noah Levitt 2018-07-19 11:19:27 -05:00
commit 966c386ac3
12 changed files with 96 additions and 30 deletions

View File

@@ -31,7 +31,7 @@ before_install:
- docker network create --driver=bridge trough
- docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb
- docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed
- docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh
- docker run --detach --network=trough --hostname=trough --name=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3.6 bash /run-trough.sh
- cat /etc/hosts
- echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄
- echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts
@@ -44,7 +44,10 @@ install:
- pip install . pytest requests warcio mock
before_script:
- docker exec trough bash -c 'while ! test -e /tmp/trough-read.out ; do sleep 0.5 ; done' || true
- docker logs --timestamps --details trough
- ps ww -fHe
- docker ps
script:
- py.test -v tests
@@ -55,8 +58,8 @@ script:
after_script:
- ps ww -fHe
- docker exec trough cat /tmp/trough-write.out
- docker exec trough cat /tmp/trough-write-provisioner-server.out
- docker exec trough cat /tmp/trough-write-provisioner-local.out
- docker exec trough cat /tmp/trough-segment-manager-server.out
- docker exec trough cat /tmp/trough-segment-manager-local.out
- docker exec trough cat /tmp/trough-sync-server.out
- docker exec trough cat /tmp/trough-sync-local.out
- docker exec trough cat /tmp/trough-read.out

View File

@@ -125,7 +125,7 @@ configuration information and metadata with each proxy request to warcprox. The
value is a json blob. There are several fields understood by warcprox, and
arbitrary additional fields can be included. If warcprox doesn't recognize a
field it simply ignores it. Custom fields may be useful for custom warcprox
plugins (see `<readme.rst#plugins>`_).
plugins (see `<README.rst#plugins>`_).
Warcprox strips the ``warcprox-meta`` header out before sending the request to
remote server, and does not write it in the warc request record.
@@ -152,7 +152,7 @@ Example::
``dedup-bucket`` (string)
~~~~~~~~~~~~~~~~~~~~~~~~~
Specifies the deduplication bucket. For more information about deduplication
see `<readme.rst#deduplication>`_.
see `<README.rst#deduplication>`_.
Example::
@@ -206,7 +206,7 @@ of the bucket. The other currently recognized key is ``tally-domains``, which
if supplied should be a list of domains. This instructs warcprox to
additionally tally substats of the given bucket by domain.
See `<readme.rst#statistics>`_ for more information on statistics kept by
See `<README.rst#statistics>`_ for more information on statistics kept by
warcprox.
Examples::
@@ -223,7 +223,7 @@ limit on a domain specified in ``tally-domains``.
~~~~~~~~~~~~~~~~~~~~~~~
Specifies quantitative limits for warcprox to enforce. The structure of the
dictionary is ``{stats_key: numerical_limit, ...}`` where stats key has the
format ``"bucket/sub-bucket/statistic"``. See `readme.rst#statistics`_ for
format ``"bucket/sub-bucket/statistic"``. See `README.rst#statistics`_ for
further explanation of what "bucket", "sub-bucket", and "statistic" mean here.
If processing a request would result in exceeding a limit, warcprox aborts
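Illustrative example (not from the documentation above): sending a
``Warcprox-Meta`` header through warcprox with the ``requests`` library, much
as this commit's tests do. The proxy address and bucket name are made up::

    import json
    import requests

    # warcprox assumed to be listening on localhost:8000
    proxies = {'http': 'http://localhost:8000', 'https': 'http://localhost:8000'}
    meta = {
        'warc-prefix': 'my-crawl',      # prefix for the warc filenames
        'dedup-bucket': 'bucket_1',     # deduplication bucket (see above)
    }
    requests.get(
        'http://example.com/', proxies=proxies, verify=False,
        headers={'Warcprox-Meta': json.dumps(meta)})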

View File

@@ -40,12 +40,12 @@ except:
setuptools.setup(
name='warcprox',
version='2.4b2.dev176',
version='2.4b3.dev180',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',
author_email='nlevitt@archive.org',
long_description=open('readme.rst').read(),
long_description=open('README.rst').read(),
license='GPL',
packages=['warcprox'],
install_requires=deps,

View File

@@ -3,6 +3,8 @@
# this is used by .travis.yml
#
set -x
pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string
pip install git+https://github.com/internetarchive/trough.git

View File

@@ -51,6 +51,7 @@ import gzip
import mock
import email.message
import socketserver
from concurrent import futures
try:
import http.server as http_server
@@ -886,6 +887,57 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
finally:
fh.close()
def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
revisits_before = warcprox_.proxy.stats_db.value(
'__all__', 'revisit', 'urls') or 0
# fire off 20 initial requests simultaneously-ish
with futures.ThreadPoolExecutor(max_workers=20) as pool:
for i in range(20):
url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20)
assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before
# fire off 20 requests to the same urls but different buckets
# none should be deduped
with futures.ThreadPoolExecutor(max_workers=20) as pool:
for i in range(20):
url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
http_daemon.server_port, -i - 1)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 40)
assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before
# fire off 20 requests same as the initial requests, all should be deduped
with futures.ThreadPoolExecutor(max_workers=20) as pool:
for i in range(20):
url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 60)
assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + 20
def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
@@ -1800,7 +1852,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
assert extra_info == {'contentSize': 91}
assert extra_info == {'contentSize': 91, 'method': 'HEAD'}
# WARCPROX_WRITE_RECORD
url = 'http://fakeurl/'
@@ -1839,8 +1891,9 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
assert set(extra_info.keys()) == {
'contentSize', 'warcFilename', 'warcFileOffset'}
'contentSize', 'warcFilename', 'warcFileOffset', 'method'}
assert extra_info['contentSize'] == 38
assert extra_info['method'] == 'WARCPROX_WRITE_RECORD'
def test_long_warcprox_meta(
warcprox_, http_daemon, archiving_proxies, playback_proxies):

View File

@@ -148,6 +148,8 @@ class BasePostfetchProcessor(threading.Thread):
raise Exception('not implemented')
def _run(self):
threading.current_thread().name = '%s(tid=%s)' % (
threading.current_thread().name, gettid())
self.logger.info('%s starting up', self)
self._startup()
while not self.stop.is_set():
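A minimal sketch (not warcprox code) of the thread-naming idea added above:
tag each worker thread's name with its OS thread id so log lines from
different postfetch processors can be told apart. Here
``threading.get_native_id()`` (Python 3.8+) stands in for the module's
``gettid()`` helper:

    import threading

    def tag_thread_name():
        # append the OS thread id to this thread's name
        t = threading.current_thread()
        t.name = '%s(tid=%s)' % (t.name, threading.get_native_id())

    worker = threading.Thread(target=tag_thread_name, name='MyPostfetchProcessor')
    worker.start()
    worker.join()
    print(worker.name)  # e.g. MyPostfetchProcessor(tid=12345)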

View File

@@ -40,6 +40,8 @@ class CrawlLogger(object):
if records:
extra_info['warcFilename'] = records[0].warc_filename
extra_info['warcFileOffset'] = records[0].offset
if recorded_url.method != 'GET':
extra_info['method'] = recorded_url.method
if recorded_url.response_recorder:
content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset
payload_digest = warcprox.digest_str(
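For reference, a small illustrative check of the resulting crawl log field
(the values come from the updated test above; GET requests keep the old
shape):

    import json

    # 13th crawl-log field for a HEAD request, per the updated test
    extra_info = json.loads('{"contentSize": 91, "method": "HEAD"}')
    assert extra_info['method'] == 'HEAD'

    # GET requests are unchanged: no "method" key is written
    extra_info = json.loads('{"contentSize": 91}')
    assert extra_info.get('method', 'GET') == 'GET'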

View File

@@ -405,7 +405,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
recorded_url.payload_digest, self.options.base32)
if recorded_url.payload_digest else 'n/a')
self.logger.debug(
'filtered out digests (not loading dedup): %r', discards)
'len(batch)=%s len(discards)=%s buckets=%s',
len(batch), len(discards),
{bucket: len(buckets[bucket]) for bucket in buckets})
return buckets
def _build_key_index(self, batch):
@@ -432,11 +434,12 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
fs = {}
with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
# send off the trough requests in parallel
key_indexes = {}
for bucket in buckets:
key_index = self._build_key_index(buckets[bucket])
key_indexes[bucket] = self._build_key_index(buckets[bucket])
future = pool.submit(
self.trough_dedup_db.batch_lookup,
key_index.keys(), bucket)
key_indexes[bucket].keys(), bucket)
fs[future] = bucket
# process results as they come back
@@ -444,6 +447,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
for future in futures.as_completed(fs, timeout=20):
bucket = fs[future]
try:
key_index = key_indexes[bucket]
for entry in future.result():
for recorded_url in key_index[entry['digest_key']]:
recorded_url.dedup_info = entry
@@ -459,8 +463,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
novel = sorted([
k for k in key_index.keys() if k not in dups])
self.logger.debug(
'bucket %s: dups=%r novel=%r',
bucket, dups, novel)
'bucket %s: dups(%s)=%r novel(%s)=%r',
bucket, len(dups), dups, len(novel), novel)
except futures.TimeoutError as e:
# the remaining threads actually keep running in this case,
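The concurrency bug being fixed: previously a single ``key_index`` local was
overwritten on every pass of the submit loop, so when
``futures.as_completed()`` returned results out of order they could be matched
against the wrong bucket's index. Keeping one index per bucket in
``key_indexes`` avoids that. A minimal standalone sketch of the pattern (the
``batch_lookup`` stand-in below is hypothetical, not the trough client):

    from concurrent import futures

    def batch_lookup(keys, bucket):
        # stand-in for trough_dedup_db.batch_lookup(): pretend every key is a dup
        return [{'digest_key': k, 'bucket': bucket} for k in keys]

    buckets = {'bucket_a': ['k1', 'k2'], 'bucket_b': ['k3']}
    fs, key_indexes = {}, {}
    with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
        for bucket, keys in buckets.items():
            key_indexes[bucket] = {k: [] for k in keys}  # one key index per bucket
            fs[pool.submit(batch_lookup, keys, bucket)] = bucket
        # results may arrive in any order, so look up the index by bucket
        for future in futures.as_completed(fs):
            bucket = fs[future]
            key_index = key_indexes[bucket]
            for entry in future.result():
                assert entry['digest_key'] in key_index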

View File

@@ -193,7 +193,7 @@ def _build_arg_parser(prog='warcprox'):
action='append', help=(
'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
'May be used multiple times to register multiple plugins. '
'See readme.rst for more information.'))
'See README.rst for more information.'))
arg_parser.add_argument('--version', action='version',
version="warcprox {}".format(warcprox.__version__))
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')

View File

@@ -204,13 +204,14 @@ class WarcWriter:
record.offset = offset
record.length = warc.f.tell() - offset
record.warc_filename = warc.finalname
self.logger.debug(
self.logger.trace(
'wrote warc record: warc_type=%s content_length=%s '
'url=%s warc=%s offset=%d',
record.get_header(warctools.WarcRecord.TYPE),
'digest=%s offset=%d warc=%s url=%s',
record.type,
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
warc.path, record.offset)
record.get_header(b'WARC-Payload-Digest'),
record.offset, warc.path,
record.get_header(warctools.WarcRecord.URL))
return records

View File

@@ -117,19 +117,18 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
and self._filter_accepts(recorded_url))
def _log(self, recorded_url, records):
try:
payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8")
except:
payload_digest = "-"
# 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
type_ = records[0].type.decode("utf-8") if records else '-'
try:
payload_digest = records[0].get_header(b'WARC-Payload-Digest').decode('utf-8')
except:
payload_digest = '-'
type_ = records[0].type.decode('utf-8') if records else '-'
filename = records[0].warc_filename if records else '-'
offset = records[0].offset if records else '-'
self.logger.info(
"%s %s %s %s %s size=%s %s %s %s offset=%s",
'%s %s %s %s %s size=%s %s %s %s offset=%s',
recorded_url.client_ip, recorded_url.status,
recorded_url.method, recorded_url.url.decode("utf-8"),
recorded_url.method, recorded_url.url.decode('utf-8'),
recorded_url.mimetype, recorded_url.size, payload_digest,
type_, filename, offset)