fix shutdown

at shutdown, abort active connections, but allow completed fetches to
finish processing

this should fix a race condition at shutdown, where postfetch
processor B would shut down, then postfetch processor A would try to
enqueue more urls, filling up the queue to the point where A blocks
forever, since B is no longer pulling urls off the queue
This commit is contained in:
Noah Levitt 2018-10-26 13:21:15 -07:00
parent 4f01772782
commit e993b0c28c
4 changed files with 39 additions and 4 deletions

View File

@ -40,7 +40,7 @@ except:
setuptools.setup( setuptools.setup(
name='warcprox', name='warcprox',
version='2.4b3.dev185', version='2.4b3.dev186',
description='WARC writing MITM HTTP/S proxy', description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox', url='https://github.com/internetarchive/warcprox',
author='Noah Levitt', author='Noah Levitt',

View File

@ -321,15 +321,24 @@ class WarcproxController(object):
processor.start() processor.start()
def shutdown(self): def shutdown(self):
'''
Shut down, aborting active connections, but allowing completed fetches
to finish processing.
1. stop accepting new connections
2. shut down active connections to remote servers (resulting in sending
http 502 to the proxy clients)
3. shut down the postfetch processors one by one, in order, letting
them finish processing their queues
'''
with self._start_stop_lock: with self._start_stop_lock:
if not self.proxy_thread or not self.proxy_thread.is_alive(): if not self.proxy_thread or not self.proxy_thread.is_alive():
self.logger.info('warcprox is not running') self.logger.info('warcprox is not running')
return return
for processor in self._postfetch_chain:
processor.stop.set()
self.proxy.shutdown() self.proxy.shutdown()
self.proxy.server_close() self.proxy.server_close()
self.proxy_thread.join()
if self.playback_proxy is not None: if self.playback_proxy is not None:
self.playback_proxy.shutdown() self.playback_proxy.shutdown()
@ -338,9 +347,9 @@ class WarcproxController(object):
self.playback_proxy.playback_index_db.close() self.playback_proxy.playback_index_db.close()
for processor in self._postfetch_chain: for processor in self._postfetch_chain:
processor.stop.set()
processor.join() processor.join()
self.proxy_thread.join()
if self.playback_proxy is not None: if self.playback_proxy is not None:
self.playback_proxy_thread.join() self.playback_proxy_thread.join()

View File

@ -382,6 +382,8 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
return return
try: try:
self.server.register_remote_server_sock(
self._remote_server_conn.sock)
return self._proxy_request() return self._proxy_request()
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
@ -389,6 +391,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.requestline, e, exc_info=True) self.requestline, e, exc_info=True)
self.send_error(502, str(e)) self.send_error(502, str(e))
return return
finally:
self.server.unregister_remote_server_sock(
self._remote_server_conn.sock)
def send_error(self, code, message=None, explain=None): def send_error(self, code, message=None, explain=None):
# BaseHTTPRequestHandler.send_response_only() in http/server.py # BaseHTTPRequestHandler.send_response_only() in http/server.py
@ -556,6 +561,18 @@ class PooledMixIn(socketserver.ThreadingMixIn):
return res return res
class MitmProxy(http_server.HTTPServer): class MitmProxy(http_server.HTTPServer):
# Create the registry of remote-server sockets: an initially empty set plus
# the lock that the register/unregister methods hold while changing it.
# NOTE(review): *args and **kwargs are accepted but not used here, and no
# base-class initializer is called from this method -- presumably the
# concrete subclass initializes the underlying HTTP server separately; confirm.
def __init__(self, *args, **kwargs):
self.remote_server_socks = set()
self.remote_server_socks_lock = threading.Lock()
# Add `sock` to self.remote_server_socks while holding
# self.remote_server_socks_lock. Adding a socket that is already in the set
# is a no-op.
def register_remote_server_sock(self, sock):
with self.remote_server_socks_lock:
self.remote_server_socks.add(sock)
# Remove `sock` from self.remote_server_socks while holding
# self.remote_server_socks_lock. Uses set.discard(), so a socket that is not
# (or is no longer) in the set is ignored rather than raising KeyError.
def unregister_remote_server_sock(self, sock):
with self.remote_server_socks_lock:
self.remote_server_socks.discard(sock)
def finish_request(self, request, client_address): def finish_request(self, request, client_address):
''' '''
We override socketserver.BaseServer.finish_request to get at We override socketserver.BaseServer.finish_request to get at
@ -593,6 +610,7 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
PooledMixIn.__init__(self, options.max_threads) PooledMixIn.__init__(self, options.max_threads)
MitmProxy.__init__(self)
self.profilers = collections.defaultdict(cProfile.Profile) self.profilers = collections.defaultdict(cProfile.Profile)
if options.profile: if options.profile:
@ -622,3 +640,10 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
self.handle_error(request, client_address) self.handle_error(request, client_address)
self.shutdown_request(request) self.shutdown_request(request)
def server_close(self):
    '''
    Abort active connections to remote servers to achieve prompt shutdown.

    Calls `self.shutdown_request()` on every socket that is present in
    `self.remote_server_socks` at the time of the call.
    '''
    # Work from a snapshot taken under the lock. Request handlers remove
    # their socket from the set (unregister_remote_server_sock) when they
    # finish, and a set that changes size while being iterated raises
    # "RuntimeError: Set changed size during iteration".
    with self.remote_server_socks_lock:
        socks = list(self.remote_server_socks)
    # The lock is released before shutting anything down, so handlers that
    # are unregistering their sockets are never blocked by this loop.
    for sock in socks:
        self.shutdown_request(sock)

View File

@ -525,6 +525,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
def server_close(self): def server_close(self):
self.logger.notice('shutting down') self.logger.notice('shutting down')
http_server.HTTPServer.server_close(self) http_server.HTTPServer.server_close(self)
warcprox.mitmproxy.PooledMitmProxy.server_close(self)
self.remote_connection_pool.clear() self.remote_connection_pool.clear()
def handle_error(self, request, client_address): def handle_error(self, request, client_address):