diff --git a/.gitignore b/.gitignore index 72e3644..1da5ebc 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ warcs build dist .tox +out.* diff --git a/warcprox/controller.py b/warcprox/controller.py index 63af764..3c7dfe1 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -44,9 +44,6 @@ class WarcproxController(object): def debug_mem(self): self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize()) - if self.proxy.stats_db and hasattr(self.proxy.stats_db, "_executor"): - self.logger.info("self.proxy.stats_db._executor._work_queue.qsize()=%s", - self.proxy.stats_db._executor._work_queue.qsize()) with open("/proc/self/status") as f: for line in f: fields = line.split() @@ -118,6 +115,7 @@ class WarcproxController(object): 'port': self.options.port, } status_info['load'] = 1.0 * self.proxy.recorded_url_q.qsize() / (self.proxy.recorded_url_q.maxsize or 100) + status_info['queue_size'] = self.proxy.recorded_url_q.qsize() self.status_info = self.service_registry.heartbeat(status_info) self.logger.debug("status in service registry: %s", self.status_info) @@ -154,9 +152,9 @@ class WarcproxController(object): if self.service_registry and (not hasattr(self, "status_info") or (datetime.datetime.now(utc) - self.status_info["last_heartbeat"]).total_seconds() > self.HEARTBEAT_INTERVAL): self._service_heartbeat() - # if (datetime.datetime.utcnow() - last_mem_dbg).total_seconds() > 60: - # self.debug_mem() - # last_mem_dbg = datetime.datetime.utcnow() + if self.options.profile and (datetime.datetime.utcnow() - last_mem_dbg).total_seconds() > 60: + self.debug_mem() + last_mem_dbg = datetime.datetime.utcnow() time.sleep(0.5) except: @@ -176,10 +174,15 @@ class WarcproxController(object): # wait for threads to finish self.warc_writer_thread.join() - if self.warc_writer_thread.dedup_db is not None: + if self.proxy.stats_db: + self.proxy.stats_db.close() + if self.warc_writer_thread.dedup_db: self.warc_writer_thread.dedup_db.close() proxy_thread.join() if self.playback_proxy is not None: playback_proxy_thread.join() + if self.service_registry and hasattr(self, "status_info"): + self.service_registry.unregister(self.status_info["id"]) + diff --git a/warcprox/main.py b/warcprox/main.py index 7976753..0854cee 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -80,6 +80,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default=None, help='kafka capture feed topic') arg_parser.add_argument('--queue-size', dest='queue_size', default=1000, help=argparse.SUPPRESS) + arg_parser.add_argument('--profile', action='store_true', default=False, + help=argparse.SUPPRESS) arg_parser.add_argument('--version', action='version', version="warcprox {}".format(warcprox.__version__)) arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') diff --git a/warcprox/stats.py b/warcprox/stats.py index 44b724b..316531d 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -16,6 +16,7 @@ import random import warcprox import threading import rethinkdb as r +import datetime def _empty_bucket(bucket): return { @@ -160,7 +161,7 @@ class RethinkStatsDb: if not self._stop.is_set(): self._timer = threading.Timer(0.5, self._update_batch) - self._timer.name = "RethinkCaptures-batch-insert-timer" + self._timer.name = "RethinkStats-batch-update-timer-%s" % datetime.datetime.utcnow().isoformat() self._timer.start() else: self.logger.info("finished") diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 4e19d4f..93107b1 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -# vim:set sw=4 et: # """ WARC writing MITM HTTP/S proxy @@ -151,8 +150,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): key, limit = item bucket0, bucket1, bucket2 = key.rsplit(".", 2) value = self.server.stats_db.value(bucket0, bucket1, bucket2) - # self.logger.debug("warcprox_meta['limits']=%s stats['%s']=%s recorded_url_q.qsize()=%s", - # warcprox_meta['limits'], key, value, self.server.recorded_url_q.qsize()) + self.logger.debug("warcprox_meta['limits']=%s stats['%s']=%s recorded_url_q.qsize()=%s", + warcprox_meta['limits'], key, value, self.server.recorded_url_q.qsize()) if value and value >= limit: body = "request rejected by warcprox: reached limit {}={}\n".format(key, limit).encode("utf-8") self.send_response(420, "Reached limit") @@ -369,7 +368,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer): if recorded_url_q is not None: self.recorded_url_q = recorded_url_q else: - self.recorded_url_q = queue.Queue() + self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000) self.stats_db = stats_db @@ -383,6 +382,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer): self.logger.info('WarcProxy shutting down') http_server.HTTPServer.server_close(self) + def handle_error(self, request, client_address): + self.logger.warn("exception processing request %s from %s", request, client_address, exc_info=True) class WarcProxy(socketserver.ThreadingMixIn, SingleThreadedWarcProxy): pass diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index c69e514..25beff8 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -36,7 +36,10 @@ class WarcWriterThread(threading.Thread): self.idle = None def run(self): - cProfile.runctx('self._run()', globals(), locals(), sort='cumulative') + if self.options.profile: + cProfile.runctx('self._run()', globals(), locals(), sort='cumulative') + else: + self._run() def _run(self): while not self.stop.is_set(): @@ -44,6 +47,11 @@ class WarcWriterThread(threading.Thread): self.name = 'WarcWriterThread(tid={})'.format(warcprox.gettid()) while True: try: + if self.stop.is_set(): + qsize = self.recorded_url_q.qsize() + if qsize % 50 == 0: + self.logger.info("%s urls left to write", qsize) + recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) self.idle = None if self.dedup_db: