Add a heuristic that sets the size of the thread pool based on the open files limit (and the process/thread limit), to hopefully fix the problem where warcprox got stuck because it ran out of file handles.

This commit is contained in:
Noah Levitt 2016-03-04 20:59:11 +00:00
parent 46887f7594
commit 918fdd3e9b
2 changed files with 22 additions and 8 deletions

View File

@ -74,13 +74,13 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser.add_argument('--rethinkdb-big-table', arg_parser.add_argument('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False, dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--kafka-broker-list', dest='kafka_broker_list', arg_parser.add_argument('--kafka-broker-list', dest='kafka_broker_list',
default=None, help='kafka broker list for capture feed') default=None, help='kafka broker list for capture feed')
arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic', arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
default=None, help='kafka capture feed topic') default=None, help='kafka capture feed topic')
arg_parser.add_argument('--queue-size', dest='queue_size', default=500, arg_parser.add_argument('--queue-size', dest='queue_size', default=500,
help=argparse.SUPPRESS) help=argparse.SUPPRESS)
arg_parser.add_argument('--max-threads', dest='max_threads', default=500, arg_parser.add_argument('--max-threads', dest='max_threads',
help=argparse.SUPPRESS) help=argparse.SUPPRESS)
arg_parser.add_argument('--profile', action='store_true', default=False, arg_parser.add_argument('--profile', action='store_true', default=False,
help=argparse.SUPPRESS) help=argparse.SUPPRESS)

View File

@ -36,6 +36,7 @@ from certauth.certauth import CertificateAuthority
import warcprox import warcprox
import datetime import datetime
import concurrent.futures import concurrent.futures
import resource
class ProxyingRecorder(object): class ProxyingRecorder(object):
""" """
@ -394,12 +395,25 @@ class SingleThreadedWarcProxy(http_server.HTTPServer):
class PooledMixIn(socketserver.ThreadingMixIn):
    """Mix-in that runs each request on ``self.pool`` instead of a new thread.

    The class using this mix-in must assign ``self.pool`` (an object with a
    ``submit(fn, *args)`` method, e.g. a
    ``concurrent.futures.ThreadPoolExecutor``) before requests are processed.
    """

    def process_request(self, request, client_address):
        """Queue handling of ``request`` from ``client_address`` on the pool."""
        handler = self.process_request_thread
        self.pool.submit(handler, request, client_address)
class WarcProxy(PooledMixIn, SingleThreadedWarcProxy):
    """Warc proxy server that handles requests on a bounded thread pool.

    The pool size comes from ``self.options.max_threads`` when that is set;
    otherwise it is derived from the process's resource limits.
    """

    logger = logging.getLogger("warcprox.warcproxy.WarcProxy")

    # Pool size used when neither resource limit is finite.
    _FALLBACK_MAX_THREADS = 500

    def __init__(self, *args, **kwargs):
        """Initialize the server and create ``self.pool``.

        All arguments are passed through to
        ``SingleThreadedWarcProxy.__init__``, which is presumably what sets
        ``self.options`` -- confirm against that class.

        Raises:
            ValueError: if ``options.max_threads`` is set but is not a
                positive integer.
        """
        SingleThreadedWarcProxy.__init__(self, *args, **kwargs)
        if self.options.max_threads:
            # The command-line option is parsed without an explicit type, so
            # the value can arrive as a string; the executor needs an int.
            max_threads = int(self.options.max_threads)
            self.logger.info("max_threads=%s set by command line option",
                    max_threads)
        else:
            # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
            # more precisely on Linux, threads) that can be created for the
            # real user ID of the calling process."
            rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0]
            rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
            max_threads = self._threads_from_rlimits(
                    rlimit_nofile, rlimit_nproc)
            self.logger.info("max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)",
                    max_threads, rlimit_nproc, rlimit_nofile)

        self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)

    @classmethod
    def _threads_from_rlimits(cls, rlimit_nofile, rlimit_nproc):
        """Return a positive pool size derived from the two soft limits.

        An unlimited resource is reported as ``resource.RLIM_INFINITY``, which
        may be negative; feeding it into the arithmetic would yield a
        non-positive size that ThreadPoolExecutor rejects. Unlimited (or
        otherwise negative) limits are therefore ignored, and the result is
        never less than 1.
        """
        candidates = []
        if rlimit_nofile != resource.RLIM_INFINITY and rlimit_nofile >= 0:
            candidates.append(rlimit_nofile // 10)
        if rlimit_nproc != resource.RLIM_INFINITY and rlimit_nproc >= 0:
            candidates.append(rlimit_nproc // 2)
        if not candidates:
            return cls._FALLBACK_MAX_THREADS
        return max(1, min(candidates))