From d7d992731c6af30b8ee354da8fd923246dae3925 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 31 Oct 2015 01:17:45 +0000 Subject: [PATCH] register self for service discovery --- warcprox/controller.py | 34 ++++++++++++++++++++++++++++------ warcprox/main.py | 12 ++++++++++-- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/warcprox/controller.py b/warcprox/controller.py index b431895..d8813f6 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -6,11 +6,16 @@ import time import warcprox import sys import gc +import datetime class WarcproxController(object): logger = logging.getLogger("warcprox.controller.WarcproxController") - def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None, options=warcprox.Options()): + HEARTBEAT_INTERVAL = 20.0 + + def __init__(self, proxy=None, warc_writer_thread=None, + playback_proxy=None, service_registry=None, + options=warcprox.Options()): """ Create warcprox controller. @@ -32,6 +37,7 @@ class WarcproxController(object): self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) self.playback_proxy = playback_proxy + self.service_registry = service_registry self.options = options self._last_rss = None @@ -102,6 +108,19 @@ class WarcproxController(object): self._last_rss = rss + def _service_heartbeat(self): + if hasattr(self, 'status_info'): + status_info = self.status_info + else: + status_info = { + 'role': 'warcprox', + 'heartbeat_interval': self.HEARTBEAT_INTERVAL, + } + status_info['load'] = self.proxy.recorded_url_q.qsize() / (self.proxy.recorded_url_q.maxsize or 100) + + self.status_info = self.service_registry.heartbeat(status_info) + self.logger.debug("status in service registry: %s", self.status_info) + def run_until_shutdown(self): """ Start warcprox and run until shut down. Call @@ -117,15 +136,18 @@ class WarcproxController(object): self.stop = threading.Event() - self.debug_mem() + last_mem_dbg = datetime.datetime.utcfromtimestamp(0) try: - t = time.time() - 30 while not self.stop.is_set(): - time.sleep(0.5) - if time.time() - t > 60: + if not hasattr(self, "status_info") or (datetime.datetime.now(datetime.timezone.utc) - self.status_info["last_heartbeat"]).total_seconds() > self.HEARTBEAT_INTERVAL: + self._service_heartbeat() + + if (datetime.datetime.utcnow() - last_mem_dbg).total_seconds() > 60: self.debug_mem() - t = time.time() + last_mem_dbg = datetime.datetime.utcnow() + + time.sleep(0.5) except: self.logger.critical("fatal exception, shutting down", exc_info=True) pass diff --git a/warcprox/main.py b/warcprox/main.py index 8d471de..2d6c2e2 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -92,7 +92,10 @@ def dump_state(signum=None, frame=None): state_strs = [] for th in threading.enumerate(): - state_strs.append(str(th)) + try: + state_strs.append(str(th)) + except AssertionError: + state_strs.append("") stack = traceback.format_stack(sys._current_frames()[th.ident]) state_strs.append("".join(stack)) @@ -163,7 +166,12 @@ def init_controller(args): recorded_url_q=recorded_url_q, writer_pool=writer_pool, dedup_db=dedup_db, listeners=listeners, options=options) - controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy, options=options) + if args.rethinkdb_servers: + svcreg = rethinkstuff.ServiceRegistry(r) + + controller = warcprox.controller.WarcproxController(proxy, + warc_writer_thread, playback_proxy, service_registry=svcreg, + options=options) signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())