cap the number of urls queued for warc writing

This commit is contained in:
Noah Levitt 2018-04-11 22:29:50 +00:00
parent cb0dea3739
commit cc8fb4c608
3 changed files with 22 additions and 3 deletions

View File

@ -40,7 +40,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.4b2.dev165',
version='2.4b2.dev166',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -26,12 +26,14 @@ import time
import logging
from argparse import Namespace as _Namespace
from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('warcprox').version
import concurrent.futures
try:
import queue
except ImportError:
import Queue as queue
__version__ = _get_distribution('warcprox').version
def digest_str(hash_obj, base32=False):
import base64
return hash_obj.name.encode('utf-8') + b':' + (
@ -45,6 +47,17 @@ class Options(_Namespace):
except AttributeError:
return None
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    '''
    `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size.

    If `max_queued` is set, calls to `submit()` will block if necessary until a
    free slot is available.

    Note that `max_queued` is the first positional parameter, ahead of the
    base class's own arguments, so pass `max_workers` and the other base class
    arguments by keyword.
    '''
    def __init__(self, max_queued=None, *args, **kwargs):
        '''
        Args:
            max_queued: upper bound on the size of the work queue; `None` or
                0 leaves the queue unbounded
            *args, **kwargs: passed through unchanged to
                `concurrent.futures.ThreadPoolExecutor.__init__`
        '''
        super().__init__(*args, **kwargs)
        # Replace the work queue set up by the base class with one bounded by
        # `max_queued`. This happens in the constructor, before anything can
        # have been submitted. `queue.Queue` treats maxsize <= 0 as
        # "no limit", hence the `or 0` for the `None` default.
        self._work_queue = queue.Queue(maxsize=max_queued or 0)
class TimestampedQueue(queue.Queue):
"""
A queue.Queue that exposes the time enqueued of the oldest item in the

View File

@ -41,7 +41,13 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.writer_pool = warcprox.writer.WarcWriterPool(options)
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1)
# set max_queued small, because self.inq is already handling queueing
# for us; but give it a little breathing room to make sure it can keep
# worker threads busy
self.pool = warcprox.ThreadPoolExecutor(
max_workers=options.writer_threads or 1,
max_queued=10 * (options.writer_threads or 1))
self.batch = set()
def _startup(self):