fix mistake (incorrect interpration of concurrent.futures.ThreadPoolExecutor internals) that caused unnecessary waits, and unnecessarily long waits, before calling socket.accept()

This commit is contained in:
Noah Levitt 2017-05-12 14:18:35 -07:00
parent fd770b71bc
commit a3dde3d97f
2 changed files with 20 additions and 6 deletions

View File

@ -51,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.1b1.dev81',
version='2.1b1.dev82',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -435,6 +435,7 @@ class PooledMixIn(socketserver.ThreadingMixIn):
If max_threads is not supplied, calculates a reasonable value based
on system resource limits.
'''
self.active_requests = set()
if not max_threads:
# man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
# more precisely on Linux, threads) that can be created for the
@ -448,10 +449,19 @@ class PooledMixIn(socketserver.ThreadingMixIn):
self.logger.info(
"max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)",
max_threads, rlimit_nproc, rlimit_nofile)
self.max_threads = max_threads
self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)
def process_request(self, request, client_address):
self.pool.submit(self.process_request_thread, request, client_address)
self.active_requests.add(request)
future = self.pool.submit(
self.process_request_thread, request, client_address)
future.add_done_callback(
lambda f: self.active_requests.discard(request))
if future.done():
# avoid theoretical timing issue, in case process_request_thread
# managed to finish before future.add_done_callback() ran
self.active_requests.discard(request)
def get_request(self):
'''
@ -466,12 +476,16 @@ class PooledMixIn(socketserver.ThreadingMixIn):
'''
# neither threading.Condition Queue.not_empty nor Queue.not_full do
# what we need here, right?
start = time.time()
self.logger.trace(
'someone is connecting qsize=%s', self.pool._work_queue.qsize())
while self.pool._work_queue.qsize() > 0:
time.sleep(0.5)
'someone is connecting active_requests=%s',
len(self.active_requests))
while len(self.active_requests) > self.max_threads:
time.sleep(0.05)
res = self.socket.accept()
self.logger.trace('accepted socket=%s', res)
self.logger.trace(
'accepted after %.1f sec active_requests=%s socket=%s',
time.time() - start, len(self.active_requests), res[0])
return res
class MitmProxy(http_server.HTTPServer):