mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
don't implement __del__, maybe it can cause mem leaks; bunch of logging to try to detect leaks
This commit is contained in:
parent
3b9345e7d7
commit
dd1c7b5f7d
@ -6,6 +6,8 @@ import logging
|
||||
import threading
|
||||
import time
|
||||
import warcprox
|
||||
import sys
|
||||
import gc
|
||||
|
||||
class WarcproxController(object):
|
||||
logger = logging.getLogger("warcprox.controller.WarcproxController")
|
||||
@ -50,10 +52,48 @@ class WarcproxController(object):
|
||||
self.stop = threading.Event()
|
||||
|
||||
try:
|
||||
t = time.time() - 30
|
||||
while not self.stop.is_set():
|
||||
time.sleep(0.5)
|
||||
if time.time() - t > 60:
|
||||
num_unreachable = gc.collect()
|
||||
all_objects = gc.get_objects()
|
||||
total_size = 0
|
||||
summary = {}
|
||||
biggest_objects = [None] * 10
|
||||
for obj in all_objects:
|
||||
size = sys.getsizeof(obj)
|
||||
total_size += size
|
||||
if not type(obj) in summary:
|
||||
summary[type(obj)] = {"count":0,"size":0}
|
||||
summary[type(obj)]["count"] += 1
|
||||
summary[type(obj)]["size"] += size
|
||||
if size > sys.getsizeof(biggest_objects[-1]):
|
||||
for i in range(len(biggest_objects)):
|
||||
if size > sys.getsizeof(biggest_objects[i]):
|
||||
index = i
|
||||
break
|
||||
biggest_objects[index+1:] = biggest_objects[index:-1]
|
||||
biggest_objects[index] = obj
|
||||
|
||||
self.logger.info("%s objects totaling %s bytes", len(all_objects), total_size)
|
||||
for item in sorted(summary.items(), key=lambda item: item[1]["size"], reverse=True)[:10]:
|
||||
self.logger.info("%s bytes in %s instances of %s", item[1]["size"], item[1]["count"], item[0])
|
||||
for i in range(len(biggest_objects)):
|
||||
obj = biggest_objects[i]
|
||||
try:
|
||||
value = repr(bytes(obj.getbuffer()[:100]))
|
||||
except:
|
||||
try:
|
||||
value = repr(obj)[:100]
|
||||
except BaseException as e:
|
||||
value = "<{} getting value>".format(e)
|
||||
self.logger.info("#%s (%s) (%s bytes) (%s refs) (id=%s): %s", i+1, type(obj), sys.getsizeof(obj), sys.getrefcount(obj), id(obj), value)
|
||||
self.logger.info("%s unreachable objects totaling %s bytes", len(gc.garbage), sum(sys.getsizeof(x) for x in gc.garbage))
|
||||
|
||||
t = time.time()
|
||||
except:
|
||||
self.logger.critical("fatal exception, shutting down", exc_info=1)
|
||||
self.logger.critical("fatal exception, shutting down", exc_info=True)
|
||||
pass
|
||||
finally:
|
||||
self.warc_writer_thread.stop.set()
|
||||
|
@ -115,7 +115,6 @@ class ProxyingRecorder(object):
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
|
||||
|
||||
def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None):
|
||||
@ -351,11 +350,10 @@ class RecordedUrl:
|
||||
self.host = host
|
||||
self.duration = duration
|
||||
|
||||
def __del__(self):
|
||||
self.logger.debug("finished with %s", self)
|
||||
if self.response_recorder:
|
||||
self.response_recorder.tempfile.close()
|
||||
self.response_recorder = None
|
||||
# def __del__(self):
|
||||
# self.logger.debug("finished with %s", self)
|
||||
# if self.response_recorder:
|
||||
# del self.response_recorder
|
||||
|
||||
|
||||
class SingleThreadedWarcProxy(http_server.HTTPServer):
|
||||
|
@ -49,6 +49,10 @@ class WarcWriterThread(threading.Thread):
|
||||
recorded_url, base32=self.options.base32)
|
||||
records = self.writer_pool.write_records(recorded_url)
|
||||
self._final_tasks(recorded_url, records)
|
||||
|
||||
# try to release resources in a timely fashion
|
||||
if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
|
||||
recorded_url.response_recorder.tempfile.close()
|
||||
except queue.Empty:
|
||||
self.idle = time.time()
|
||||
self.writer_pool.maybe_idle_rollover()
|
||||
|
Loading…
x
Reference in New Issue
Block a user