register self for service discovery

This commit is contained in:
Noah Levitt 2015-10-31 01:17:45 +00:00
parent 7e731d40bc
commit d7d992731c
2 changed files with 38 additions and 8 deletions

View File

@ -6,11 +6,16 @@ import time
import warcprox
import sys
import gc
import datetime
class WarcproxController(object):
logger = logging.getLogger("warcprox.controller.WarcproxController")
def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None, options=warcprox.Options()):
HEARTBEAT_INTERVAL = 20.0
def __init__(self, proxy=None, warc_writer_thread=None,
playback_proxy=None, service_registry=None,
options=warcprox.Options()):
"""
Create warcprox controller.
@ -32,6 +37,7 @@ class WarcproxController(object):
self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q)
self.playback_proxy = playback_proxy
self.service_registry = service_registry
self.options = options
self._last_rss = None
@ -102,6 +108,19 @@ class WarcproxController(object):
self._last_rss = rss
def _service_heartbeat(self):
if hasattr(self, 'status_info'):
status_info = self.status_info
else:
status_info = {
'role': 'warcprox',
'heartbeat_interval': self.HEARTBEAT_INTERVAL,
}
status_info['load'] = self.proxy.recorded_url_q.qsize() / (self.proxy.recorded_url_q.maxsize or 100)
self.status_info = self.service_registry.heartbeat(status_info)
self.logger.debug("status in service registry: %s", self.status_info)
def run_until_shutdown(self):
"""
Start warcprox and run until shut down. Call
@ -117,15 +136,18 @@ class WarcproxController(object):
self.stop = threading.Event()
self.debug_mem()
last_mem_dbg = datetime.datetime.utcfromtimestamp(0)
try:
t = time.time() - 30
while not self.stop.is_set():
time.sleep(0.5)
if time.time() - t > 60:
if not hasattr(self, "status_info") or (datetime.datetime.now(datetime.timezone.utc) - self.status_info["last_heartbeat"]).total_seconds() > self.HEARTBEAT_INTERVAL:
self._service_heartbeat()
if (datetime.datetime.utcnow() - last_mem_dbg).total_seconds() > 60:
self.debug_mem()
t = time.time()
last_mem_dbg = datetime.datetime.utcnow()
time.sleep(0.5)
except:
self.logger.critical("fatal exception, shutting down", exc_info=True)
pass

View File

@ -92,7 +92,10 @@ def dump_state(signum=None, frame=None):
state_strs = []
for th in threading.enumerate():
state_strs.append(str(th))
try:
state_strs.append(str(th))
except AssertionError:
state_strs.append("<n/a:AssertionError>")
stack = traceback.format_stack(sys._current_frames()[th.ident])
state_strs.append("".join(stack))
@ -163,7 +166,12 @@ def init_controller(args):
recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=listeners, options=options)
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy, options=options)
if args.rethinkdb_servers:
svcreg = rethinkstuff.ServiceRegistry(r)
controller = warcprox.controller.WarcproxController(proxy,
warc_writer_thread, playback_proxy, service_registry=svcreg,
options=options)
signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())