From 25281376f637ac75dc023d6c87ea231a09c338b3 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 23 Jan 2019 11:07:46 +0000 Subject: [PATCH 01/54] Configurable max threads in CdxServerDedupLoader `CdxServerDedupLoader` used `max_workers=400` by default. We make it a CLI option `--cdxserver-dedup-max-threads` with a default value of 400. We need to be able to tweak this setting because it creates too many CDX queries which cause problems with our production CDX servers. --- warcprox/dedup.py | 2 +- warcprox/main.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 5e26062..4c9f9f1 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -296,7 +296,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) DedupableMixin.__init__(self, options) - self.pool = futures.ThreadPoolExecutor(max_workers=400) + self.pool = futures.ThreadPoolExecutor(max_workers=options.cdxserver_dedup_max_threads) self.batch = set() self.cdx_dedup = cdx_dedup diff --git a/warcprox/main.py b/warcprox/main.py index 8dab727..4b13479 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -168,6 +168,10 @@ def _build_arg_parser(prog='warcprox', show_hidden=False): help=suppress( 'value of Cookie header to include in requests to the cdx ' 'server, when using --cdxserver-dedup')) + hidden.add_argument( + '--cdxserver-dedup-max-threads', dest='cdxserver_dedup_max_threads', + type=int, default=400, help=suppress( + 'maximum number of cdx server dedup threads')) arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', type=int, default=0, help=('try to dedup text resources with payload size over this limit in bytes')) From e04ffa5a36393cb673aad2df89026be6b436969c Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 23 Jan 2019 18:34:33 +0000 Subject: [PATCH 02/54] Change default --cdxserver-dedup-max-threads from 400 to 50 --- warcprox/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/main.py b/warcprox/main.py index 4b13479..e38bb02 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -170,7 +170,7 @@ def _build_arg_parser(prog='warcprox', show_hidden=False): 'server, when using --cdxserver-dedup')) hidden.add_argument( '--cdxserver-dedup-max-threads', dest='cdxserver_dedup_max_threads', - type=int, default=400, help=suppress( + type=int, default=50, help=suppress( 'maximum number of cdx server dedup threads')) arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', type=int, default=0, From 53f13d3536de2bf5dcb68bf8d54d3628b5db0127 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 7 Feb 2019 09:08:11 +0000 Subject: [PATCH 03/54] Use in-memory LRU cache in CDX Server dedup Add option `--cdxserver-dedup-lru-cache-size=N` (default None) to enable in-memory caching of CDX dedup requests using stdlib `lru_cache` method. 
Cache statistics are available in `INFO` logging output like:

```
CacheInfo(hits=3172, misses=3293, maxsize=1024, currsize=1024)
```
---
 warcprox/dedup.py | 10 +++++++++-
 warcprox/main.py  |  4 ++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 4c9f9f1..d9e9335 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -34,6 +34,7 @@ import urllib3
 from urllib3.exceptions import HTTPError
 import collections
 from concurrent import futures
+from functools import lru_cache
 
 urllib3.disable_warnings()
 
@@ -236,6 +237,8 @@ class CdxServerDedup(DedupDb):
             headers['Cookie'] = options.cdxserver_dedup_cookies
         self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
                 timeout=2.0, headers=headers)
+        if options.cdxserver_dedup_lru_cache_size:
+            self.cached_lookup = lru_cache(maxsize=options.cdxserver_dedup_lru_cache_size)(self.lookup)
 
     def loader(self, *args, **kwargs):
         return CdxServerDedupLoader(self, self.options)
@@ -299,6 +302,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin)
         self.pool = futures.ThreadPoolExecutor(max_workers=options.cdxserver_dedup_max_threads)
         self.batch = set()
         self.cdx_dedup = cdx_dedup
+        self.use_lru_cache = options.cdxserver_dedup_lru_cache_size != None
 
     def _get_process_put(self):
         recorded_url = self.inq.get(block=True, timeout=0.5)
@@ -315,7 +319,11 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin)
         try:
             digest_key = warcprox.digest_str(recorded_url.payload_digest,
                     self.options.base32)
-            dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url)
+            if self.use_lru_cache:
+                dedup_info = self.cdx_dedup.cached_lookup(digest_key, recorded_url.url)
+                self.logger.info(self.cdx_dedup.cached_lookup.cache_info())
+            else:
+                dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url)
             if dedup_info:
                 recorded_url.dedup_info = dedup_info
         except ValueError as exc:

diff --git a/warcprox/main.py b/warcprox/main.py
index e38bb02..4f9be61 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -172,6 +172,10 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
             '--cdxserver-dedup-max-threads', dest='cdxserver_dedup_max_threads',
             type=int, default=50, help=suppress(
                 'maximum number of cdx server dedup threads'))
+    hidden.add_argument(
+            '--cdxserver-dedup-lru-cache-size', dest='cdxserver_dedup_lru_cache_size',
+            type=int, help=suppress(
+                'enable in-memory LRU cache to reduce duplicate CDX server requests'))
     arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size',
             type=int, default=0,
             help=('try to dedup text resources with payload size over this limit in bytes'))

From 1133715331a69afe4e8421a77d5f64caf3cf2052 Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Tue, 12 Feb 2019 08:28:15 +0000
Subject: [PATCH 04/54] Enable cdx dedup lru cache by default

use default value 1024
---
 warcprox/main.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/warcprox/main.py b/warcprox/main.py
index 4f9be61..e73170b 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -174,7 +174,7 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
                 'maximum number of cdx server dedup threads'))
     hidden.add_argument(
             '--cdxserver-dedup-lru-cache-size', dest='cdxserver_dedup_lru_cache_size',
-            type=int, help=suppress(
+            type=int, default=1024, help=suppress(
                 'enable in-memory LRU cache to reduce duplicate CDX server requests'))
     arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size',
             type=int, default=0,
From
660989939efc9a98d3145ed3169716149ba75b6f Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 12 Feb 2019 20:43:27 +0000 Subject: [PATCH 05/54] Remove cli option cdxserver-dedup-lru-cache-size LRU cache is always enabled for cdxserver dedup module with a default cache size of 1024. --- warcprox/dedup.py | 11 +++-------- warcprox/main.py | 4 ---- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index d9e9335..d86f4f8 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -237,8 +237,7 @@ class CdxServerDedup(DedupDb): headers['Cookie'] = options.cdxserver_dedup_cookies self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, timeout=2.0, headers=headers) - if options.cdxserver_dedup_lru_cache_size: - self.cached_lookup = lru_cache(maxsize=options.cdxserver_dedup_lru_cache_size)(self.lookup) + self.cached_lookup = lru_cache(maxsize=1024)(self.lookup) def loader(self, *args, **kwargs): return CdxServerDedupLoader(self, self.options) @@ -302,7 +301,6 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) self.pool = futures.ThreadPoolExecutor(max_workers=options.cdxserver_dedup_max_threads) self.batch = set() self.cdx_dedup = cdx_dedup - self.use_lru_cache = options.cdxserver_dedup_lru_cache_size != None def _get_process_put(self): recorded_url = self.inq.get(block=True, timeout=0.5) @@ -319,11 +317,8 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) try: digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) - if self.use_lru_cache: - dedup_info = self.cdx_dedup.cached_lookup(digest_key, recorded_url.url) - self.logger.info(self.cdx_dedup.cached_lookup.cache_info()) - else: - dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url) + dedup_info = self.cdx_dedup.cached_lookup(digest_key, recorded_url.url) + self.logger.info(self.cdx_dedup.cached_lookup.cache_info()) if dedup_info: recorded_url.dedup_info = dedup_info except ValueError as exc: diff --git a/warcprox/main.py b/warcprox/main.py index e73170b..e38bb02 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -172,10 +172,6 @@ def _build_arg_parser(prog='warcprox', show_hidden=False): '--cdxserver-dedup-max-threads', dest='cdxserver_dedup_max_threads', type=int, default=50, help=suppress( 'maximum number of cdx server dedup threads')) - hidden.add_argument( - '--cdxserver-dedup-lru-cache-size', dest='cdxserver_dedup_lru_cache_size', - type=int, default=1024, help=suppress( - 'enable in-memory LRU cache to reduce duplicate CDX server requests')) arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', type=int, default=0, help=('try to dedup text resources with payload size over this limit in bytes')) From 99fb998e1dc4e91da18182cfa59cf1201d0edc4f Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 12 Feb 2019 21:46:49 +0000 Subject: [PATCH 06/54] log LRU cache info every 1000 requests to avoid writing to the log too often. 
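
For orientation, here is a minimal standalone sketch of the pattern that patches 03-06 converge on (illustrative names, not warcprox's own code): `functools.lru_cache` wraps the expensive lookup, and `cache_info()` statistics are logged only once per 1000 lookups.

```python
import logging
from functools import lru_cache

logger = logging.getLogger('cdx-dedup-demo')

@lru_cache(maxsize=1024)
def lookup(digest_key):
    # stand-in for an expensive CDX server round trip
    return 'dedup info for %s' % digest_key

def throttled_lookup(digest_key):
    result = lookup(digest_key)
    # cache_info() returns CacheInfo(hits, misses, maxsize, currsize);
    # logging it on every call would flood the log, so only do so once
    # per 1000 lookups
    info = lookup.cache_info()
    if (info.hits + info.misses) % 1000 == 0:
        logger.info(info)
    return result
```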
--- warcprox/dedup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index d86f4f8..0bb15f6 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -318,7 +318,9 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) dedup_info = self.cdx_dedup.cached_lookup(digest_key, recorded_url.url) - self.logger.info(self.cdx_dedup.cached_lookup.cache_info()) + cache_info = self.cdx_dedup.cached_lookup.cache_info() + if (cache_info.hits + cache_info.misses) % 1000 == 0: + self.logger.info(self.cdx_dedup.cached_lookup.cache_info()) if dedup_info: recorded_url.dedup_info = dedup_info except ValueError as exc: From 2824ee6a5bd5dd709fd3a7c61cef54266f2e2d49 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 12 Feb 2019 14:59:54 -0800 Subject: [PATCH 07/54] omfg too many warcs --- warcprox/writer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/warcprox/writer.py b/warcprox/writer.py index 96293db..fecb533 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -149,6 +149,7 @@ class WarcWriter: record.get_header(b'WARC-Payload-Digest'), record.offset, self.path, record.get_header(warctools.WarcRecord.URL)) self.f.flush() + self.last_activity = time.time() return records From 5a7a4ff7104389a4cea2c5aa18bd6a3be9c0b64c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 12 Feb 2019 15:00:22 -0800 Subject: [PATCH 08/54] pypi release --- setup.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/setup.py b/setup.py index ceb9886..b26cf7a 100755 --- a/setup.py +++ b/setup.py @@ -26,13 +26,13 @@ import setuptools deps = [ 'certauth==1.1.6', 'warctools>=4.10.0,<=4.10.0', - 'urlcanon>=0.1.dev16', - 'doublethink>=0.2.0.dev87', - 'urllib3>=1.23', - 'requests>=2.0.1', - 'PySocks>=1.6.8', - 'cryptography>=2.3', - 'idna>=2.5', + 'urlcanon>=0.1.dev16,<=0.3.dev28', + 'doublethink>=0.2.0.dev87,<=0.2.0.dev94', + 'urllib3>=1.14,<=1.24.1', + 'requests>=2.0.1,<=2.21.0', + 'PySocks>=1.6.8,<=1.6.8', + 'cryptography>=2.3,<=2.5', + 'idna>=2.5,<=2.8', ] try: import concurrent.futures @@ -41,7 +41,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b4.dev195', + version='2.4b6', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From adca46427defca5961a5e9c062848d474e3dd6db Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 12 Feb 2019 15:04:22 -0800 Subject: [PATCH 09/54] back to dev version number --- setup.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/setup.py b/setup.py index b26cf7a..5052a64 100755 --- a/setup.py +++ b/setup.py @@ -25,14 +25,14 @@ import setuptools deps = [ 'certauth==1.1.6', - 'warctools>=4.10.0,<=4.10.0', - 'urlcanon>=0.1.dev16,<=0.3.dev28', - 'doublethink>=0.2.0.dev87,<=0.2.0.dev94', - 'urllib3>=1.14,<=1.24.1', - 'requests>=2.0.1,<=2.21.0', - 'PySocks>=1.6.8,<=1.6.8', - 'cryptography>=2.3,<=2.5', - 'idna>=2.5,<=2.8', + 'warctools>=4.10.0', + 'urlcanon>=0.1.dev16', + 'doublethink>=0.2.0.dev87', + 'urllib3>=1.14', + 'requests>=2.0.1', + 'PySocks>=1.6.8', + 'cryptography>=2.3', + 'idna>=2.5', ] try: import concurrent.futures @@ -41,7 +41,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b6', + version='2.4b7.dev196', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 
c70bf2e2b932c55b6d4db167af687ef5cd6ec6ac Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 27 Feb 2019 12:36:35 -0800
Subject: [PATCH 10/54] debugging a shutdown issue

---
 setup.py         | 2 +-
 warcprox/main.py | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index 5052a64..4fa0885 100755
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4b7.dev196',
+        version='2.4b7.dev197',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',

diff --git a/warcprox/main.py b/warcprox/main.py
index e38bb02..7fe5011 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -316,7 +316,11 @@ def main(argv=None):
         # SIGQUIT does not exist on some platforms (windows)
         pass
 
-    controller.run_until_shutdown()
+    try:
+        controller.run_until_shutdown()
+    except:
+        logging.fatal('unhandled exception in controller', exc_info=True)
+        sys.exit(1)
 
 def ensure_rethinkdb_tables(argv=None):
     '''

From 6e6b43eb795652f8c0f489ac3e2abef6364481cf Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Wed, 20 Mar 2019 11:53:32 +0000
Subject: [PATCH 11/54] Add option to load logging conf from JSON file

New option `--logging-conf-file` to load `logging` conf from a JSON file.
Prefer JSON over the `configparser` format supported by
`logging.config.fileConfig` because JSON is more expressive (nesting is
supported) and it's easier to detect errors.
---
 warcprox/main.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/warcprox/main.py b/warcprox/main.py
index 7fe5011..98867b6 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -29,7 +29,9 @@ try:
 except ImportError:
     import Queue as queue
 
+import json
 import logging
+import logging.config
 import sys
 import hashlib
 import argparse
@@ -239,6 +241,9 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
     arg_parser.add_argument(
             '--trace', dest='trace', action='store_true',
             help='very verbose logging')
+    arg_parser.add_argument(
+            '--logging-conf-file', dest='logging_conf_file', default=None,
+            help=('reads logging configuration from a JSON file'))
     arg_parser.add_argument(
             '--version', action='version',
             version="warcprox {}".format(warcprox.__version__))
@@ -302,6 +307,11 @@ def main(argv=None):
             '%(asctime)s %(process)d %(levelname)s %(threadName)s '
             '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
 
+    if args.logging_conf_file:
+        with open(args.logging_conf_file, 'r') as fd:
+            conf = json.load(fd)
+        logging.config.dictConfig(conf)
+
     # see https://github.com/pyca/cryptography/issues/2911
     cryptography.hazmat.backends.openssl.backend.activate_builtin_random()

From c8f1c644942c318ab46275056c0865b0fee6428e Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Thu, 21 Mar 2019 12:15:39 -0700
Subject: [PATCH 12/54] travis-ci python 3.7

---
 .travis.yml | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 0ad15d4..2cd7d71 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,20 +1,19 @@
 sudo: required
-
+dist: xenial
 language: python
 python:
+- 3.7
 - 3.6
 - 3.5
 - 3.4
 - 2.7
 - pypy
 - pypy3
-- 3.7-dev
 - nightly
 
 matrix:
   allow_failures:
   - python: nightly
-  - python: 3.7-dev
   - python: 2.7
   - python: pypy

From 878ab0977f17f46dc981699cd6ca617b576b5994 Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Thu, 21 Mar 2019 19:18:55 +0000
Subject: [PATCH 13/54] Use YAML instead of JSON

Add PyYAML<=3.13 dependency.
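
For illustration, a hypothetical `--logging-conf-file` YAML file could look like the following; the keys follow the stdlib `logging.config.dictConfig` schema, and the format string mirrors warcprox's default:

```yaml
version: 1
disable_existing_loggers: false
formatters:
  verbose:
    format: '%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    stream: ext://sys.stdout
    formatter: verbose
loggers:
  warcprox:
    level: INFO
root:
  level: WARNING
  handlers: [console]
```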
--- setup.py | 1 + warcprox/main.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 4fa0885..6274eeb 100755 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ deps = [ 'PySocks>=1.6.8', 'cryptography>=2.3', 'idna>=2.5', + 'PyYAML<=3.13', ] try: import concurrent.futures diff --git a/warcprox/main.py b/warcprox/main.py index 98867b6..06d8bfc 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -29,7 +29,6 @@ try: except ImportError: import Queue as queue -import json import logging import logging.config import sys @@ -41,6 +40,7 @@ import traceback import signal import threading import certauth.certauth +import yaml import warcprox import doublethink import cryptography.hazmat.backends.openssl @@ -243,7 +243,7 @@ def _build_arg_parser(prog='warcprox', show_hidden=False): help='very verbose logging') arg_parser.add_argument( '--logging-conf-file', dest='logging_conf_file', default=None, - help=('reads logging configuration from a JSON file')) + help=('reads logging configuration from a YAML file')) arg_parser.add_argument( '--version', action='version', version="warcprox {}".format(warcprox.__version__)) @@ -309,7 +309,7 @@ def main(argv=None): if args.logging_conf_file: with open(args.logging_conf_file, 'r') as fd: - conf = json.load(fd) + conf = yaml.load(fd) logging.config.dictConfig(conf) # see https://github.com/pyca/cryptography/issues/2911 From b0367a9c8293833f0ca7c51057120c71bc8c921c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 21 Mar 2019 12:25:51 -0700 Subject: [PATCH 14/54] fix pypy3? see: https://docs.travis-ci.com/user/languages/python/ --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2cd7d71..1a351d3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ python: - 3.4 - 2.7 - pypy -- pypy3 +- pypy3.5 - nightly matrix: From 436a27b19e12ac831e3d511fd4541fc56902c7d5 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 21 Mar 2019 19:34:52 +0000 Subject: [PATCH 15/54] Upgrade PyYAML to >=5.1 --- setup.py | 2 +- warcprox/main.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 6274eeb..56e8390 100755 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ deps = [ 'PySocks>=1.6.8', 'cryptography>=2.3', 'idna>=2.5', - 'PyYAML<=3.13', + 'PyYAML>=5.1', ] try: import concurrent.futures diff --git a/warcprox/main.py b/warcprox/main.py index 06d8bfc..0ef5c58 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -309,7 +309,7 @@ def main(argv=None): if args.logging_conf_file: with open(args.logging_conf_file, 'r') as fd: - conf = yaml.load(fd) + conf = yaml.safe_load(fd) logging.config.dictConfig(conf) # see https://github.com/pyca/cryptography/issues/2911 From 1e0a0ca63ac924e4a62d72000796b57d8dac85aa Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 21 Mar 2019 12:38:29 -0700 Subject: [PATCH 16/54] every change is a point release now --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 56e8390..f9916ae 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b7.dev197', + version='2.4.0', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From cb2a07bff2e6df60171274de93d0760f0d8f8e58 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 21 Mar 2019 12:59:32 -0700 Subject: [PATCH 17/54] account for surt fix in urlcanon 0.3.0 --- setup.py | 2 +- 
tests/test_warcprox.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index f9916ae..d3e73cd 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ import setuptools deps = [ 'certauth==1.1.6', 'warctools>=4.10.0', - 'urlcanon>=0.1.dev16', + 'urlcanon>=0.3.0', 'doublethink>=0.2.0.dev87', 'urllib3>=1.14', 'requests>=2.0.1', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 7c6d21a..3c38d4e 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -965,12 +965,12 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): }, { "url_match": "SURT_MATCH", - "value": "http://(localhost:%s,)/fuh/" % (http_daemon.server_port), + "value": "http://(localhost,:%s)/fuh/" % (http_daemon.server_port), }, { "url_match": "SURT_MATCH", # this rule won't match because of http scheme, https port - "value": "http://(localhost:%s,)/fuh/" % (https_daemon.server_port), + "value": "http://(localhost,:%s)/fuh/" % (https_daemon.server_port), }, { "domain": "bad.domain.com", From a25971e06bc098e8c372dfe1b1b43095e350af31 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 21 Mar 2019 14:17:24 -0700 Subject: [PATCH 18/54] appease some warnings --- tests/test_warcprox.py | 18 +++++++++++------- warcprox/bigtable.py | 4 ++-- warcprox/dedup.py | 6 +++--- warcprox/main.py | 4 ++-- warcprox/mitmproxy.py | 10 +++++----- warcprox/stats.py | 2 +- warcprox/trough.py | 2 +- warcprox/warcproxy.py | 2 +- 8 files changed, 26 insertions(+), 22 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 3c38d4e..e1ff80a 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -68,6 +68,10 @@ import certauth.certauth import warcprox import warcprox.main +# https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings +import urllib3 +urllib3.disable_warnings() + try: import http.client as http_client except ImportError: @@ -144,7 +148,7 @@ def dump_state(signum=None, frame=None): stack = traceback.format_stack(sys._current_frames()[th.ident]) state_strs.append("".join(stack)) - logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) + logging.warning("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) signal.signal(signal.SIGQUIT, dump_state) @@ -446,7 +450,7 @@ def warcprox_(request, http_daemon, https_daemon): logging.info('dropping rethinkdb database %r', parsed.database) rr.db_drop(parsed.database).run() except Exception as e: - logging.warn( + logging.warning( 'problem deleting rethinkdb database %r: %s', parsed.database, e) logging.info('deleting working directory %r', work_dir) @@ -1762,7 +1766,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): crawl_log = open(default_crawl_log_path, 'rb').read() # tests will fail in year 3000 :) - assert re.match(b'\A2[^\n]+\n\Z', crawl_log) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log) assert crawl_log[24:31] == b' 200 ' assert crawl_log[31:42] == b' 54 ' fields = crawl_log.split() @@ -1782,7 +1786,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert extra_info['contentSize'] == 145 crawl_log_1 = open(file, 'rb').read() - assert re.match(b'\A2[^\n]+\n\Z', crawl_log_1) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_1) assert crawl_log_1[24:31] == b' 200 ' assert crawl_log_1[31:42] == b' 54 ' fields = crawl_log_1.split() @@ -1820,7 +1824,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): crawl_log_2 = open(file, 'rb').read() - assert 
re.match(b'\A2[^\n]+\n\Z', crawl_log_2) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_2) assert crawl_log_2[24:31] == b' 200 ' assert crawl_log_2[31:42] == b' 54 ' fields = crawl_log_2.split() @@ -1853,7 +1857,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert os.path.exists(file) crawl_log_3 = open(file, 'rb').read() - assert re.match(b'\A2[^\n]+\n\Z', crawl_log_3) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_3) assert crawl_log_3[24:31] == b' 200 ' assert crawl_log_3[31:42] == b' 0 ' fields = crawl_log_3.split() @@ -1893,7 +1897,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert os.path.exists(file) crawl_log_4 = open(file, 'rb').read() - assert re.match(b'\A2[^\n]+\n\Z', crawl_log_4) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_4) assert crawl_log_4[24:31] == b' 204 ' assert crawl_log_4[31:42] == b' 38 ' fields = crawl_log_4.split() diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 0d98270..4df8ab3 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -71,7 +71,7 @@ class RethinkCaptures: "unexpected result saving batch of %s: %s " "entries" % (len(self._batch), result)) if result["replaced"] > 0 or result["unchanged"] > 0: - self.logger.warn( + self.logger.warning( "inserted=%s replaced=%s unchanged=%s in big " "captures table (normally replaced=0 and " "unchanged=0)", result["inserted"], @@ -148,7 +148,7 @@ class RethinkCaptures: recorded_url.payload_digest.digest() ).decode("utf-8") else: - self.logger.warn( + self.logger.warning( "digest type is %r but big captures table is indexed " "by sha1", recorded_url.payload_digest.name) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 0bb15f6..7889cd9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -374,7 +374,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): except futures.TimeoutError as e: # the remaining threads actually keep running in this case, # there's no way to stop them, but that should be harmless - logging.warn( + logging.warning( 'timed out saving dedup info to trough', exc_info=True) class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): @@ -458,7 +458,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): recorded_url.dedup_info = entry except Exception as e: # batch_lookup raised exception or something - logging.warn( + logging.warning( 'problem looking up dedup info for %s urls ' 'in bucket %s', len(buckets[bucket]), bucket, exc_info=True) @@ -474,7 +474,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): except futures.TimeoutError as e: # the remaining threads actually keep running in this case, # there's no way to stop them, but that should be harmless - self.logger.warn( + self.logger.warning( 'timed out loading dedup info from trough', exc_info=True) class TroughDedupDb(DedupDb, DedupableMixin): diff --git a/warcprox/main.py b/warcprox/main.py index 0ef5c58..d61e6b1 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -264,7 +264,7 @@ def dump_state(signum=None, frame=None): except Exception as e: state_strs.append('' % e) - logging.warn( + logging.warning( 'dumping state (caught signal %s)\n%s', signum, '\n'.join(state_strs)) @@ -402,7 +402,7 @@ def ensure_rethinkdb_tables(argv=None): did_something = True if args.rethinkdb_trough_db_url: dedup_db = warcprox.dedup.TroughDedupDb(options) - logging.warn( + logging.warning( 'trough is responsible for creating most of the rethinkdb ' 'tables that it uses') did_something = True diff --git a/warcprox/mitmproxy.py 
b/warcprox/mitmproxy.py index 1fc0c72..b8e7d74 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -100,7 +100,7 @@ class ProxyingRecorder(object): self.proxy_client.sendall(hunk) except BaseException as e: self._proxy_client_conn_open = False - self.logger.warn( + self.logger.warning( '%s sending data to proxy client for url %s', e, self.url) self.logger.info( @@ -283,7 +283,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_conn.sock = ssl.wrap_socket( self._remote_server_conn.sock) except ssl.SSLError: - self.logger.warn( + self.logger.warning( "failed to establish ssl connection to %s; " "python ssl library does not support SNI, " "consider upgrading to python 2.7.9+ or 3.4+", @@ -332,7 +332,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self.send_error(500, str(e)) except Exception as f: - self.logger.warn("failed to send error response ({}) to proxy client: {}".format(e, f)) + self.logger.warning("failed to send error response ({}) to proxy client: {}".format(e, f)) return # Reload! @@ -386,7 +386,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): return self._proxy_request() except Exception as e: if self.server.shutting_down: - self.logger.warn( + self.logger.warning( 'sending 503 warcprox shutting down %r: %r', self.requestline, e) self.send_error(503, 'warcprox shutting down') @@ -521,7 +521,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): return self.do_COMMAND def log_error(self, fmt, *args): - self.logger.warn(fmt, *args) + self.logger.warning(fmt, *args) class PooledMixIn(socketserver.ThreadingMixIn): logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn") diff --git a/warcprox/stats.py b/warcprox/stats.py index 64ff2d7..1a71cad 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -81,7 +81,7 @@ def unravel_buckets(url, warcprox_meta): for bucket in warcprox_meta["stats"]["buckets"]: if isinstance(bucket, dict): if not 'bucket' in bucket: - self.logger.warn( + self.logger.warning( 'ignoring invalid stats bucket in ' 'warcprox-meta header %s', bucket) continue diff --git a/warcprox/trough.py b/warcprox/trough.py index b7db127..d0839d1 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -190,7 +190,7 @@ class TroughClient(object): return if response.status_code != 200: self._write_url_cache.pop(segment_id, None) - self.logger.warn( + self.logger.warning( 'unexpected response %r %r %r from %r to sql=%r', response.status_code, response.reason, response.text, write_url, sql) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index c0cdcb1..8066ace 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -530,6 +530,6 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): self.remote_connection_pool.clear() def handle_error(self, request, client_address): - self.logger.warn( + self.logger.warning( "exception processing request %s from %s", request, client_address, exc_info=True) From 3f08639553fd4108248d613450edf4ca3ebeba8e Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 21 Mar 2019 16:00:36 -0700 Subject: [PATCH 19/54] =?UTF-8?q?still=20seeing=20a=20warning=20but=20?= =?UTF-8?q?=F0=9F=A4=B7=E2=80=8D=E2=99=82=EF=B8=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_warcprox.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index e1ff80a..18bcf37 100755 --- a/tests/test_warcprox.py 
+++ b/tests/test_warcprox.py @@ -68,9 +68,6 @@ import certauth.certauth import warcprox import warcprox.main -# https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings -import urllib3 -urllib3.disable_warnings() try: import http.client as http_client @@ -97,9 +94,11 @@ logging.basicConfig( stream=sys.stdout, level=logging.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +logging.getLogger("urllib3").setLevel(logging.WARN) logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) -warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning) -warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning) +import urllib3 ; urllib3.disable_warnings() +import requests.packages.urllib3 ; requests.packages.urllib3.disable_warnings() def wait(callback, timeout=10): start = time.time() From 794cc29c80e76e883eeba389f7a0bf1af44369c0 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 21 Mar 2019 16:04:05 -0700 Subject: [PATCH 20/54] bump version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index d3e73cd..7cb8570 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.0', + version='2.4.1', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 0cab6fc4bf4b13e1ecadd862e84d44ce70b8e08c Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 8 Apr 2019 16:13:14 +0000 Subject: [PATCH 21/54] Increase the MAXHEADERS limit of http client `http.client` has an arbitrary limit of MAXHEADERS=100. If a target URL has more it raises an HTTPException and the request fails. (The target pages are perfectly fine besides having more than 100 headers). https://github.com/python/cpython/blob/3.7/Lib/http/client.py#L113 We increase this limit to 7000. We currently use this in production WBM. We bumped into the same issue trying to replay pages with too many HTTP headers. We increased the limit progressively from 100 to 500, 1000 etc and we found that 7000 is a good place to stop. --- warcprox/mitmproxy.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index b8e7d74..5b24f30 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -45,6 +45,11 @@ try: http_client._MAXLINE = 4194304 # 4 MiB except ImportError: import httplib as http_client +# http_client has an arbitrary limit of 100 HTTP Headers which is too low and +# it raises an HTTPException if the target URL has more. +# https://github.com/python/cpython/blob/3.7/Lib/http/client.py#L113 +http_client._MAXHEADERS = 7000 + import json import socket import logging From ac3d238a3dde430fd3fce1c8a5fd24ebfb18d51e Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 8 Apr 2019 11:11:51 -0700 Subject: [PATCH 22/54] new snakebite git url --- tests/Dockerfile | 2 +- tests/run-trough.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Dockerfile b/tests/Dockerfile index 6a97ac0..df9a688 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -80,7 +80,7 @@ RUN apt-get install -y libsqlite3-dev # trough itself RUN virtualenv -p python3 /opt/trough-ve3 \ && . 
/opt/trough-ve3/bin/activate \ - && pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string \ + && pip install git+https://github.com/nlevitt/snakebite.git@py3 \ && pip install git+https://github.com/internetarchive/trough.git RUN mkdir -vp /etc/service/trough-sync-local \ diff --git a/tests/run-trough.sh b/tests/run-trough.sh index ce80488..64939a6 100644 --- a/tests/run-trough.sh +++ b/tests/run-trough.sh @@ -5,7 +5,7 @@ set -x -pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string +pip install git+https://github.com/nlevitt/snakebite.git@py3 pip install git+https://github.com/internetarchive/trough.git mkdir /etc/trough From 2ca84ae023b4e9cd50902d4dc8627754fa65a07a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 8 Apr 2019 11:50:27 -0700 Subject: [PATCH 23/54] bump version after merge --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7cb8570..f2f490b 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.1', + version='2.4.2', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 7560c0946d4b9d17d907555350ec1d4cd69b8021 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 9 Apr 2019 21:16:45 +0000 Subject: [PATCH 24/54] avoid exception sending error to client this is a slightly different approach to https://github.com/internetarchive/warcprox/pull/121 --- warcprox/mitmproxy.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 5b24f30..a9e7e38 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -415,9 +415,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): try: return http_server.BaseHTTPRequestHandler.send_error( self, code, message, explain) - except: - self.logger.error( - 'send_error(%r, %r, %r) raised exception', exc_info=True) + except Exception as e: + level = logging.ERROR + if isinstance(e, OSError) and e.errno == 9: + level = logging.TRACE + self.logger.log( + level, 'send_error(%r, %r, %r) raised exception', + exc_info=True) return None def _proxy_request(self, extra_response_headers={}): From 98b3c1f80b5634c9a9ec31fa2b907db2fcd5c75f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 9 Apr 2019 21:52:31 +0000 Subject: [PATCH 25/54] bump version after merge --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f2f490b..594b8d8 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.2', + version='2.4.3', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 5ced2588d4cad5dc14bf47ebde124a3eaa3823b8 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 13 Apr 2019 17:33:38 -0700 Subject: [PATCH 26/54] failing test test_incomplete_read --- tests/test_warcprox.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 18bcf37..7e6b19f 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -68,7 +68,6 @@ import certauth.certauth import warcprox import warcprox.main - try: import http.client as http_client except ImportError: @@ -282,6 +281,15 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): payload = b'Test.' 
actual_headers = (b'Content-Type: text/plain\r\n' + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') + elif self.path == '/incomplete-read': + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'Transfer-Encoding: chunked\r\n' + + b'\r\n') + # payload = b'''1\r\na''' + payload = chunkify( + b'Server closes connection when client expects next chunk') + payload = payload[:-7] else: payload = b'404 Not Found\n' headers = (b'HTTP/1.1 404 Not Found\r\n' @@ -295,7 +303,9 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): headers, payload = self.build_response() self.connection.sendall(headers) self.connection.sendall(payload) - if self.path in ('/missing-content-length', '/empty-response'): + if self.path in ( + '/missing-content-length', '/empty-response', + '/incomplete-read'): # server must close the connection, else client has no idea if # there is more data coming self.connection.shutdown(socket.SHUT_RDWR) @@ -1614,13 +1624,11 @@ def test_controller_with_defaults(): assert not wwp.writer_pool.default_warc_writer.record_builder.base32 assert wwp.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' - class EarlyPlugin(warcprox.BaseStandardPostfetchProcessor): CHAIN_POSITION = 'early' def _process_url(self): pass - def test_load_plugin(): options = warcprox.Options(port=0, plugins=[ 'warcprox.stats.RunningStats', @@ -2226,6 +2234,18 @@ def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies): with pytest.raises(StopIteration): next(rec_iter) +def test_incomplete_read(http_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + + # see https://github.com/internetarchive/warcprox/pull/123 + url = 'http://localhost:%s/incomplete-read' % http_daemon.server_port + with pytest.raises(requests.exceptions.ChunkedEncodingError): + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + if __name__ == '__main__': pytest.main() From 0d268659abf54c9771cee9dbbe0f755d7f769080 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 13 Apr 2019 17:46:52 -0700 Subject: [PATCH 27/54] handle incomplete read see Vangelis's writeup at https://github.com/internetarchive/warcprox/pull/123 --- warcprox/mitmproxy.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index a9e7e38..ae6a9f0 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -487,9 +487,14 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): tmp_file_max_memory_size=self._tmp_file_max_memory_size) prox_rec_res.begin(extra_response_headers=extra_response_headers) - buf = prox_rec_res.read(65536) + buf = None while buf != b'': - buf = prox_rec_res.read(65536) + try: + buf = prox_rec_res.read(65536) + except http_client.IncompleteRead as e: + self.logger.warn('%s from %s', e, self.url) + buf = b'' + if (self._max_resource_size and prox_rec_res.recorder.len > self._max_resource_size): prox_rec_res.truncated = b'length' From 5de25694302c099014e117dd17b2fc66e369b432 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 13 Apr 2019 18:10:46 -0700 Subject: [PATCH 28/54] bump version after #124 merge --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 594b8d8..9180617 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( 
name='warcprox', - version='2.4.3', + version='2.4.4', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From f207e32f50c6134cc50f1b7e09d9869927f55920 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 15 Apr 2019 00:17:50 -0700 Subject: [PATCH 29/54] followup on IncompleteRead --- setup.py | 2 +- tests/test_warcprox.py | 5 +++++ warcprox/mitmproxy.py | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 9180617..b635ff1 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.4', + version='2.4.5', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 7e6b19f..6c49f0a 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -2243,6 +2243,11 @@ def test_incomplete_read(http_daemon, warcprox_, archiving_proxies): response = requests.get( url, proxies=archiving_proxies, verify=False, timeout=10) + # although `requests.get` raises exception here, other clients like + # browsers put up with the server misbehavior; warcprox does too, and will + # record the response verbatim in the warc; this `wait()` call tests + # that a warc record is written + # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index ae6a9f0..705589e 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -493,7 +493,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): buf = prox_rec_res.read(65536) except http_client.IncompleteRead as e: self.logger.warn('%s from %s', e, self.url) - buf = b'' + buf = e.partial if (self._max_resource_size and prox_rec_res.recorder.len > self._max_resource_size): From 3298128e0ce21f73dfc242166ff1f06073f3282c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 24 Apr 2019 10:40:22 -0700 Subject: [PATCH 30/54] deal with bad content-type header we had bad stuff get into a crawl log because of a url that returned a Content-Type header value with spaces in it (but no semicolon) --- setup.py | 2 +- warcprox/warcproxy.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index b635ff1..e762b13 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.5', + version='2.4.6', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 8066ace..4a5312e 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -47,6 +47,7 @@ from urllib3 import PoolManager import tempfile import hashlib import doublethink +import re class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): ''' @@ -387,9 +388,8 @@ class RecordedUrl: self.mimetype = content_type if self.mimetype: - n = self.mimetype.find(";") - if n >= 0: - self.mimetype = self.mimetype[:n] + # chop off subtype, and ensure there's no whitespace + self.mimetype = re.split(r'[;\s]', self.mimetype, 2)[0] self.custom_type = custom_type self.status = status From de01d498cbf0b48bc45070f85f4558948c418ca9 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 24 Apr 2019 12:11:20 -0700 Subject: [PATCH 31/54] requests/urllib3 version conflict --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/setup.py b/setup.py index e762b13..bbdd9bd 100755 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ deps = [ 'warctools>=4.10.0', 'urlcanon>=0.3.0', 'doublethink>=0.2.0.dev87', - 'urllib3>=1.14', + 'urllib3>=1.14,<1.25', 'requests>=2.0.1', 'PySocks>=1.6.8', 'cryptography>=2.3', @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.6', + version='2.4.7', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 38d6e4337d13fe9238fc48716cdf5aa773941d7d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 24 Apr 2019 13:14:12 -0700 Subject: [PATCH 32/54] handle graceful shutdown failure print stack trace and kill myself -9 --- setup.py | 2 +- warcprox/controller.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index bbdd9bd..75177d8 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.7', + version='2.4.8', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/controller.py b/warcprox/controller.py index fcdaa58..9a2880e 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -441,7 +441,12 @@ class WarcproxController(object): exc_info=True) pass finally: - self.shutdown() + try: + self.shutdown() + except: + self.logger.critical("graceful shutdown failed", exc_info=True) + self.logger.critical("killing myself -9") + os.kill(os.getpid(), 9) def _dump_profiling(self): import pstats, tempfile, os, io From be7048844bfa7db7580eb7cd9cd64a6ef24b725f Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 2 May 2019 07:11:24 +0000 Subject: [PATCH 33/54] Compile RecordedUrl regex to improve performance Minor optimisation. --- warcprox/warcproxy.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 4a5312e..8898898 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -350,6 +350,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): # logging better handled elsewhere? pass +RE_MIMETYPE = re.compile(r'[;\s]') class RecordedUrl: logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") @@ -389,7 +390,7 @@ class RecordedUrl: self.mimetype = content_type if self.mimetype: # chop off subtype, and ensure there's no whitespace - self.mimetype = re.split(r'[;\s]', self.mimetype, 2)[0] + self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0] self.custom_type = custom_type self.status = status From ddcde369825c3cfb66164905007191bf09fef9e3 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 2 May 2019 07:29:27 +0000 Subject: [PATCH 34/54] Increase urllib parse cache size In python2/3, urllib parse caches in memory URL parsing results to avoid repeating the process for the same URL. The problem is that the default in memory cache size is just 20. https://github.com/python/cpython/blob/3.7/Lib/urllib/parse.py#L80 Since we do a lot of URL parsing, it makes sense to increase cache size. 
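
A standalone sketch of the tweak in isolation. Note that `MAX_CACHE_SIZE` is an undocumented CPython implementation detail (not a stable API), so this is best-effort tuning:

```python
import urllib.parse as urllib_parse

# Bump the bound on urllib.parse's internal result cache before heavy
# parsing; the default is 20 in CPython 3.7. Guard the assignment since
# the attribute is an implementation detail.
if hasattr(urllib_parse, 'MAX_CACHE_SIZE'):
    urllib_parse.MAX_CACHE_SIZE = 2000

for i in range(10000):
    # repeated parses of the same 50 URLs are now served from the cache
    urllib_parse.urlsplit('http://example.com/page?id=%d' % (i % 50))
```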
--- warcprox/mitmproxy.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 705589e..51b80e9 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -35,6 +35,13 @@ try: import urllib.parse as urllib_parse except ImportError: import urlparse as urllib_parse +# In python2/3, urllib parse caches in memory URL parsing results to avoid +# repeating the process for the same URL. The problem is that the default +# in memory cache size is just 20. +# https://github.com/python/cpython/blob/3.7/Lib/urllib/parse.py#L80 +# since we do a lot of URL parsing, it makes sense to increase cache size. +urllib_parse.MAX_CACHE_SIZE = 2000 + try: import http.client as http_client # In python3 http.client.parse_headers() enforces http_client._MAXLINE From dfc081fff82c53039a9332141d8a99cab753df07 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 2 May 2019 14:25:29 -0700 Subject: [PATCH 35/54] do not write incorrect warc-payload-digest to... ... request records see https://github.com/webrecorder/warcio/issues/74#issuecomment-487816378 --- setup.py | 2 +- warcprox/warc.py | 51 +++++++++++++++++++++++++++++------------------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/setup.py b/setup.py index 75177d8..9ab99c9 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.8', + version='2.4.9', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/warc.py b/warcprox/warc.py index 94fe137..1eceee2 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -125,48 +125,59 @@ class WarcRecordBuilder: headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) if content_type is not None: headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) - if payload_digest is not None: - headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) # truncated value may be 'length' or 'time' if truncated is not None: headers.append((b'WARC-Truncated', truncated)) + if content_length is not None: + headers.append(( + warctools.WarcRecord.CONTENT_LENGTH, + str(content_length).encode('latin1'))) if recorder is not None: - if content_length is not None: - headers.append(( - warctools.WarcRecord.CONTENT_LENGTH, - str(content_length).encode('latin1'))) - else: + if payload_digest is not None: + headers.append( + (warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) + if content_length is None: headers.append(( warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1'))) headers.append((warctools.WarcRecord.BLOCK_DIGEST, warcprox.digest_str(recorder.block_digest, self.base32))) recorder.tempfile.seek(0) - record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) + record = warctools.WarcRecord( + headers=headers, content_file=recorder.tempfile) else: - if content_length is not None: - headers.append(( - warctools.WarcRecord.CONTENT_LENGTH, - str(content_length).encode('latin1'))) - else: + if content_length is None: headers.append(( warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1'))) - # no http headers so block digest == payload digest - if not payload_digest: - payload_digest = warcprox.digest_str( + + block_digest = None + if not hasattr(data, 'read'): + block_digest = warcprox.digest_str( hashlib.new(self.digest_algorithm, data), self.base32) - headers.append(( - warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) - 
headers.append((warctools.WarcRecord.BLOCK_DIGEST, payload_digest)) + + if not content_type.lower().startswith(b'application/http'): + # no http headers, so block digest == payload digest + if payload_digest and not block_digest: + block_digest = payload_digest + elif block_digest and not payload_digest: + payload_digest = block_digest + + if block_digest: + headers.append( + (warctools.WarcRecord.BLOCK_DIGEST, block_digest)) + if payload_digest: + headers.append( + (warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) + if hasattr(data, 'read'): record = warctools.WarcRecord( headers=headers, content_file=data) else: content_tuple = content_type, data record = warctools.WarcRecord( - headers=headers, content=content_tuple) + headers=headers, content=(content_type, data)) return record From 16489b99d958524be0244eff02c9f267f912b101 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 6 May 2019 21:23:10 +0000 Subject: [PATCH 36/54] Improve target url validation In addition to checking for scheme='http', we should also check that netloc has a value. There are many meaningless URLs that pass the current check. For instance: ``` In [5]: urlparse("http://") Out[5]: ParseResult(scheme='http', netloc='', path='', params='', query='', fragment='') In [6]: urlparse("http:///") Out[6]: ParseResult(scheme='http', netloc='', path='/', params='', query='', fragment='') ``` netloc should always have a value. --- warcprox/mitmproxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 705589e..f8b49dc 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -233,7 +233,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self.url = self.path u = urllib_parse.urlparse(self.url) - if u.scheme != 'http': + if u.scheme != 'http' or u.netloc == '': raise Exception( 'unable to parse request %r as a proxy request' % ( self.requestline)) From 41d7f0be53f4789ccfda89516d9f6401ea749a70 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 6 May 2019 16:49:35 -0700 Subject: [PATCH 37/54] bump version after merges --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9ab99c9..2f3aeda 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ except: setuptools.setup( name='warcprox', - version='2.4.9', + version='2.4.10', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 89d987a181d650b2438251c878793a42388aa2e0 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 9 May 2019 10:03:16 +0000 Subject: [PATCH 38/54] Cache bad target hostname:port to avoid reconnection attempts If connection to a hostname:port fails, add it to a `TTLCache` with 60 sec expiration time. Subsequent requests to the same hostname:port return really quickly as we check the cache and avoid trying a new network connection. The short expiration time guarantees that if a host becomes OK again, we'll be able to connect to it quickly. Adding `cachetools` dependency was necessary as there isn't any other way to have an expiring in-memory cache using stdlib. The library doesn't have any other dependencies, it has good test coverage and seems maintained. It also supports Python 3.7. 
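
A minimal standalone sketch of the caching pattern (illustrative names, not the actual warcprox code): remember hosts that recently failed so we can fail fast for ~60 seconds instead of waiting out another slow connect attempt.

```python
import socket
from threading import RLock
from cachetools import TTLCache

# hosts that recently failed; entries expire automatically after 60 seconds
bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
# TTLCache is not thread-safe, so every access is synchronized
bad_hostnames_ports_lock = RLock()

def connect(hostname, port, timeout=3.0):
    key = '%s:%s' % (hostname, port)
    with bad_hostnames_ports_lock:
        if key in bad_hostnames_ports:
            # fail fast instead of retrying the network connection
            raise ConnectionError('%s recently failed (cached)' % key)
    try:
        return socket.create_connection((hostname, port), timeout=timeout)
    except OSError:
        with bad_hostnames_ports_lock:
            bad_hostnames_ports[key] = 1
        raise
```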
--- setup.py | 1 + warcprox/mitmproxy.py | 31 +++++++++++++++++++++++++++++-- warcprox/warcproxy.py | 7 +++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 2f3aeda..61f474e 100755 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ deps = [ 'cryptography>=2.3', 'idna>=2.5', 'PyYAML>=5.1', + 'cachetools', ] try: import concurrent.futures diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index b29dcaf..c2c4183 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -77,6 +77,7 @@ import time import collections import cProfile from urllib3.util import is_connection_dropped +from urllib3.exceptions import NewConnectionError import doublethink class ProxyingRecorder(object): @@ -252,6 +253,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): query=u.query, fragment=u.fragment)) self.hostname = urlcanon.normalize_host(host).decode('ascii') + def _hostname_port_cache_key(self): + return '%s:%s' % (self.hostname, self.port) + def _connect_to_remote_server(self): ''' Connect to destination. @@ -380,7 +384,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self._determine_host_port() assert self.url - + # Check if target hostname:port is in `bad_hostnames_ports` cache + # to avoid retrying to connect. + with self.server.bad_hostnames_ports_lock: + hostname_port = self._hostname_port_cache_key() + if hostname_port in self.server.bad_hostnames_ports: + self.logger.info('Cannot connect to %s (cache)', + hostname_port) + self.send_error(502, 'message timed out') + return # Connect to destination self._connect_to_remote_server() except warcprox.RequestBlockedByRule as e: @@ -388,6 +400,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.info("%r: %r", self.requestline, e) return except Exception as e: + # If connection fails, add hostname:port to cache to avoid slow + # subsequent reconnection attempts. `NewConnectionError` can be + # caused by many types of errors which are handled by urllib3. + if type(e) in (socket.timeout, NewConnectionError): + with self.server.bad_hostnames_ports_lock: + host_port = self._hostname_port_cache_key() + self.server.bad_hostnames_ports[host_port] = 1 + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) self.logger.error( "problem processing request %r: %r", self.requestline, e, exc_info=True) @@ -527,7 +548,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): # put it back in the pool to reuse it later. 
                 if not is_connection_dropped(self._remote_server_conn):
                     self._conn_pool._put_conn(self._remote_server_conn)
-        except:
+        except Exception as e:
+            if type(e) in (socket.timeout, NewConnectionError):
+                with self.server.bad_hostnames_ports_lock:
+                    hostname_port = self._hostname_port_cache_key()
+                    self.server.bad_hostnames_ports[hostname_port] = 1
+                    self.logger.info('bad_hostnames_ports cache size: %d',
+                        len(self.server.bad_hostnames_ports))
             self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
             self._remote_server_conn.sock.close()
             raise
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 8898898..2d072b9 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -48,6 +48,8 @@ import tempfile
 import hashlib
 import doublethink
 import re
+from threading import RLock
+from cachetools import TTLCache
 
 class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
     '''
@@ -431,6 +433,11 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
         self.status_callback = status_callback
         self.stats_db = stats_db
         self.options = options
+        # TTLCache is not thread-safe. Access to the shared cache from multiple
+        # threads must be properly synchronized with an RLock according to ref:
+        # https://cachetools.readthedocs.io/en/latest/
+        self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
+        self.bad_hostnames_ports_lock = RLock()
         self.remote_connection_pool = PoolManager(
             num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200)
         server_address = (

From bbe41bc900f2216639fe2aa8d3ce5a57abf5794a Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Thu, 9 May 2019 15:57:01 +0000
Subject: [PATCH 39/54] Add bad_hostnames_ports in PlaybackProxy

These vars are also required there, in addition to
`SingleThreadedWarcProxy`.
---
 warcprox/playback.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/warcprox/playback.py b/warcprox/playback.py
index 91f86aa..8bfa42f 100644
--- a/warcprox/playback.py
+++ b/warcprox/playback.py
@@ -42,6 +42,7 @@ from warcprox.mitmproxy import MitmProxyHandler
 import warcprox
 import sqlite3
 import threading
+from cachetools import TTLCache
 
 class PlaybackProxyHandler(MitmProxyHandler):
     logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler")
@@ -219,6 +220,8 @@ class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
         self.playback_index_db = playback_index_db
         self.warcs_dir = options.directory
         self.options = options
+        self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
+        self.bad_hostnames_ports_lock = threading.RLock()
 
     def server_activate(self):
         http_server.HTTPServer.server_activate(self)

From 75e789c15fc99518857fecccbb8d0553f03456a3 Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Thu, 9 May 2019 20:44:47 +0000
Subject: [PATCH 40/54] Add entries to bad_hostnames_ports only on connection
 init

Do not add entries to bad_hostnames_ports when an exception occurs while a
connection is already running. Do it only on connection init, because for
some unclear reason unit tests fail in that case.
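An illustrative sketch of the distinction this patch draws (the `fetch`
helper and its arguments are hypothetical, not warcprox code): failures
while establishing a connection mark the host bad, while failures on an
already-established connection do not.

```python
import socket

def fetch(hostname, port, bad_hosts, lock):
    try:
        # connection init: a failure here is worth caching, since retrying
        # the same unreachable host would stall every subsequent request
        sock = socket.create_connection((hostname, port), timeout=60)
    except (socket.timeout, OSError):
        with lock:
            bad_hosts['%s:%s' % (hostname, port)] = 1
        raise
    try:
        # connection running: a failure here says nothing reliable about
        # future connect attempts, so the host is not marked bad
        sock.sendall(b'GET / HTTP/1.1\r\nHost: %s\r\n\r\n' % hostname.encode())
        return sock.recv(4096)
    finally:
        sock.close()
```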
---
 warcprox/mitmproxy.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index c2c4183..39469d5 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -549,12 +549,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             if not is_connection_dropped(self._remote_server_conn):
                 self._conn_pool._put_conn(self._remote_server_conn)
         except Exception as e:
-            if type(e) in (socket.timeout, NewConnectionError):
-                with self.server.bad_hostnames_ports_lock:
-                    hostname_port = self._hostname_port_cache_key()
-                    self.server.bad_hostnames_ports[hostname_port] = 1
-                    self.logger.info('bad_hostnames_ports cache size: %d',
-                        len(self.server.bad_hostnames_ports))
             self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
             self._remote_server_conn.sock.close()
             raise

From 89041e83b4707a8e4229eef3e7384849c5f1c3fd Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Fri, 10 May 2019 07:32:42 +0000
Subject: [PATCH 41/54] Catch RemoteDisconnected case when starting
 downloading

A common error is to connect to the remote server successfully but raise a
`http_client.RemoteDisconnected` exception when trying to begin downloading.
It's caused by the call `prox_rec_res.begin(...)`, which calls
`http_client._read_status()`. In that case, we also add the target
`hostname:port` to the `bad_hostnames_ports` cache.

Modify two unit tests to clear the `bad_hostnames_ports` cache because
localhost is added there by previous tests and this breaks them.
---
 tests/test_warcprox.py |  8 ++++++++
 warcprox/mitmproxy.py  | 12 ++++++++++++
 2 files changed, 20 insertions(+)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 6c49f0a..4323a6c 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -1986,6 +1986,10 @@ def test_socket_timeout_response(
 def test_empty_response(
         warcprox_, http_daemon, https_daemon, archiving_proxies,
         playback_proxies):
+    # localhost:server_port was added to the `bad_hostnames_ports` cache by
+    # previous tests and this causes subsequent tests to fail. We clear it.
+    warcprox_.proxy.bad_hostnames_ports.clear()
+
     url = 'http://localhost:%s/empty-response' % http_daemon.server_port
     response = requests.get(url, proxies=archiving_proxies, verify=False)
     assert response.status_code == 502
@@ -2001,6 +2005,10 @@ def test_payload_digest(warcprox_, http_daemon):
     Tests that digest is of RFC2616 "entity body" (transfer-decoded but not
     content-decoded)
     '''
+    # localhost:server_port was added to the `bad_hostnames_ports` cache by
+    # previous tests and this causes subsequent tests to fail. We clear it.
+    warcprox_.proxy.bad_hostnames_ports.clear()
+
     class HalfMockedMitm(warcprox.mitmproxy.MitmProxyHandler):
         def __init__(self, url):
             self.path = url
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index 39469d5..cc16281 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -549,6 +549,18 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             if not is_connection_dropped(self._remote_server_conn):
                 self._conn_pool._put_conn(self._remote_server_conn)
         except Exception as e:
+            # A common error is to connect to the remote server successfully
+            # but raise a `RemoteDisconnected` exception when trying to begin
+            # downloading. Its caused by prox_rec_res.begin(...) which calls
+            # http_client._read_status(). In that case, the host is also bad
+            # and we must add it to `bad_hostnames_ports` cache.
+            if type(e) == http_client.RemoteDisconnected:
+                with self.server.bad_hostnames_ports_lock:
+                    host_port = self._hostname_port_cache_key()
+                    self.server.bad_hostnames_ports[host_port] = 1
+                    self.logger.info('bad_hostnames_ports cache size: %d',
+                        len(self.server.bad_hostnames_ports))
+
             self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
             self._remote_server_conn.sock.close()
             raise

From f0d2898326810f03c00c26c4fc8b84727c32ad2a Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Tue, 14 May 2019 19:08:30 +0000
Subject: [PATCH 42/54] Tighten up the use of the lock for the TTLCache

Move instructions that are thread-safe out of the lock.
---
 warcprox/mitmproxy.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index cc16281..65d2992 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -404,11 +404,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             # subsequent reconnection attempts. `NewConnectionError` can be
             # caused by many types of errors which are handled by urllib3.
             if type(e) in (socket.timeout, NewConnectionError):
+                host_port = self._hostname_port_cache_key()
                 with self.server.bad_hostnames_ports_lock:
-                    host_port = self._hostname_port_cache_key()
                     self.server.bad_hostnames_ports[host_port] = 1
-                    self.logger.info('bad_hostnames_ports cache size: %d',
-                        len(self.server.bad_hostnames_ports))
+                self.logger.info('bad_hostnames_ports cache size: %d',
+                    len(self.server.bad_hostnames_ports))
             self.logger.error(
                 "problem processing request %r: %r",
                 self.requestline, e, exc_info=True)
@@ -555,11 +555,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             # http_client._read_status(). In that case, the host is also bad
             # and we must add it to `bad_hostnames_ports` cache.
             if type(e) == http_client.RemoteDisconnected:
+                host_port = self._hostname_port_cache_key()
                 with self.server.bad_hostnames_ports_lock:
-                    host_port = self._hostname_port_cache_key()
                     self.server.bad_hostnames_ports[host_port] = 1
-                    self.logger.info('bad_hostnames_ports cache size: %d',
-                        len(self.server.bad_hostnames_ports))
+                self.logger.info('bad_hostnames_ports cache size: %d',
+                    len(self.server.bad_hostnames_ports))
 
             self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
             self._remote_server_conn.sock.close()

From 5b30dd4576e5f9b1558c8cff5c4aa0102d2aa715 Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Tue, 14 May 2019 19:35:46 +0000
Subject: [PATCH 43/54] Cache error status and message

Instead of returning a generic error status and message when hitting the
bad_hostnames_ports cache, we cache and return the original error.
---
 warcprox/mitmproxy.py | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index 65d2992..b158162 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -385,14 +385,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self._determine_host_port()
             assert self.url
             # Check if target hostname:port is in `bad_hostnames_ports` cache
-            # to avoid retrying to connect.
+            # to avoid retrying to connect. cached is a tuple containing
+            # (status_code, error message)
+            cached = None
+            hostname_port = self._hostname_port_cache_key()
             with self.server.bad_hostnames_ports_lock:
-                hostname_port = self._hostname_port_cache_key()
-                if hostname_port in self.server.bad_hostnames_ports:
-                    self.logger.info('Cannot connect to %s (cache)',
-                        hostname_port)
-                    self.send_error(502, 'message timed out')
-                    return
+                cached = self.server.bad_hostnames_ports.get(hostname_port)
+            if cached:
+                self.logger.info('Cannot connect to %s (cache)', hostname_port)
+                self.send_error(cached[0], cached[1])
+                return
             # Connect to destination
             self._connect_to_remote_server()
         except warcprox.RequestBlockedByRule as e:
@@ -406,7 +408,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             if type(e) in (socket.timeout, NewConnectionError):
                 host_port = self._hostname_port_cache_key()
                 with self.server.bad_hostnames_ports_lock:
-                    self.server.bad_hostnames_ports[host_port] = 1
+                    self.server.bad_hostnames_ports[host_port] = (500, str(e))
                 self.logger.info('bad_hostnames_ports cache size: %d',
                     len(self.server.bad_hostnames_ports))
             self.logger.error(
@@ -557,7 +559,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             if type(e) == http_client.RemoteDisconnected:
                 host_port = self._hostname_port_cache_key()
                 with self.server.bad_hostnames_ports_lock:
-                    self.server.bad_hostnames_ports[host_port] = 1
+                    self.server.bad_hostnames_ports[host_port] = (502, str(e))
                 self.logger.info('bad_hostnames_ports cache size: %d',
                     len(self.server.bad_hostnames_ports))

From 2772b80fab71747902185a1b2cf906c63de6aafe Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Tue, 14 May 2019 15:50:59 -0700
Subject: [PATCH 44/54] bump version after merge

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 61f474e..29221af 100755
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4.10',
+        version='2.4.11',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',

From f51f2ec225d8fec39bdc5848d76e8c6a1c762b86 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Tue, 14 May 2019 15:51:11 -0700
Subject: [PATCH 45/54] some tweaks to error responses

use 502, 504 when appropriate, and don't send `str(e)` in the http status
line, because that is often an ugly jumble
---
 setup.py               |  2 +-
 tests/test_warcprox.py |  4 ++--
 warcprox/__init__.py   |  9 +++++++++
 warcprox/mitmproxy.py  | 31 +++++++++++++++++++++----------
 warcprox/warcproxy.py  |  2 +-
 5 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/setup.py b/setup.py
index 29221af..56e8213 100755
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4.11',
+        version='2.4.12',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 4323a6c..d34bb43 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -1724,13 +1724,13 @@ def test_slash_in_warc_prefix(warcprox_, http_daemon, archiving_proxies):
     url = 'http://localhost:%s/b/b' % http_daemon.server_port
     headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"../../../../etc/a"})}
     response = requests.get(url, proxies=archiving_proxies, headers=headers)
-    assert response.status_code == 500
+    assert response.status_code == 400
     assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix'
 
     url = 'http://localhost:%s/b/c' % http_daemon.server_port
     headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"..\\..\\..\\derp\\monkey"})}
     response = requests.get(url, proxies=archiving_proxies, headers=headers)
-    assert response.status_code == 500
+    assert response.status_code == 400
     assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix'
 
 def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 852d3fc..9cd09a8 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -78,6 +78,15 @@ class RequestBlockedByRule(Exception):
     def __str__(self):
         return "%s: %s" % (self.__class__.__name__, self.msg)
 
+class BadRequest(Exception):
+    '''
+    Raised in case of a request deemed unacceptable by warcprox.
+    '''
+    def __init__(self, msg):
+        self.msg = msg
+    def __str__(self):
+        return "%s: %s" % (self.__class__.__name__, self.msg)
+
 class BasePostfetchProcessor(threading.Thread):
     logger = logging.getLogger("warcprox.BasePostfetchProcessor")
 
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index b158162..c1f01bd 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -77,7 +77,7 @@ import time
 import collections
 import cProfile
 from urllib3.util import is_connection_dropped
-from urllib3.exceptions import NewConnectionError
+from urllib3.exceptions import TimeoutError, HTTPError
 import doublethink
 
 class ProxyingRecorder(object):
@@ -385,15 +385,14 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self._determine_host_port()
             assert self.url
             # Check if target hostname:port is in `bad_hostnames_ports` cache
-            # to avoid retrying to connect. cached is a tuple containing
-            # (status_code, error message)
+            # to avoid retrying to connect. Cached value is http status code.
             cached = None
             hostname_port = self._hostname_port_cache_key()
             with self.server.bad_hostnames_ports_lock:
                 cached = self.server.bad_hostnames_ports.get(hostname_port)
             if cached:
                 self.logger.info('Cannot connect to %s (cache)', hostname_port)
-                self.send_error(cached[0], cached[1])
+                self.send_error(cached)
                 return
             # Connect to destination
             self._connect_to_remote_server()
         except warcprox.RequestBlockedByRule as e:
@@ -401,20 +400,32 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             # limit enforcers have already sent the appropriate response
             self.logger.info("%r: %r", self.requestline, e)
             return
+        except warcprox.BadRequest as e:
+            self.send_error(400, e.msg)
+            return
         except Exception as e:
             # If connection fails, add hostname:port to cache to avoid slow
             # subsequent reconnection attempts. `NewConnectionError` can be
            # caused by many types of errors which are handled by urllib3.
-            if type(e) in (socket.timeout, NewConnectionError):
+            response_code = 500
+            cache = False
+            if isinstance(e, (socket.timeout, TimeoutError,)):
+                response_code = 504
+                cache = True
+            elif isinstance(e, HTTPError):
+                response_code = 502
+                cache = True
+
+            if cache:
                 host_port = self._hostname_port_cache_key()
                 with self.server.bad_hostnames_ports_lock:
-                    self.server.bad_hostnames_ports[host_port] = (500, str(e))
+                    self.server.bad_hostnames_ports[host_port] = response_code
                 self.logger.info('bad_hostnames_ports cache size: %d',
                     len(self.server.bad_hostnames_ports))
             self.logger.error(
                 "problem processing request %r: %r",
                 self.requestline, e, exc_info=True)
-            self.send_error(500, str(e))
+            self.send_error(response_code)
             return
 
         try:
@@ -429,7 +440,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self.logger.error(
                 'error from remote server(?) %r: %r',
                 self.requestline, e, exc_info=True)
-            self.send_error(502, str(e))
+            self.send_error(502)
             return
 
     def send_error(self, code, message=None, explain=None):
@@ -556,10 +567,10 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             # downloading. Its caused by prox_rec_res.begin(...) which calls
             # http_client._read_status(). In that case, the host is also bad
             # and we must add it to `bad_hostnames_ports` cache.
-            if type(e) == http_client.RemoteDisconnected:
+            if isinstance(e, http_client.RemoteDisconnected):
                 host_port = self._hostname_port_cache_key()
                 with self.server.bad_hostnames_ports_lock:
-                    self.server.bad_hostnames_ports[host_port] = (502, str(e))
+                    self.server.bad_hostnames_ports[host_port] = 502
                 self.logger.info('bad_hostnames_ports cache size: %d',
                     len(self.server.bad_hostnames_ports))
 
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 2d072b9..9b8545d 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -170,7 +170,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
         if warcprox_meta and 'warc-prefix' in warcprox_meta and (
                 '/' in warcprox_meta['warc-prefix']
                 or '\\' in warcprox_meta['warc-prefix']):
-            raise Exception(
+            raise warcprox.BadRequest(
                 "request rejected by warcprox: slash and backslash are not "
                 "permitted in warc-prefix")

From bbf3fad1dcd9910ba30d025fc063c17fbf3a8c4d Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 15 May 2019 15:58:47 -0700
Subject: [PATCH 46/54] avoid using warcproxy.py stuff in mitmproxy.py

---
 setup.py              |  2 +-
 warcprox/mitmproxy.py | 56 +++++++++++++++++++++++++++++++++++++++
 warcprox/warcproxy.py | 50 ++++-----------------------------
 3 files changed, 62 insertions(+), 46 deletions(-)

diff --git a/setup.py b/setup.py
index 56e8213..48192ba 100755
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4.12',
+        version='2.4.13',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index c1f01bd..6ae52f5 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -76,9 +76,13 @@ import urlcanon
 import time
 import collections
 import cProfile
+from urllib3 import PoolManager
 from urllib3.util import is_connection_dropped
 from urllib3.exceptions import TimeoutError, HTTPError
 import doublethink
+from cachetools import TTLCache
+from threading import RLock
+from certauth.certauth import CertificateAuthority
 
 class ProxyingRecorder(object):
     """
@@ -223,9 +227,12 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
     and records the bytes in transit as it proxies them.
     '''
     logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler")
+
     _socket_timeout = 60
     _max_resource_size = None
     _tmp_file_max_memory_size = 512 * 1024
+    onion_tor_socks_proxy_host = None
+    onion_tor_socks_proxy_port = None
 
     def __init__(self, request, client_address, server):
         threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1])
@@ -737,3 +744,52 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
         for sock in self.remote_server_socks:
             self.shutdown_request(sock)
 
+class SingleThreadedMitmProxy(http_server.HTTPServer):
+    logger = logging.getLogger('warcprox.warcproxy.SingleThreadedMitmProxy')
+
+    def __init__(
+            self, MitmProxyHandlerClass=MitmProxyHandler,
+            options=warcprox.Options()):
+        self.options = options
+
+        # TTLCache is not thread-safe. Access to the shared cache from multiple
+        # threads must be properly synchronized with an RLock according to ref:
+        # https://cachetools.readthedocs.io/en/latest/
+        self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
+        self.bad_hostnames_ports_lock = RLock()
+
+        self.remote_connection_pool = PoolManager(
+            num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200)
+
+        if options.onion_tor_socks_proxy:
+            try:
+                host, port = options.onion_tor_socks_proxy.split(':')
+                MitmProxyHandlerClass.onion_tor_socks_proxy_host = host
+                MitmProxyHandlerClass.onion_tor_socks_proxy_port = int(port)
+            except ValueError:
+                MitmProxyHandlerClass.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy
+                MitmProxyHandlerClass.onion_tor_socks_proxy_port = None
+
+        if options.socket_timeout:
+            MitmProxyHandlerClass._socket_timeout = options.socket_timeout
+        if options.max_resource_size:
+            MitmProxyHandlerClass._max_resource_size = options.max_resource_size
+        if options.tmp_file_max_memory_size:
+            MitmProxyHandlerClass._tmp_file_max_memory_size = options.tmp_file_max_memory_size
+
+        self.digest_algorithm = options.digest_algorithm or 'sha1'
+
+        ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64]
+        self.ca = CertificateAuthority(
+            ca_file=options.cacert or 'warcprox-ca.pem',
+            certs_dir=options.certs_dir or './warcprox-ca',
+            ca_name=ca_name)
+
+        server_address = (
+            options.address or 'localhost',
+            options.port if options.port is not None else 8000)
+
+        http_server.HTTPServer.__init__(
+            self, server_address, MitmProxyHandlerClass,
+            bind_and_activate=True)
+
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 9b8545d..e5b35d2 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -38,18 +38,14 @@ import logging
 import json
 import socket
 from hanzo import warctools
-from certauth.certauth import CertificateAuthority
 import warcprox
 import datetime
 import urlcanon
 import os
-from urllib3 import PoolManager
 import tempfile
 import hashlib
 import doublethink
 import re
-from threading import RLock
-from cachetools import TTLCache
 
 class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
     '''
@@ -423,56 +419,20 @@ class RecordedUrl:
 # inherit from object so that multiple inheritance from this class works
 # properly in python 2
 # http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639
-class SingleThreadedWarcProxy(http_server.HTTPServer, object):
+class SingleThreadedWarcProxy(warcprox.mitmproxy.SingleThreadedMitmProxy):
     logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
 
     def __init__(
             self, stats_db=None, status_callback=None,
             options=warcprox.Options()):
         self.start_time = doublethink.utcnow()
+
+        warcprox.mitmproxy.SingleThreadedMitmProxy.__init__(
+            self, WarcProxyHandler, options)
+
         self.status_callback = status_callback
         self.stats_db = stats_db
-        self.options = options
-        # TTLCache is not thread-safe. Access to the shared cache from multiple
-        # threads must be properly synchronized with an RLock according to ref:
-        # https://cachetools.readthedocs.io/en/latest/
-        self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
-        self.bad_hostnames_ports_lock = RLock()
-        self.remote_connection_pool = PoolManager(
-            num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200)
-        server_address = (
-            options.address or 'localhost',
-            options.port if options.port is not None else 8000)
-
-        if options.onion_tor_socks_proxy:
-            try:
-                host, port = options.onion_tor_socks_proxy.split(':')
-                WarcProxyHandler.onion_tor_socks_proxy_host = host
-                WarcProxyHandler.onion_tor_socks_proxy_port = int(port)
-            except ValueError:
-                WarcProxyHandler.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy
-                WarcProxyHandler.onion_tor_socks_proxy_port = None
-
-        if options.socket_timeout:
-            WarcProxyHandler._socket_timeout = options.socket_timeout
-        if options.max_resource_size:
-            WarcProxyHandler._max_resource_size = options.max_resource_size
-        if options.tmp_file_max_memory_size:
-            WarcProxyHandler._tmp_file_max_memory_size = options.tmp_file_max_memory_size
-
-        http_server.HTTPServer.__init__(
-            self, server_address, WarcProxyHandler, bind_and_activate=True)
-
-        self.digest_algorithm = options.digest_algorithm or 'sha1'
-
-        ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64]
-        self.ca = CertificateAuthority(
-            ca_file=options.cacert or 'warcprox-ca.pem',
-            certs_dir=options.certs_dir or './warcprox-ca',
-            ca_name=ca_name)
-
         self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000)
-
         self.running_stats = warcprox.stats.RunningStats()
 
     def status(self):

From 8c31ec29169008a0d9f22120f0871d3aee0879a7 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 15 May 2019 16:06:42 -0700
Subject: [PATCH 47/54] bigger connection pool, for Vangelis

---
 setup.py              | 2 +-
 warcprox/mitmproxy.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/setup.py b/setup.py
index 48192ba..0b6d756 100755
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4.13',
+        version='2.4.14',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index 6ae52f5..d6a0593 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -274,7 +274,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
         '''
         self._conn_pool = self.server.remote_connection_pool.connection_from_host(
             host=self.hostname, port=int(self.port), scheme='http',
-            pool_kwargs={'maxsize': 6, 'timeout': self._socket_timeout})
+            pool_kwargs={'maxsize': 12, 'timeout': self._socket_timeout})
 
         self._remote_server_conn = self._conn_pool._get_conn()
         if is_connection_dropped(self._remote_server_conn):
@@ -759,7 +759,7 @@ class SingleThreadedMitmProxy(http_server.HTTPServer):
         self.bad_hostnames_ports_lock = RLock()
 
         self.remote_connection_pool = PoolManager(
-            num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200)
+            num_pools=max((options.max_threads or 0) // 6, 400))
 
         if options.onion_tor_socks_proxy:
             try:

From 957bd079e8e40a0f1412127fc1e44f9329bb6bb8 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Thu, 30 May 2019 19:27:46 -0700
Subject: [PATCH 48/54] WIP (untested): handle multiple dedup-buckets, rw or ro

---
 warcprox/bigtable.py  |  7 ++++--
 warcprox/dedup.py     | 58 +++++++++++++++++++++++++------------------
 warcprox/warcproxy.py |  3 ++-
 3 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py
index 4df8ab3..ff2ad0a 100644
--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -157,8 +157,11 @@ class RethinkCaptures:
             sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
 
         if (recorded_url.warcprox_meta
-                and "dedup-bucket" in recorded_url.warcprox_meta):
-            bucket = recorded_url.warcprox_meta["dedup-bucket"]
+                and "dedup-buckets" in recorded_url.warcprox_meta):
+            for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
+                if not bucket_mode == 'ro':
+                    # maybe this is the right thing to do here? or should we return an entry for each? or ?
+                    break
         else:
             bucket = "__unspecified__"
 
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 7889cd9..9562fa5 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -47,11 +47,11 @@ class DedupableMixin(object):
     def should_dedup(self, recorded_url):
         """Check if we should try to run dedup on resource based on payload
         size compared with min text/binary dedup size options.
-        When we use option --dedup-only-with-bucket, `dedup-bucket` is required
+        When we use option --dedup-only-with-bucket, `dedup-buckets` is required
         in Warcprox-Meta to perform dedup.
        Return Boolean.
        """
-        if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta:
+        if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta:
            return False
        if recorded_url.is_text():
            return recorded_url.response_recorder.payload_size() > self.min_text_size
@@ -69,10 +69,13 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
                 and recorded_url.payload_digest
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
-                recorded_url.dedup_info = self.dedup_db.lookup(
-                    digest_key, recorded_url.warcprox_meta["dedup-bucket"],
-                    recorded_url.url)
+            if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
+                for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
+                    recorded_url.dedup_info = self.dedup_db.lookup(
+                        digest_key, bucket, recorded_url.url)
+                    if recorded_url.dedup_info:
+                        # we found an existing capture
+                        break
             else:
                 recorded_url.dedup_info = self.dedup_db.lookup(
                     digest_key, url=recorded_url.url)
@@ -148,10 +151,12 @@ class DedupDb(DedupableMixin):
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(
                 recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
-                self.save(
-                    digest_key, records[0],
-                    bucket=recorded_url.warcprox_meta["dedup-bucket"])
+            if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
+                for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
+                    if not bucket_mode == "ro":
+                        self.save(
+                            digest_key, records[0],
+                            bucket=bucket)
             else:
                 self.save(digest_key, records[0])
 
@@ -213,8 +218,10 @@ class RethinkDedupDb(DedupDb, DedupableMixin):
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(
                 recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
-                self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"])
+            if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
+                for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
+                    if not bucket_mode == 'ro':
+                        self.save(digest_key, records[0], bucket=bucket)
             else:
                 self.save(digest_key, records[0])
 
@@ -347,11 +354,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
                     and recorded_url.warc_records[0].type == b'response'
                     and self.trough_dedup_db.should_dedup(recorded_url)):
                 if (recorded_url.warcprox_meta
-                        and 'dedup-bucket' in recorded_url.warcprox_meta):
-                    bucket = recorded_url.warcprox_meta['dedup-bucket']
+                        and 'dedup-buckets' in recorded_url.warcprox_meta):
+                    for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
+                        if not bucket_mode == 'ro':
+                            buckets[bucket].append(recorded_url)
                 else:
-                    bucket = '__unspecified__'
-                buckets[bucket].append(recorded_url)
+                    buckets['__unspecified__'].append(recorded_url)
         return buckets
 
     def _process_batch(self, batch):
@@ -399,11 +407,11 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
                     and recorded_url.payload_digest
                     and self.trough_dedup_db.should_dedup(recorded_url)):
                 if (recorded_url.warcprox_meta
-                        and 'dedup-bucket' in recorded_url.warcprox_meta):
-                    bucket = recorded_url.warcprox_meta['dedup-bucket']
+                        and 'dedup-buckets' in recorded_url.warcprox_meta):
+                    for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
+                        buckets[bucket].append(recorded_url)
                 else:
-                    bucket = '__unspecified__'
-                buckets[bucket].append(recorded_url)
+                    buckets['__unspecified__'].append(recorded_url)
             else:
                 discards.append(
                     warcprox.digest_str(
@@ -576,9 +584,11 @@ class TroughDedupDb(DedupDb, DedupableMixin):
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(
                 recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta:
-                self.save(
-                    digest_key, records[0],
-                    bucket=recorded_url.warcprox_meta['dedup-bucket'])
+            if recorded_url.warcprox_meta and 'dedup-buckets' in recorded_url.warcprox_meta:
+                for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
+                    if not bucket_mode == 'ro':
+                        self.save(
+                            digest_key, records[0],
+                            bucket=bucket)
             else:
                 self.save(digest_key, records[0])
 
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index e5b35d2..625138b 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -377,7 +377,8 @@ class RecordedUrl:
         if warcprox_meta:
             if 'captures-bucket' in warcprox_meta:
                 # backward compatibility
-                warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket']
+                warcprox_meta['dedup-buckets'] = {}
+                warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw'
                 del warcprox_meta['captures-bucket']
             self.warcprox_meta = warcprox_meta
         else:

From 6ee7ab36a20478340711efbe662a339436f46dde Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Fri, 31 May 2019 17:36:13 -0700
Subject: [PATCH 49/54] fix tests too

---
 tests/test_warcprox.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index d34bb43..3f803c2 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -790,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
 
     # archive url1 bucket_a
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_a":"rw"}})}
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -816,7 +816,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     assert dedup_lookup is None
 
     # archive url2 bucket_b
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})}
     response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -845,7 +845,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
 
     # archive url1 bucket_b
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})}
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -928,7 +928,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
                 http_daemon.server_port, i)
             headers = {"Warcprox-Meta": json.dumps({
                 "warc-prefix":"test_dedup_buckets",
-                "dedup-bucket":"bucket_%s" % i})}
+                "dedup-buckets":{"bucket_%s" % i:"rw"}})}
             pool.submit(
                 requests.get, url, proxies=archiving_proxies, verify=False,
                 headers=headers)
@@ -944,7 +944,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
                 http_daemon.server_port, -i - 1)
             headers = {"Warcprox-Meta": json.dumps({
                 "warc-prefix":"test_dedup_buckets",
-                "dedup-bucket":"bucket_%s" % i})}
+                "dedup-buckets":{"bucket_%s" % i:"rw"}})}
             pool.submit(
                 requests.get, url, proxies=archiving_proxies, verify=False,
                 headers=headers)
@@ -959,7 +959,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
                 http_daemon.server_port, i)
             headers = {"Warcprox-Meta": json.dumps({
                 "warc-prefix":"test_dedup_buckets",
-                "dedup-bucket":"bucket_%s" % i})}
+                "dedup-buckets":{"bucket_%s" % i:"rw"}})}
             pool.submit(
                 requests.get, url, proxies=archiving_proxies, verify=False,
                 headers=headers)
@@ -1500,7 +1500,7 @@ def test_dedup_ok_flag(
     assert dedup_lookup is None
 
     # archive with dedup_ok:False
-    request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False}
+    request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''},'dedup-ok':False}
     headers = {'Warcprox-Meta': json.dumps(request_meta)}
     response = requests.get(
         url, proxies=archiving_proxies, headers=headers, verify=False)
@@ -1518,7 +1518,7 @@ def test_dedup_ok_flag(
     assert dedup_lookup is None
 
     # archive without dedup_ok:False
-    request_meta = {'dedup-bucket':'test_dedup_ok_flag'}
+    request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''}}
     headers = {'Warcprox-Meta': json.dumps(request_meta)}
     response = requests.get(
         url, proxies=archiving_proxies, headers=headers, verify=False)
From d13356506176c1eccc2cc19a03b520b12d066456 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Tue, 4 Jun 2019 14:53:06 -0700
Subject: [PATCH 50/54] continue support for _singular_ dedup-bucket

---
 warcprox/warcproxy.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 625138b..9d23244 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -380,6 +380,11 @@ class RecordedUrl:
                 warcprox_meta['dedup-buckets'] = {}
                 warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw'
                 del warcprox_meta['captures-bucket']
+            if 'dedup-bucket' in warcprox_meta:
+                # more backwards compatibility
+                warcprox_meta['dedup-buckets'] = {}
+                warcprox_meta['dedup-buckets'][warcprox_meta['dedup-bucket']] = 'rw'
+                del warcprox_meta['dedup-bucket']
             self.warcprox_meta = warcprox_meta
         else:
             self.warcprox_meta = {}

From 8c52bd8442d75e0a0da628610e77ab5979266980 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Thu, 13 Jun 2019 17:18:51 -0700
Subject: [PATCH 51/54] docs updates

---
 README.rst |  7 ++++---
 api.rst    | 10 ++++++----
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/README.rst b/README.rst
index b7b5c17..77e7e58 100644
--- a/README.rst
+++ b/README.rst
@@ -89,12 +89,13 @@ for deduplication works similarly to deduplication by `Heritrix
 4. If not found,
 
    a. Write ``response`` record with full payload
-   b. Store new entry in deduplication database
+   b. Store new entry in deduplication database (can be disabled, see
+      `Warcprox-Meta HTTP request header `
 
 The deduplication database is partitioned into different "buckets". URLs are
 deduplicated only against other captures in the same bucket. If specified, the
-``dedup-bucket`` field of the `Warcprox-Meta HTTP request header
-`_ determines the bucket. Otherwise,
+``dedup-buckets`` field of the `Warcprox-Meta HTTP request header
+`_ determines the bucket(s). Otherwise,
 the default bucket is used.
 
 Deduplication can be disabled entirely by starting warcprox with the argument
diff --git a/api.rst b/api.rst
index 1da1898..eee3219 100644
--- a/api.rst
+++ b/api.rst
@@ -137,14 +137,16 @@ Example::
 
     Warcprox-Meta: {"warc-prefix": "special-warc"}
 
-``dedup-bucket`` (string)
+``dedup-buckets`` (string)
 ~~~~~~~~~~~~~~~~~~~~~~~~~
-Specifies the deduplication bucket. For more information about deduplication
+Specifies the deduplication bucket(s). For more information about deduplication
 see ``_.
 
-Example::
+Examples::
 
-    Warcprox-Meta: {"dedup-bucket":"my-dedup-bucket"}
+    Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw"}}
+
+    Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw", "my-read-only-dedup-bucket": "ro"}}
 
 ``blocks`` (list)
 ~~~~~~~~~~~~~~~~~

From 51c4f6d6222a9aac1543bad94a06f6233d6b0b64 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Thu, 13 Jun 2019 17:57:29 -0700
Subject: [PATCH 52/54] test_dedup_buckets_multiple

---
 tests/test_warcprox.py | 65 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 3f803c2..d051128 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -916,6 +916,71 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     finally:
         fh.close()
 
+def test_dedup_buckets_multiple(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
+
+    url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
+
+    # archive url1
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_multiple",
+        "dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}})
+    }
+    response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
+    assert response.status_code == 200
+    assert response.headers['warcprox-test-header'] == 'k!'
+    assert response.content == b'I am the warcprox test payload! llllllllll!\n'
+
+    # wait for postfetch chain
+    wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
+
+    # check url1 in dedup db bucket_1
+    # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_1')
+    dedup_lookup = warcprox_.dedup_db.lookup(
+        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1")
+    assert dedup_lookup
+    assert dedup_lookup['url'] == url1.encode('ascii')
+    assert re.match(br'^$', dedup_lookup['id'])
+    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
+    record_id = dedup_lookup['id']
+    dedup_date = dedup_lookup['date']
+
+    # check url1 not in dedup db bucket_2
+    dedup_lookup = warcprox_.dedup_db.lookup(
+        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2")
+    assert dedup_lookup is None
+
+    # close the warc
+    assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"]
+    writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"]
+    warc_path = os.path.join(writer.directory, writer.finalname)
+    assert not os.path.exists(warc_path)
+    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"].close()
+    assert os.path.exists(warc_path)
+
+    # read the warc
+    fh = warctools.ArchiveRecord.open_archive(warc_path)
+    record_iter = fh.read_records(limit=None, offsets=True)
+    try:
+        (offset, record, errors) = next(record_iter)
+        assert record.type == b'warcinfo'
+
+        # url1 bucket_1
+        (offset, record, errors) = next(record_iter)
+        assert record.type == b'response'
+        assert record.url == url1.encode('ascii')
+        # check for duplicate warc record headers
+        assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1
+        assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'
+        (offset, record, errors) = next(record_iter)
+        assert record.type == b'request'
+
+        # that's all folks
+        assert next(record_iter)[1] == None
+        assert next(record_iter, None) == None
+
+    finally:
+        fh.close()
+
 def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies):
     urls_before = warcprox_.proxy.running_stats.urls
     revisits_before = warcprox_.proxy.stats_db.value(

From 79aab697e2331ef0a564a273956f3d6ed5ea2035 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Fri, 14 Jun 2019 12:42:25 -0700
Subject: [PATCH 53/54] more tests

---
 tests/test_warcprox.py | 94 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 88 insertions(+), 6 deletions(-)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index d051128..0c1ae3f 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -845,7 +845,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
 
     # archive url1 bucket_b
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -916,13 +916,95 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     finally:
         fh.close()
 
-def test_dedup_buckets_multiple(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
+def test_multiple_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     urls_before = warcprox_.proxy.running_stats.urls
 
     url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
 
-    # archive url1
+    # archive url1 bucket_a1, bucket_b2, bucket_c3
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_multiple",
-        "dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}})
-    }
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_multiple_dedup_buckets",
+        "dedup-buckets":{"bucket_a1":"rw", "bucket_b2":"", "bucket_c3":"rw"}
+        })}
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
+    assert response.content == b'I am the warcprox test payload! llllllllll!\n'
+
+    # wait for postfetch chain
+    wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
+
+    # check url1 in dedup db bucket_a1
+    # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_a1')
+    dedup_lookup = warcprox_.dedup_db.lookup(
+        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a1")
+    assert dedup_lookup
+    assert dedup_lookup['url'] == url1.encode('ascii')
+    assert re.match(br'^$', dedup_lookup['id'])
+    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
+    record_id = dedup_lookup['id']
+    dedup_date = dedup_lookup['date']
+
+    # check url1 in dedup db bucket_b2
+    # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_b2')
+    dedup_lookup = warcprox_.dedup_db.lookup(
+        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b2")
+    assert dedup_lookup
+    assert dedup_lookup['url'] == url1.encode('ascii')
+    assert re.match(br'^$', dedup_lookup['id'])
+    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
+    record_id = dedup_lookup['id']
+    dedup_date = dedup_lookup['date']
+
+    # check url1 in dedup db bucket_c3
+    # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_c3')
+    dedup_lookup = warcprox_.dedup_db.lookup(
+        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_c3")
+    assert dedup_lookup
+    assert dedup_lookup['url'] == url1.encode('ascii')
+    assert re.match(br'^$', dedup_lookup['id'])
+    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
+    record_id = dedup_lookup['id']
+    dedup_date = dedup_lookup['date']
+
+    # close the warc
+    assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"]
+    writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"]
+    warc_path = os.path.join(writer.directory, writer.finalname)
+    assert not os.path.exists(warc_path)
+    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"].close()
+    assert os.path.exists(warc_path)
+
+    # read the warc # should we bother with this?
+    fh = warctools.ArchiveRecord.open_archive(warc_path)
+    record_iter = fh.read_records(limit=None, offsets=True)
+    try:
+        (offset, record, errors) = next(record_iter)
+        assert record.type == b'warcinfo'
+
+        # url1 bucket_a
+        (offset, record, errors) = next(record_iter)
+        assert record.type == b'response'
+        assert record.url == url1.encode('ascii')
+        # check for duplicate warc record headers
+        assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1
+        assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'
+        (offset, record, errors) = next(record_iter)
+        assert record.type == b'request'
+
+        # that's all folks
+        assert next(record_iter)[1] == None
+        assert next(record_iter, None) == None
+
+    finally:
+        fh.close()
+
+def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     urls_before = warcprox_.proxy.running_stats.urls
 
     url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
 
     # archive url1
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_multiple",
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_readonly",
         "dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}})
     }
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
@@ -950,11 +1032,11 @@ def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_
     assert dedup_lookup is None
 
     # close the warc
-    assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"]
-    writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"]
+    assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"]
+    writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"]
     warc_path = os.path.join(writer.directory, writer.finalname)
     assert not os.path.exists(warc_path)
-    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"].close()
+    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"].close()
     assert os.path.exists(warc_path)
 
     # read the warc

From c0fcf59c86d68cbc7d49698918ea5b57bf82351e Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Fri, 14 Jun 2019 13:34:47 -0700
Subject: [PATCH 54/54] rm test not matching use case

---
 tests/test_warcprox.py | 86 +-----------------------------------------
 1 file changed, 2 insertions(+), 84 deletions(-)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 0c1ae3f..884ddd4 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -916,88 +916,6 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     finally:
         fh.close()
 
-def test_multiple_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
-    urls_before = warcprox_.proxy.running_stats.urls
-
-    url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
-
-    # archive url1 bucket_a1, bucket_b2, bucket_c3
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_multiple_dedup_buckets",
-        "dedup-buckets":{"bucket_a1":"rw", "bucket_b2":"", "bucket_c3":"rw"}
-        })}
-    response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
-    assert response.status_code == 200
-    assert response.headers['warcprox-test-header'] == 'k!'
-    assert response.content == b'I am the warcprox test payload! llllllllll!\n'
-
-    # wait for postfetch chain
-    wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
-
-    # check url1 in dedup db bucket_a1
-    # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_a1')
-    dedup_lookup = warcprox_.dedup_db.lookup(
-        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a1")
-    assert dedup_lookup
-    assert dedup_lookup['url'] == url1.encode('ascii')
-    assert re.match(br'^$', dedup_lookup['id'])
-    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
-    record_id = dedup_lookup['id']
-    dedup_date = dedup_lookup['date']
-
-    # check url1 in dedup db bucket_b2
-    # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_b2')
-    dedup_lookup = warcprox_.dedup_db.lookup(
-        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b2")
-    assert dedup_lookup
-    assert dedup_lookup['url'] == url1.encode('ascii')
-    assert re.match(br'^$', dedup_lookup['id'])
-    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
-    record_id = dedup_lookup['id']
-    dedup_date = dedup_lookup['date']
-
-    # check url1 in dedup db bucket_c3
-    # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_c3')
-    dedup_lookup = warcprox_.dedup_db.lookup(
-        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_c3")
-    assert dedup_lookup
-    assert dedup_lookup['url'] == url1.encode('ascii')
-    assert re.match(br'^$', dedup_lookup['id'])
-    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
-    record_id = dedup_lookup['id']
-    dedup_date = dedup_lookup['date']
-
-    # close the warc
-    assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"]
-    writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"]
-    warc_path = os.path.join(writer.directory, writer.finalname)
-    assert not os.path.exists(warc_path)
-    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"].close()
-    assert os.path.exists(warc_path)
-
-    # read the warc # should we bother with this?
-    fh = warctools.ArchiveRecord.open_archive(warc_path)
-    record_iter = fh.read_records(limit=None, offsets=True)
-    try:
-        (offset, record, errors) = next(record_iter)
-        assert record.type == b'warcinfo'
-
-        # url1 bucket_a
-        (offset, record, errors) = next(record_iter)
-        assert record.type == b'response'
-        assert record.url == url1.encode('ascii')
-        # check for duplicate warc record headers
-        assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1
-        assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'
-        (offset, record, errors) = next(record_iter)
-        assert record.type == b'request'
-
-        # that's all folks
-        assert next(record_iter)[1] == None
-        assert next(record_iter, None) == None
-
-    finally:
-        fh.close()
-
 def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     urls_before = warcprox_.proxy.running_stats.urls
 
@@ -1015,7 +933,7 @@ def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_
     # wait for postfetch chain
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
 
-    # check url1 in dedup db bucket_1
+    # check url1 in dedup db bucket_1 (rw)
     # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_1')
     dedup_lookup = warcprox_.dedup_db.lookup(
         b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1")
@@ -1026,7 +944,7 @@ def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_
     record_id = dedup_lookup['id']
     dedup_date = dedup_lookup['date']
 
-    # check url1 not in dedup db bucket_2
+    # check url1 not in dedup db bucket_2 (ro)
     dedup_lookup = warcprox_.dedup_db.lookup(
         b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2")
     assert dedup_lookup is None
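For reference, a hedged sketch of how a crawler client would exercise the
`dedup-buckets` behavior these patches converge on (the proxy address and
bucket names here are assumptions for illustration; the header format follows
the api.rst examples above):

```python
import json

import requests

# warcprox listening on its default localhost:8000 (an assumption here)
proxies = {'http': 'http://localhost:8000', 'https': 'http://localhost:8000'}

# write new dedup entries to bucket_1; consult bucket_2 but never write to it
warcprox_meta = {
    'warc-prefix': 'my-crawl',
    'dedup-buckets': {'bucket_1': 'rw', 'bucket_2': 'ro'},
}
headers = {'Warcprox-Meta': json.dumps(warcprox_meta)}

# verify=False because warcprox man-in-the-middles https with its own CA cert
response = requests.get(
    'http://example.com/', proxies=proxies, headers=headers, verify=False)
print(response.status_code)
```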