warcprox/warcprox/controller.py
Noah Levitt fd81190517 refactor the multithreaded warc writing
main functional change is that only as man warc files are created as are
needed to keep up with the throughput
2018-02-07 15:48:43 -08:00

441 lines
17 KiB
Python

'''
warcprox/controller.py - contains WarcproxController class, responsible for
starting up and shutting down the various components of warcprox, and for
sending heartbeats to the service registry if configured to do so; also has
some memory profiling capabilities
Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
from __future__ import absolute_import
import logging
import threading
import time
import sys
import gc
import datetime
import warcprox
import certauth
import functools
import doublethink
import importlib
class Factory:
@staticmethod
def dedup_db(options):
if options.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
elif options.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif options.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
elif options.cdxserver_dedup:
dedup_db = warcprox.dedup.CdxServerDedup(
cdx_url=options.cdxserver_dedup, options=options)
elif options.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = warcprox.dedup.DedupDb(options.dedup_db_file, options=options)
return dedup_db
@staticmethod
def stats_processor(options):
if options.rethinkdb_stats_url:
stats_processor = warcprox.stats.RethinkStatsProcessor(options)
elif options.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
stats_processor = None
else:
stats_processor = warcprox.stats.StatsProcessor(options)
return stats_processor
@staticmethod
def warc_writer_processor(options):
return warcprox.writerthread.WarcWriterProcessor(options)
@staticmethod
def playback_proxy(ca, options):
if options.playback_port is not None:
playback_index_db = warcprox.playback.PlaybackIndexDb(
options=options)
playback_proxy = warcprox.playback.PlaybackProxy(
ca=ca, playback_index_db=playback_index_db, options=options)
else:
playback_index_db = None
playback_proxy = None
return playback_proxy
@staticmethod
def crawl_logger(options):
if options.crawl_log_dir:
return warcprox.crawl_log.CrawlLogger(
options.crawl_log_dir, options=options)
else:
return None
@staticmethod
def plugin(qualname, options):
try:
(module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name)
try: # new plugins take `options` argument
plugin = class_(options)
except: # backward-compatibility
plugin = class_()
# check that this is either a listener or a batch processor
assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
return plugin
except Exception as e:
logging.fatal('problem with plugin class %r: %s', qualname, e)
sys.exit(1)
@staticmethod
def service_registry(options):
if options.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
return doublethink.ServiceRegistry(rr, table=parsed.table)
else:
return None
class WarcproxController(object):
logger = logging.getLogger("warcprox.controller.WarcproxController")
HEARTBEAT_INTERVAL = 20.0
def __init__(self, options=warcprox.Options()):
"""
Create warcprox controller based on `options`.
"""
self.options = options
self.proxy_thread = None
self.playback_proxy_thread = None
self._last_rss = None
self.stop = threading.Event()
self._start_stop_lock = threading.Lock()
self.stats_processor = Factory.stats_processor(self.options)
self.proxy = warcprox.warcproxy.WarcProxy(
self.stats_processor, self.postfetch_status, options)
self.playback_proxy = Factory.playback_proxy(
self.proxy.ca, self.options)
# default number of warc writer threads = sqrt(proxy.max_threads)
# pulled out of thin air because it strikes me as reasonable
# 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
if not self.options.writer_threads:
self.options.writer_threads = int(self.proxy.max_threads ** 0.5)
self.build_postfetch_chain(self.proxy.recorded_url_q)
self.service_registry = Factory.service_registry(options)
def postfetch_status(self):
result = {'postfetch_chain': []}
for processor in self._postfetch_chain:
if processor.__class__ == warcprox.ListenerPostfetchProcessor:
name = processor.listener.__class__.__name__
else:
name = processor.__class__.__name__
queued = len(processor.inq.queue)
if hasattr(processor, 'batch'):
queued += len(processor.batch)
result['postfetch_chain'].append({
'processor': name,
'queued_urls': len(processor.inq.queue)})
return result
def chain(self, processor0, processor1):
'''
Sets `processor0.outq` = `processor1.inq` = `queue.Queue()`
'''
assert not processor0.outq
assert not processor1.inq
q = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
processor0.outq = q
processor1.inq = q
def build_postfetch_chain(self, inq):
self._postfetch_chain = []
self.dedup_db = Factory.dedup_db(self.options)
if self.dedup_db:
self._postfetch_chain.append(self.dedup_db.loader())
self.warc_writer_processor = Factory.warc_writer_processor(self.options)
self._postfetch_chain.append(self.warc_writer_processor)
if self.dedup_db:
self._postfetch_chain.append(self.dedup_db.storer())
if self.stats_processor:
self._postfetch_chain.append(self.stats_processor)
if self.playback_proxy:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
self.playback_proxy.playback_index_db, self.options))
crawl_logger = Factory.crawl_logger(self.options)
if crawl_logger:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
crawl_logger, self.options))
for qualname in self.options.plugins or []:
plugin = Factory.plugin(qualname, self.options)
if hasattr(plugin, 'notify'):
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
plugin, self.options))
else:
self._postfetch_chain.append(plugin)
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
self.proxy.running_stats, self.options))
# chain them all up
self._postfetch_chain[0].inq = inq
for i in range(1, len(self._postfetch_chain)):
self.chain(self._postfetch_chain[i-1], self._postfetch_chain[i])
def debug_mem(self):
self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize())
with open("/proc/self/status") as f:
for line in f:
fields = line.split()
if len(fields) >= 2:
k, v = fields[0:2]
if k == "VmHWM:":
hwm = int(v)
elif k == "VmRSS:":
rss = int(v)
elif k == "VmData:":
data = int(v)
elif k == "VmStk:":
stk = int(v)
self.logger.info("rss=%s data=%s stack=%s hwm=%s", rss, data, stk, hwm)
self._last_rss = self._last_rss or rss # to set initial value
if rss - self._last_rss > 1024:
num_unreachable = gc.collect()
all_objects = gc.get_objects()
total_size = 0
summary = {}
biggest_objects = [None] * 10
for obj in all_objects:
size = sys.getsizeof(obj)
total_size += size
if not type(obj) in summary:
summary[type(obj)] = {"count":0,"size":0}
summary[type(obj)]["count"] += 1
summary[type(obj)]["size"] += size
if size > sys.getsizeof(biggest_objects[-1]):
for i in range(len(biggest_objects)):
if size > sys.getsizeof(biggest_objects[i]):
index = i
break
biggest_objects[index+1:] = biggest_objects[index:-1]
biggest_objects[index] = obj
self.logger.info("%s objects totaling %s bytes", len(all_objects), total_size)
self.logger.info("=== biggest types ===")
for item in sorted(summary.items(), key=lambda item: item[1]["size"], reverse=True)[:10]:
self.logger.info("%s bytes in %s instances of %s", item[1]["size"], item[1]["count"], item[0])
self.logger.info("=== warcprox types ===")
for t in (t for t in summary if str(t).find("warcprox") >= 0):
self.logger.info("%s bytes in %s instances of %s", summary[t]["size"], summary[t]["count"], t)
for i in range(len(biggest_objects)):
obj = biggest_objects[i]
try:
value = repr(bytes(obj.getbuffer()[:100]))
except:
try:
value = repr(obj)[:100]
except BaseException as e:
value = "<{} getting value>".format(e)
self.logger.info("#%s (%s) (%s bytes) (%s refs) (id=%s): %s", i+1, type(obj), sys.getsizeof(obj), sys.getrefcount(obj), id(obj), value)
self.logger.info("%s unreachable objects totaling %s bytes", len(gc.garbage), sum(sys.getsizeof(x) for x in gc.garbage))
self._last_rss = rss
def _service_heartbeat(self):
if hasattr(self, 'status_info'):
status_info = self.status_info
else:
status_info = {
'role': 'warcprox',
'version': warcprox.__version__,
'ttl': self.HEARTBEAT_INTERVAL * 3,
'port': self.proxy.server_port,
}
status_info.update(self.proxy.status())
self.status_info = self.service_registry.heartbeat(status_info)
self.logger.log(
warcprox.TRACE, "status in service registry: %s",
self.status_info)
def start(self):
with self._start_stop_lock:
if self.proxy_thread and self.proxy_thread.is_alive():
self.logger.info('warcprox is already running')
return
self.proxy_thread = threading.Thread(
target=self.proxy.serve_forever, name='ProxyThread')
self.proxy_thread.start()
if self.playback_proxy:
self.playback_proxy_thread = threading.Thread(
target=self.playback_proxy.serve_forever,
name='PlaybackProxyThread')
self.playback_proxy_thread.start()
for processor in self._postfetch_chain:
processor.start()
def shutdown(self):
with self._start_stop_lock:
if not self.proxy_thread or not self.proxy_thread.is_alive():
self.logger.info('warcprox is not running')
return
for processor in self._postfetch_chain:
processor.stop.set()
self.proxy.shutdown()
self.proxy.server_close()
if self.playback_proxy is not None:
self.playback_proxy.shutdown()
self.playback_proxy.server_close()
if self.playback_proxy.playback_index_db is not None:
self.playback_proxy.playback_index_db.close()
for processor in self._postfetch_chain:
processor.join()
self.proxy_thread.join()
if self.playback_proxy is not None:
self.playback_proxy_thread.join()
if self.service_registry and hasattr(self, "status_info"):
self.service_registry.unregister(self.status_info["id"])
def run_until_shutdown(self):
"""
Start warcprox and run until shut down. Call
warcprox_controller.stop.set() to initiate graceful shutdown.
"""
self.start()
last_mem_dbg = datetime.datetime.utcfromtimestamp(0)
last_profile_dump = datetime.datetime.utcnow()
try:
utc = datetime.timezone.utc
except AttributeError:
# python2 :-\
class UTC(datetime.tzinfo):
def tzname(self, dt): return "UTC+00:00"
def dst(self, dt): return datetime.timedelta(0)
def utcoffset(self, dt): return datetime.timedelta(0)
utc = UTC()
try:
while not self.stop.is_set():
if self.proxy.running_stats:
self.proxy.running_stats.snap()
if self.service_registry and (
not hasattr(self, "status_info") or (
datetime.datetime.now(utc)
- self.status_info["last_heartbeat"]
).total_seconds() > self.HEARTBEAT_INTERVAL):
self._service_heartbeat()
# if self.options.profile and (
# datetime.datetime.utcnow() - last_mem_dbg
# ).total_seconds() > 60:
# self.debug_mem()
# last_mem_dbg = datetime.datetime.utcnow()
if (self.options.profile and
(datetime.datetime.utcnow() - last_profile_dump
).total_seconds() > 60*10):
self._dump_profiling()
last_profile_dump = datetime.datetime.utcnow()
time.sleep(0.5)
if self.options.profile:
self._dump_profiling()
except:
self.logger.critical(
"shutting down in response to fatal exception",
exc_info=True)
pass
finally:
self.shutdown()
def _dump_profiling(self):
import pstats, tempfile, os, io
with tempfile.TemporaryDirectory() as tmpdir:
# proxy threads
files = []
for th_id, profiler in self.proxy.profilers.items():
file = os.path.join(tmpdir, '%s.dat' % th_id)
profiler.dump_stats(file)
files.append(file)
buf = io.StringIO()
stats = pstats.Stats(*files, stream=buf)
stats.sort_stats('cumulative')
stats.print_stats(0.1)
self.logger.notice(
'aggregate performance profile of %s proxy threads:\n%s',
len(files), buf.getvalue())
# postfetch processors
for processor in self._postfetch_chain:
if not processor.profiler:
self.logger.notice('%s has no profiling data', processor)
continue
file = os.path.join(tmpdir, '%s.dat' % processor.ident)
processor.profiler.dump_stats(file)
buf = io.StringIO()
stats = pstats.Stats(file, stream=buf)
stats.sort_stats('cumulative')
stats.print_stats(0.1)
self.logger.notice(
'performance profile of %s:\n%s', processor,
buf.getvalue())