diff --git a/warcprox/controller.py b/warcprox/controller.py index e198006..1850857 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -6,6 +6,8 @@ import logging import threading import time import warcprox +import sys +import gc class WarcproxController(object): logger = logging.getLogger("warcprox.controller.WarcproxController") @@ -50,10 +52,48 @@ class WarcproxController(object): self.stop = threading.Event() try: + t = time.time() - 30 while not self.stop.is_set(): time.sleep(0.5) + if time.time() - t > 60: + num_unreachable = gc.collect() + all_objects = gc.get_objects() + total_size = 0 + summary = {} + biggest_objects = [None] * 10 + for obj in all_objects: + size = sys.getsizeof(obj) + total_size += size + if not type(obj) in summary: + summary[type(obj)] = {"count":0,"size":0} + summary[type(obj)]["count"] += 1 + summary[type(obj)]["size"] += size + if size > sys.getsizeof(biggest_objects[-1]): + for i in range(len(biggest_objects)): + if size > sys.getsizeof(biggest_objects[i]): + index = i + break + biggest_objects[index+1:] = biggest_objects[index:-1] + biggest_objects[index] = obj + + self.logger.info("%s objects totaling %s bytes", len(all_objects), total_size) + for item in sorted(summary.items(), key=lambda item: item[1]["size"], reverse=True)[:10]: + self.logger.info("%s bytes in %s instances of %s", item[1]["size"], item[1]["count"], item[0]) + for i in range(len(biggest_objects)): + obj = biggest_objects[i] + try: + value = repr(bytes(obj.getbuffer()[:100])) + except: + try: + value = repr(obj)[:100] + except BaseException as e: + value = "<{} getting value>".format(e) + self.logger.info("#%s (%s) (%s bytes) (%s refs) (id=%s): %s", i+1, type(obj), sys.getsizeof(obj), sys.getrefcount(obj), id(obj), value) + self.logger.info("%s unreachable objects totaling %s bytes", len(gc.garbage), sum(sys.getsizeof(x) for x in gc.garbage)) + + t = time.time() except: - self.logger.critical("fatal exception, shutting down", exc_info=1) + self.logger.critical("fatal exception, shutting down", exc_info=True) pass finally: self.warc_writer_thread.stop.set() diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 01f94cd..b4d541a 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -115,7 +115,6 @@ class ProxyingRecorder(object): else: return 0 - class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None): @@ -351,11 +350,10 @@ class RecordedUrl: self.host = host self.duration = duration - def __del__(self): - self.logger.debug("finished with %s", self) - if self.response_recorder: - self.response_recorder.tempfile.close() - self.response_recorder = None + # def __del__(self): + # self.logger.debug("finished with %s", self) + # if self.response_recorder: + # del self.response_recorder class SingleThreadedWarcProxy(http_server.HTTPServer): diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 8da6c11..a95a68f 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -49,6 +49,10 @@ class WarcWriterThread(threading.Thread): recorded_url, base32=self.options.base32) records = self.writer_pool.write_records(recorded_url) self._final_tasks(recorded_url, records) + + # try to release resources in a timely fashion + if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() except queue.Empty: self.idle = time.time() self.writer_pool.maybe_idle_rollover()