mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
dump thread tracebacks on sigquit, more logging and exception handling tweaks
This commit is contained in:
parent
86eab2119a
commit
084bd75ed6
@ -4,7 +4,6 @@ from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import signal
|
||||
import time
|
||||
|
||||
import warcprox.warcprox
|
||||
@ -36,12 +35,10 @@ class WarcproxController(object):
|
||||
|
||||
self.playback_proxy = playback_proxy
|
||||
|
||||
|
||||
def run_until_shutdown(self):
|
||||
"""Start warcprox and run until shut down.
|
||||
|
||||
If running in the main thread, SIGTERM initiates a graceful shutdown.
|
||||
Otherwise, call warcprox_controller.stop.set().
|
||||
"""
|
||||
Start warcprox and run until shut down. Call
|
||||
warcprox_controller.stop.set() to initiate graceful shutdown.
|
||||
"""
|
||||
proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread')
|
||||
proxy_thread.start()
|
||||
@ -53,16 +50,11 @@ class WarcproxController(object):
|
||||
|
||||
self.stop = threading.Event()
|
||||
|
||||
try:
|
||||
signal.signal(signal.SIGTERM, self.stop.set)
|
||||
self.logger.info('SIGTERM will initiate graceful shutdown')
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
while not self.stop.is_set():
|
||||
time.sleep(0.5)
|
||||
except:
|
||||
self.logger.critical("fatal exception, shutting down", exc_info=1)
|
||||
pass
|
||||
finally:
|
||||
self.warc_writer_thread.stop.set()
|
||||
|
@ -14,6 +14,10 @@ import hashlib
|
||||
import argparse
|
||||
import os
|
||||
import socket
|
||||
import pprint
|
||||
import traceback
|
||||
import signal
|
||||
import threading
|
||||
|
||||
import certauth.certauth
|
||||
|
||||
@ -76,6 +80,18 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
||||
return arg_parser
|
||||
|
||||
|
||||
def dump_state(signum=None, frame=None):
|
||||
pp = pprint.PrettyPrinter(indent=4)
|
||||
state_strs = []
|
||||
|
||||
for th in threading.enumerate():
|
||||
state_strs.append(str(th))
|
||||
stack = traceback.format_stack(sys._current_frames()[th.ident])
|
||||
state_strs.append("".join(stack))
|
||||
|
||||
logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)))
|
||||
|
||||
|
||||
def main(argv=sys.argv):
|
||||
arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
|
||||
args = arg_parser.parse_args(args=argv[1:])
|
||||
@ -133,6 +149,11 @@ def main(argv=sys.argv):
|
||||
default_warc_writer=default_warc_writer)
|
||||
|
||||
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
|
||||
|
||||
signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
|
||||
signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())
|
||||
signal.signal(signal.SIGQUIT, dump_state)
|
||||
|
||||
controller.run_until_shutdown()
|
||||
|
||||
|
||||
|
@ -83,13 +83,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
||||
self._transition_to_ssl()
|
||||
except Exception as e:
|
||||
try:
|
||||
self.logger.error("problem with connect line {}: {}".format(repr(self.requestline), e))
|
||||
self.logger.error("problem with connect line {}: {}".format(repr(self.requestline), e), exc_info=True)
|
||||
if type(e) is socket.timeout:
|
||||
self.send_error(504, str(e))
|
||||
else:
|
||||
self.send_error(500, str(e))
|
||||
except Exception as f:
|
||||
self.logger.warn("failed to send error response ({}) to proxy client: {}".format(e, f))
|
||||
self.logger.warn("failed to send error response ({}) to proxy client: {}".format(e, f), exc_info=True)
|
||||
return
|
||||
|
||||
# Reload!
|
||||
|
@ -48,7 +48,7 @@ class ProxyingRecorder(object):
|
||||
|
||||
logger = logging.getLogger("warcprox.warcprox.ProxyingRecorder")
|
||||
|
||||
def __init__(self, fp, proxy_dest, digest_algorithm='sha1'):
|
||||
def __init__(self, fp, proxy_dest, digest_algorithm='sha1', url=None):
|
||||
self.fp = fp
|
||||
# "The file has no name, and will cease to exist when it is closed."
|
||||
self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024)
|
||||
@ -60,6 +60,7 @@ class ProxyingRecorder(object):
|
||||
self._proxy_dest_conn_open = True
|
||||
self._prev_hunk_last_two_bytes = b''
|
||||
self.len = 0
|
||||
self.url = url
|
||||
|
||||
def _update_payload_digest(self, hunk):
|
||||
if self.payload_digest is None:
|
||||
@ -103,8 +104,8 @@ class ProxyingRecorder(object):
|
||||
self.proxy_dest.sendall(hunk)
|
||||
except BaseException as e:
|
||||
self._proxy_dest_conn_open = False
|
||||
self.logger.warn('{} sending data to proxy client'.format(e))
|
||||
self.logger.info('will continue downloading from remote server without sending to client')
|
||||
self.logger.warn('{} sending data to proxy client for url {}'.format(e, self.url))
|
||||
self.logger.info('will continue downloading from remote server without sending to client {}'.format(self.url))
|
||||
|
||||
self.len += len(hunk)
|
||||
|
||||
@ -140,12 +141,13 @@ class ProxyingRecorder(object):
|
||||
|
||||
class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
|
||||
|
||||
def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1'):
|
||||
def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None):
|
||||
http_client.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, method=method)
|
||||
self.url = url
|
||||
|
||||
# Keep around extra reference to self.fp because HTTPResponse sets
|
||||
# self.fp=None after it finishes reading, but we still need it
|
||||
self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm)
|
||||
self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm, url=url)
|
||||
self.fp = self.recorder
|
||||
|
||||
|
||||
@ -193,7 +195,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
# Proxy and record the response
|
||||
h = ProxyingRecordingHTTPResponse(self._proxy_sock,
|
||||
proxy_dest=self.connection,
|
||||
digest_algorithm=self.server.digest_algorithm)
|
||||
digest_algorithm=self.server.digest_algorithm,
|
||||
url=self.url)
|
||||
h.begin()
|
||||
|
||||
buf = h.read(8192)
|
||||
|
@ -252,7 +252,10 @@ class WarcWriter:
|
||||
except:
|
||||
payload_digest = "-"
|
||||
mimetype = self._decode(recorded_url.content_type)
|
||||
mimetype = mimetype[:mimetype.find(";")]
|
||||
if mimetype:
|
||||
n = mimetype.find(";")
|
||||
if n >= 0:
|
||||
mimetype = mimetype[:n]
|
||||
|
||||
# 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
|
||||
self.logger.info("{} {} {} {} {} size={} {} {} offset={}".format(
|
||||
@ -327,32 +330,34 @@ class WarcWriterThread(threading.Thread):
|
||||
w.write_records(recorded_url)
|
||||
|
||||
def run(self):
|
||||
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
|
||||
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size,
|
||||
self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port))
|
||||
try:
|
||||
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
|
||||
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size,
|
||||
self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port))
|
||||
|
||||
self._last_sync = time.time()
|
||||
self._last_sync = time.time()
|
||||
|
||||
while not self.stop.is_set():
|
||||
try:
|
||||
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
|
||||
self.write_records(recorded_url)
|
||||
except queue.Empty:
|
||||
self.default_warc_writer.maybe_idle_rollover()
|
||||
for w in self.warc_writers.values():
|
||||
w.maybe_idle_rollover()
|
||||
while not self.stop.is_set():
|
||||
try:
|
||||
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
|
||||
self.write_records(recorded_url)
|
||||
except queue.Empty:
|
||||
self.default_warc_writer.maybe_idle_rollover()
|
||||
for w in self.warc_writers.values():
|
||||
w.maybe_idle_rollover()
|
||||
|
||||
# XXX prob doesn't belong here (do we need it at all?)
|
||||
if time.time() - self._last_sync > 60:
|
||||
if self.default_warc_writer.dedup_db:
|
||||
self.default_warc_writer.dedup_db.sync()
|
||||
if self.default_warc_writer.playback_index_db:
|
||||
self.default_warc_writer.playback_index_db.sync()
|
||||
self._last_sync = time.time()
|
||||
|
||||
self.logger.info('WarcWriterThread shutting down')
|
||||
self.default_warc_writer.close_writer()
|
||||
for w in self.warc_writers.values():
|
||||
w.close_writer()
|
||||
# XXX prob doesn't belong here (do we need it at all?)
|
||||
if time.time() - self._last_sync > 60:
|
||||
if self.default_warc_writer.dedup_db:
|
||||
self.default_warc_writer.dedup_db.sync()
|
||||
if self.default_warc_writer.playback_index_db:
|
||||
self.default_warc_writer.playback_index_db.sync()
|
||||
self._last_sync = time.time()
|
||||
|
||||
self.logger.info('WarcWriterThread shutting down')
|
||||
self.default_warc_writer.close_writer()
|
||||
for w in self.warc_writers.values():
|
||||
w.close_writer()
|
||||
except:
|
||||
self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user