From a3dde3d97fef98b67f71311f52b9f2065adf316f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 12 May 2017 14:18:35 -0700 Subject: [PATCH] fix mistake (incorrect interpration of concurrent.futures.ThreadPoolExecutor internals) that caused unnecessary waits, and unnecessarily long waits, before calling socket.accept() --- setup.py | 2 +- warcprox/mitmproxy.py | 24 +++++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index 5d76de5..661950c 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.1b1.dev81', + version='2.1b1.dev82', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index c768b8a..d949072 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -435,6 +435,7 @@ class PooledMixIn(socketserver.ThreadingMixIn): If max_threads is not supplied, calculates a reasonable value based on system resource limits. ''' + self.active_requests = set() if not max_threads: # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or, # more precisely on Linux, threads) that can be created for the @@ -448,10 +449,19 @@ class PooledMixIn(socketserver.ThreadingMixIn): self.logger.info( "max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)", max_threads, rlimit_nproc, rlimit_nofile) + self.max_threads = max_threads self.pool = concurrent.futures.ThreadPoolExecutor(max_threads) def process_request(self, request, client_address): - self.pool.submit(self.process_request_thread, request, client_address) + self.active_requests.add(request) + future = self.pool.submit( + self.process_request_thread, request, client_address) + future.add_done_callback( + lambda f: self.active_requests.discard(request)) + if future.done(): + # avoid theoretical timing issue, in case process_request_thread + # managed to finish before future.add_done_callback() ran + self.active_requests.discard(request) def get_request(self): ''' @@ -466,12 +476,16 @@ class PooledMixIn(socketserver.ThreadingMixIn): ''' # neither threading.Condition Queue.not_empty nor Queue.not_full do # what we need here, right? + start = time.time() self.logger.trace( - 'someone is connecting qsize=%s', self.pool._work_queue.qsize()) - while self.pool._work_queue.qsize() > 0: - time.sleep(0.5) + 'someone is connecting active_requests=%s', + len(self.active_requests)) + while len(self.active_requests) > self.max_threads: + time.sleep(0.05) res = self.socket.accept() - self.logger.trace('accepted socket=%s', res) + self.logger.trace( + 'accepted after %.1f sec active_requests=%s socket=%s', + time.time() - start, len(self.active_requests), res[0]) return res class MitmProxy(http_server.HTTPServer):