From cc8fb4c608fcd9aab929a85598405be274632959 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 11 Apr 2018 22:29:50 +0000 Subject: [PATCH] cap the number of urls queued for warc writing --- setup.py | 2 +- warcprox/__init__.py | 15 ++++++++++++++- warcprox/writerthread.py | 8 +++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 824d2ba..a00566f 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev165', + version='2.4b2.dev166', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 76d733d..20f0de4 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -26,12 +26,14 @@ import time import logging from argparse import Namespace as _Namespace from pkg_resources import get_distribution as _get_distribution -__version__ = _get_distribution('warcprox').version +import concurrent.futures try: import queue except ImportError: import Queue as queue +__version__ = _get_distribution('warcprox').version + def digest_str(hash_obj, base32=False): import base64 return hash_obj.name.encode('utf-8') + b':' + ( @@ -45,6 +47,17 @@ class Options(_Namespace): except AttributeError: return None +class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): + ''' + `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size. + + If `max_queued` is set, calls to `submit()` will block if necessary until a + free slot is available. + ''' + def __init__(self, max_queued=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._work_queue = queue.Queue(maxsize=max_queued or 0) + class TimestampedQueue(queue.Queue): """ A queue.Queue that exposes the time enqueued of the oldest item in the diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 927c628..03aee9e 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -41,7 +41,13 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) - self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1) + + # set max_queued small, because self.inq is already handling queueing + # for us; but give it a little breathing room to make sure it can keep + # worker threads busy + self.pool = warcprox.ThreadPoolExecutor( + max_workers=options.writer_threads or 1, + max_queued=10 * (options.writer_threads or 1)) self.batch = set() def _startup(self):