Prompt shutdown: abort active connections to remote servers

Summary (warcprox 2.4b3.dev185 -> 2.4b3.dev186):

* warcprox/controller.py: WarcproxController.shutdown() joins the proxy
  thread right after proxy.shutdown() / proxy.server_close(), and sets each
  postfetch processor's stop event immediately before joining that
  processor, instead of signalling every processor up front.
* warcprox/mitmproxy.py: MitmProxy keeps a set of remote server sockets
  guarded by remote_server_socks_lock. MitmProxyHandler registers its remote
  socket before calling _proxy_request() and unregisters it in a finally
  clause. PooledMitmProxy.server_close() calls shutdown_request() on every
  registered socket.
* warcprox/warcproxy.py: WarcProxy.server_close() additionally calls
  PooledMitmProxy.server_close().

Review notes:

* PooledMitmProxy.server_close() iterates over a snapshot of the socket set
  taken under remote_server_socks_lock; register/unregister mutate the set
  under that same lock, and iterating a set while it is mutated raises
  RuntimeError.
* NOTE(review): MitmProxy.__init__ accepts *args/**kwargs but does not
  forward them to a base-class initializer; presumably the HTTPServer
  initializer is invoked elsewhere. Confirm against the callers.
* NOTE(review): indentation and blank context lines were restored from a
  whitespace-collapsed copy of this patch, and the post-image blob ids on
  the "index" lines predate the review fixes; verify against the target
  tree, or apply with --ignore-whitespace.

diff --git a/setup.py b/setup.py
index 2f41b52..7b595eb 100755
--- a/setup.py
+++ b/setup.py
@@ -40,7 +40,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4b3.dev185',
+        version='2.4b3.dev186',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/warcprox/controller.py b/warcprox/controller.py
index 9d20e71..9ec369d 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -321,15 +321,24 @@ class WarcproxController(object):
                 processor.start()
 
     def shutdown(self):
+        '''
+        Shut down, aborting active connections, but allowing completed fetches
+        to finish processing.
+
+        1. stop accepting new connections
+        2. shut down active connections to remote servers (resulting in sending
+           http 502 to the proxy clients)
+        3. shut down the postfetch processors one by one, in order, letting
+           them finish processing their queues
+        '''
         with self._start_stop_lock:
             if not self.proxy_thread or not self.proxy_thread.is_alive():
                 self.logger.info('warcprox is not running')
                 return
 
-            for processor in self._postfetch_chain:
-                processor.stop.set()
             self.proxy.shutdown()
             self.proxy.server_close()
+            self.proxy_thread.join()
 
             if self.playback_proxy is not None:
                 self.playback_proxy.shutdown()
@@ -338,9 +347,9 @@ class WarcproxController(object):
                     self.playback_proxy.playback_index_db.close()
 
             for processor in self._postfetch_chain:
+                processor.stop.set()
                 processor.join()
 
-            self.proxy_thread.join()
             if self.playback_proxy is not None:
                 self.playback_proxy_thread.join()
 
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index 4153e54..13492e0 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -382,6 +382,8 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             return
 
         try:
+            self.server.register_remote_server_sock(
+                    self._remote_server_conn.sock)
             return self._proxy_request()
         except Exception as e:
             self.logger.error(
@@ -389,6 +391,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                     self.requestline, e, exc_info=True)
             self.send_error(502, str(e))
             return
+        finally:
+            self.server.unregister_remote_server_sock(
+                    self._remote_server_conn.sock)
 
     def send_error(self, code, message=None, explain=None):
         # BaseHTTPRequestHandler.send_response_only() in http/server.py
@@ -556,6 +561,18 @@ class PooledMixIn(socketserver.ThreadingMixIn):
         return res
 
 class MitmProxy(http_server.HTTPServer):
+    def __init__(self, *args, **kwargs):
+        self.remote_server_socks = set()
+        self.remote_server_socks_lock = threading.Lock()
+
+    def register_remote_server_sock(self, sock):
+        with self.remote_server_socks_lock:
+            self.remote_server_socks.add(sock)
+
+    def unregister_remote_server_sock(self, sock):
+        with self.remote_server_socks_lock:
+            self.remote_server_socks.discard(sock)
+
     def finish_request(self, request, client_address):
         '''
         We override socketserver.BaseServer.finish_request to get at
@@ -593,6 +610,7 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
 
     def __init__(self, options=warcprox.Options()):
         PooledMixIn.__init__(self, options.max_threads)
+        MitmProxy.__init__(self)
         self.profilers = collections.defaultdict(cProfile.Profile)
 
         if options.profile:
@@ -622,3 +640,15 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
             self.handle_error(request, client_address)
             self.shutdown_request(request)
 
+    def server_close(self):
+        '''
+        Abort active connections to remote servers to achieve prompt shutdown.
+        '''
+        # Iterate over a snapshot taken under the lock: sockets are added and
+        # discarded under this lock, and mutating a set while iterating over
+        # it raises RuntimeError.
+        with self.remote_server_socks_lock:
+            socks = list(self.remote_server_socks)
+        for sock in socks:
+            self.shutdown_request(sock)
+
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 4ee194c..e2649f0 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -525,6 +525,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
     def server_close(self):
         self.logger.notice('shutting down')
         http_server.HTTPServer.server_close(self)
+        warcprox.mitmproxy.PooledMitmProxy.server_close(self)
         self.remote_connection_pool.clear()
 
     def handle_error(self, request, client_address):