diff --git a/warcprox/controller.py b/warcprox/controller.py index 26e88fc..89d420d 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -4,7 +4,6 @@ from __future__ import absolute_import import logging import threading -import signal import time import warcprox.warcprox @@ -36,12 +35,10 @@ class WarcproxController(object): self.playback_proxy = playback_proxy - def run_until_shutdown(self): - """Start warcprox and run until shut down. - - If running in the main thread, SIGTERM initiates a graceful shutdown. - Otherwise, call warcprox_controller.stop.set(). + """ + Start warcprox and run until shut down. Call + warcprox_controller.stop.set() to initiate graceful shutdown. """ proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread') proxy_thread.start() @@ -53,16 +50,11 @@ class WarcproxController(object): self.stop = threading.Event() - try: - signal.signal(signal.SIGTERM, self.stop.set) - self.logger.info('SIGTERM will initiate graceful shutdown') - except ValueError: - pass - try: while not self.stop.is_set(): time.sleep(0.5) except: + self.logger.critical("fatal exception, shutting down", exc_info=1) pass finally: self.warc_writer_thread.stop.set() diff --git a/warcprox/main.py b/warcprox/main.py index 147a030..0ab6885 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -14,6 +14,10 @@ import hashlib import argparse import os import socket +import pprint +import traceback +import signal +import threading import certauth.certauth @@ -76,6 +80,18 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): return arg_parser +def dump_state(signum=None, frame=None): + pp = pprint.PrettyPrinter(indent=4) + state_strs = [] + + for th in threading.enumerate(): + state_strs.append(str(th)) + stack = traceback.format_stack(sys._current_frames()[th.ident]) + state_strs.append("".join(stack)) + + logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) + + def main(argv=sys.argv): arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) args = arg_parser.parse_args(args=argv[1:]) @@ -133,6 +149,11 @@ def main(argv=sys.argv): default_warc_writer=default_warc_writer) controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) + + signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) + signal.signal(signal.SIGINT, lambda a,b: controller.stop.set()) + signal.signal(signal.SIGQUIT, dump_state) + controller.run_until_shutdown() diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index b1a9a5b..edc9657 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -83,13 +83,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._transition_to_ssl() except Exception as e: try: - self.logger.error("problem with connect line {}: {}".format(repr(self.requestline), e)) + self.logger.error("problem with connect line {}: {}".format(repr(self.requestline), e), exc_info=True) if type(e) is socket.timeout: self.send_error(504, str(e)) else: self.send_error(500, str(e)) except Exception as f: - self.logger.warn("failed to send error response ({}) to proxy client: {}".format(e, f)) + self.logger.warn("failed to send error response ({}) to proxy client: {}".format(e, f), exc_info=True) return # Reload! diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 4f0cafe..930a290 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -48,7 +48,7 @@ class ProxyingRecorder(object): logger = logging.getLogger("warcprox.warcprox.ProxyingRecorder") - def __init__(self, fp, proxy_dest, digest_algorithm='sha1'): + def __init__(self, fp, proxy_dest, digest_algorithm='sha1', url=None): self.fp = fp # "The file has no name, and will cease to exist when it is closed." self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) @@ -60,6 +60,7 @@ class ProxyingRecorder(object): self._proxy_dest_conn_open = True self._prev_hunk_last_two_bytes = b'' self.len = 0 + self.url = url def _update_payload_digest(self, hunk): if self.payload_digest is None: @@ -103,8 +104,8 @@ class ProxyingRecorder(object): self.proxy_dest.sendall(hunk) except BaseException as e: self._proxy_dest_conn_open = False - self.logger.warn('{} sending data to proxy client'.format(e)) - self.logger.info('will continue downloading from remote server without sending to client') + self.logger.warn('{} sending data to proxy client for url {}'.format(e, self.url)) + self.logger.info('will continue downloading from remote server without sending to client {}'.format(self.url)) self.len += len(hunk) @@ -140,12 +141,13 @@ class ProxyingRecorder(object): class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): - def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1'): + def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None): http_client.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, method=method) + self.url = url # Keep around extra reference to self.fp because HTTPResponse sets # self.fp=None after it finishes reading, but we still need it - self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm) + self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm, url=url) self.fp = self.recorder @@ -193,7 +195,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): # Proxy and record the response h = ProxyingRecordingHTTPResponse(self._proxy_sock, proxy_dest=self.connection, - digest_algorithm=self.server.digest_algorithm) + digest_algorithm=self.server.digest_algorithm, + url=self.url) h.begin() buf = h.read(8192) diff --git a/warcprox/warcwriter.py b/warcprox/warcwriter.py index 936e485..0d57bda 100644 --- a/warcprox/warcwriter.py +++ b/warcprox/warcwriter.py @@ -252,7 +252,10 @@ class WarcWriter: except: payload_digest = "-" mimetype = self._decode(recorded_url.content_type) - mimetype = mimetype[:mimetype.find(";")] + if mimetype: + n = mimetype.find(";") + if n >= 0: + mimetype = mimetype[:n] # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"} self.logger.info("{} {} {} {} {} size={} {} {} offset={}".format( @@ -327,32 +330,34 @@ class WarcWriterThread(threading.Thread): w.write_records(recorded_url) def run(self): - self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( - os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size, - self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port)) + try: + self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( + os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size, + self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port)) - self._last_sync = time.time() + self._last_sync = time.time() - while not self.stop.is_set(): - try: - recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) - self.write_records(recorded_url) - except queue.Empty: - self.default_warc_writer.maybe_idle_rollover() - for w in self.warc_writers.values(): - w.maybe_idle_rollover() + while not self.stop.is_set(): + try: + recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) + self.write_records(recorded_url) + except queue.Empty: + self.default_warc_writer.maybe_idle_rollover() + for w in self.warc_writers.values(): + w.maybe_idle_rollover() - # XXX prob doesn't belong here (do we need it at all?) - if time.time() - self._last_sync > 60: - if self.default_warc_writer.dedup_db: - self.default_warc_writer.dedup_db.sync() - if self.default_warc_writer.playback_index_db: - self.default_warc_writer.playback_index_db.sync() - self._last_sync = time.time() - - self.logger.info('WarcWriterThread shutting down') - self.default_warc_writer.close_writer() - for w in self.warc_writers.values(): - w.close_writer() + # XXX prob doesn't belong here (do we need it at all?) + if time.time() - self._last_sync > 60: + if self.default_warc_writer.dedup_db: + self.default_warc_writer.dedup_db.sync() + if self.default_warc_writer.playback_index_db: + self.default_warc_writer.playback_index_db.sync() + self._last_sync = time.time() + self.logger.info('WarcWriterThread shutting down') + self.default_warc_writer.close_writer() + for w in self.warc_writers.values(): + w.close_writer() + except: + self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True)