diff --git a/warcprox/controller.py b/warcprox/controller.py index 1850857..b431895 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -1,5 +1,3 @@ -# vim: set sw=4 et: - from __future__ import absolute_import import logging @@ -36,6 +34,74 @@ class WarcproxController(object): self.playback_proxy = playback_proxy self.options = options + self._last_rss = None + + def debug_mem(self): + self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize()) + if self.proxy.stats_db and hasattr(self.proxy.stats_db, "_executor"): + self.logger.info("self.proxy.stats_db._executor._work_queue.qsize()=%s", + self.proxy.stats_db._executor._work_queue.qsize()) + with open("/proc/self/status") as f: + for line in f: + fields = line.split() + if len(fields) >= 2: + k, v = fields[0:2] + if k == "VmHWM:": + hwm = int(v) + elif k == "VmRSS:": + rss = int(v) + elif k == "VmData:": + data = int(v) + elif k == "VmStk:": + stk = int(v) + self.logger.info("rss=%s data=%s stack=%s hwm=%s", rss, data, stk, hwm) + self._last_rss = self._last_rss or rss # to set initial value + + if rss - self._last_rss > 1024: + num_unreachable = gc.collect() + all_objects = gc.get_objects() + total_size = 0 + summary = {} + biggest_objects = [None] * 10 + for obj in all_objects: + size = sys.getsizeof(obj) + total_size += size + if not type(obj) in summary: + summary[type(obj)] = {"count":0,"size":0} + summary[type(obj)]["count"] += 1 + summary[type(obj)]["size"] += size + if size > sys.getsizeof(biggest_objects[-1]): + for i in range(len(biggest_objects)): + if size > sys.getsizeof(biggest_objects[i]): + index = i + break + biggest_objects[index+1:] = biggest_objects[index:-1] + biggest_objects[index] = obj + + self.logger.info("%s objects totaling %s bytes", len(all_objects), total_size) + + self.logger.info("=== biggest types ===") + for item in sorted(summary.items(), key=lambda item: item[1]["size"], reverse=True)[:10]: + self.logger.info("%s bytes in %s instances of %s", item[1]["size"], item[1]["count"], item[0]) + + self.logger.info("=== warcprox types ===") + for t in (t for t in summary if str(t).find("warcprox") >= 0): + self.logger.info("%s bytes in %s instances of %s", summary[t]["size"], summary[t]["count"], t) + + for i in range(len(biggest_objects)): + obj = biggest_objects[i] + try: + value = repr(bytes(obj.getbuffer()[:100])) + except: + try: + value = repr(obj)[:100] + except BaseException as e: + value = "<{} getting value>".format(e) + self.logger.info("#%s (%s) (%s bytes) (%s refs) (id=%s): %s", i+1, type(obj), sys.getsizeof(obj), sys.getrefcount(obj), id(obj), value) + self.logger.info("%s unreachable objects totaling %s bytes", len(gc.garbage), sum(sys.getsizeof(x) for x in gc.garbage)) + + self._last_rss = rss + def run_until_shutdown(self): """ Start warcprox and run until shut down. Call @@ -51,46 +117,14 @@ class WarcproxController(object): self.stop = threading.Event() + self.debug_mem() + try: - t = time.time() - 30 + t = time.time() - 30 while not self.stop.is_set(): time.sleep(0.5) if time.time() - t > 60: - num_unreachable = gc.collect() - all_objects = gc.get_objects() - total_size = 0 - summary = {} - biggest_objects = [None] * 10 - for obj in all_objects: - size = sys.getsizeof(obj) - total_size += size - if not type(obj) in summary: - summary[type(obj)] = {"count":0,"size":0} - summary[type(obj)]["count"] += 1 - summary[type(obj)]["size"] += size - if size > sys.getsizeof(biggest_objects[-1]): - for i in range(len(biggest_objects)): - if size > sys.getsizeof(biggest_objects[i]): - index = i - break - biggest_objects[index+1:] = biggest_objects[index:-1] - biggest_objects[index] = obj - - self.logger.info("%s objects totaling %s bytes", len(all_objects), total_size) - for item in sorted(summary.items(), key=lambda item: item[1]["size"], reverse=True)[:10]: - self.logger.info("%s bytes in %s instances of %s", item[1]["size"], item[1]["count"], item[0]) - for i in range(len(biggest_objects)): - obj = biggest_objects[i] - try: - value = repr(bytes(obj.getbuffer()[:100])) - except: - try: - value = repr(obj)[:100] - except BaseException as e: - value = "<{} getting value>".format(e) - self.logger.info("#%s (%s) (%s bytes) (%s refs) (id=%s): %s", i+1, type(obj), sys.getsizeof(obj), sys.getrefcount(obj), id(obj), value) - self.logger.info("%s unreachable objects totaling %s bytes", len(gc.garbage), sum(sys.getsizeof(x) for x in gc.garbage)) - + self.debug_mem() t = time.time() except: self.logger.critical("fatal exception, shutting down", exc_info=True)