diff --git a/warcprox/main.py b/warcprox/main.py
index 3ad92fe..de7ce89 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -74,13 +74,13 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     arg_parser.add_argument('--rethinkdb-big-table',
             dest='rethinkdb_big_table', action='store_true', default=False,
             help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
-    arg_parser.add_argument('--kafka-broker-list', dest='kafka_broker_list',
+    arg_parser.add_argument('--kafka-broker-list', dest='kafka_broker_list', default=None,
             help='kafka broker list for capture feed')
-    arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
+    arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic', default=None,
             help='kafka capture feed topic')
     arg_parser.add_argument('--queue-size', dest='queue_size', default=500,
             help=argparse.SUPPRESS)
-    arg_parser.add_argument('--max-threads', dest='max_threads', default=500,
+    arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
             help=argparse.SUPPRESS)
     arg_parser.add_argument('--profile', action='store_true', default=False,
             help=argparse.SUPPRESS)
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index b46f610..b9ed3ec 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -36,6 +36,7 @@ from certauth.certauth import CertificateAuthority
 import warcprox
 import datetime
 import concurrent.futures
+import resource
 
 class ProxyingRecorder(object):
     """
@@ -394,12 +395,32 @@ class SingleThreadedWarcProxy(http_server.HTTPServer):
 
 class PooledMixIn(socketserver.ThreadingMixIn):
     def process_request(self, request, client_address):
-        if hasattr(self, 'pool') and self.pool:
-            self.pool.submit(self.process_request_thread, request, client_address)
-        else:
-            socketserver.ThreadingMixIn.process_request(self, request, client_address)
+        self.pool.submit(self.process_request_thread, request, client_address)
 
 class WarcProxy(PooledMixIn, SingleThreadedWarcProxy):
+    logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
+
     def __init__(self, *args, **kwargs):
         SingleThreadedWarcProxy.__init__(self, *args, **kwargs)
-        self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.options.max_threads or 500)
+        if self.options.max_threads:
+            max_threads = self.options.max_threads
+            self.logger.info("max_threads=%s set by command line option",
+                    max_threads)
+        else:
+            # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
+            # more precisely on Linux, threads) that can be created for the
+            # real user ID of the calling process."
+            rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0]
+            rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
+            # An unlimited resource is reported as RLIM_INFINITY (which can be
+            # negative, e.g. -1), so it must not take part in the min() below.
+            candidates = [
+                limit // divisor
+                for limit, divisor in ((rlimit_nofile, 10), (rlimit_nproc, 2))
+                if limit != resource.RLIM_INFINITY and limit >= 0]
+            # ThreadPoolExecutor rejects max_workers < 1; 500 was the old default
+            max_threads = max(1, min(candidates)) if candidates else 500
+            self.logger.info("max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)",
+                    max_threads, rlimit_nproc, rlimit_nofile)
+
+        self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)