very incomplete work on new postfetch processor chain

Noah Levitt 2018-01-12 11:39:53 -08:00
parent c459812c93
commit c715eaba4e
4 changed files with 261 additions and 122 deletions

warcprox/__init__.py

@@ -1,7 +1,7 @@
 """
 warcprox/__init__.py - warcprox package main file, contains some utility code

-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -19,6 +19,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
 USA.
 """

+import sys
+import datetime
+import threading
+import time
+
 from argparse import Namespace as _Namespace
 from pkg_resources import get_distribution as _get_distribution

 __version__ = _get_distribution('warcprox').version
@@ -26,8 +28,6 @@ try:
     import queue
 except ImportError:
     import Queue as queue
-
-import datetime

 def digest_str(hash_obj, base32=False):
     import base64
     return hash_obj.name.encode('utf-8') + b':' + (
@@ -92,6 +92,100 @@ class RequestBlockedByRule(Exception):
     def __str__(self):
         return "%s: %s" % (self.__class__.__name__, self.msg)

+class BasePostfetchProcessor(threading.Thread):
+    def __init__(self, inq, outq, profile=False):
+        threading.Thread.__init__(self, name='???')
+        self.inq = inq
+        self.outq = outq
+        self.stop = threading.Event()
+        self.profile = profile
+
+    def run(self):
+        if self.profile:
+            import cProfile
+            self.profiler = cProfile.Profile()
+            self.profiler.enable()
+            self._run()
+            self.profiler.disable()
+        else:
+            self._run()
+
+    def _get_process_put(self):
+        '''
+        Get url(s) from `self.inq`, process url(s), queue to `self.outq`.
+
+        Subclasses must implement this.
+
+        May raise queue.Empty.
+        '''
+        raise Exception('not implemented')
+
+    def _run(self):
+        while not self.stop.is_set():
+            try:
+                while True:
+                    try:
+                        self._get_process_put()
+                    except queue.Empty:
+                        if self.stop.is_set():
+                            break
+                self._shutdown()
+            except Exception as e:
+                if isinstance(e, OSError) and e.errno == 28:
+                    # OSError: [Errno 28] No space left on device
+                    self.logger.critical(
+                            'shutting down due to fatal problem: %s: %s',
+                            e.__class__.__name__, e)
+                    self._shutdown()
+                    sys.exit(1)
+                self.logger.critical(
+                        '%s will try to continue after unexpected error',
+                        self.name, exc_info=True)
+                time.sleep(0.5)
+
+    def _shutdown(self):
+        pass
+
+class BaseStandardPostfetchProcessor(BasePostfetchProcessor):
+    def _get_process_put(self):
+        recorded_url = self.inq.get(block=True, timeout=0.5)
+        self._process_url(recorded_url)
+        if self.outq:
+            self.outq.put(recorded_url)
+
+    def _process_url(self, recorded_url):
+        raise Exception('not implemented')
+
+class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
+    MAX_BATCH_SIZE = 500
+
+    def _get_process_put(self):
+        batch = []
+        batch.append(self.inq.get(block=True, timeout=0.5))
+        try:
+            while len(batch) < self.MAX_BATCH_SIZE:
+                batch.append(self.inq.get(block=False))
+        except queue.Empty:
+            pass
+        self._process_batch(batch)
+        if self.outq:
+            for recorded_url in batch:
+                self.outq.put(recorded_url)
+
+    def _process_batch(self, batch):
+        raise Exception('not implemented')
+
+class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
+    def __init__(self, listener, inq, outq, profile=False):
+        BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile)
+        self.listener = listener
+
+    def _process_url(self, recorded_url):
+        return self.listener.notify(recorded_url, recorded_url.warc_records)
+
 # monkey-patch log levels TRACE and NOTICE
 TRACE = 5
 import logging
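
For orientation, here is a minimal sketch (not part of this commit) of how these base classes are meant to be used: a concrete processor implements only _process_url (or _process_batch), and BasePostfetchProcessor supplies the thread loop, queue polling, optional profiling, and shutdown handling. TimestampedQueue already exists in this module; the TracePostfetchProcessor class below is hypothetical.

import logging
import warcprox

class TracePostfetchProcessor(warcprox.BaseStandardPostfetchProcessor):
    '''Hypothetical processor: log each url, pass it downstream unchanged.'''
    logger = logging.getLogger('TracePostfetchProcessor')

    def _process_url(self, recorded_url):
        self.logger.info('postfetch saw %s', recorded_url.url)

inq = warcprox.TimestampedQueue(maxsize=500)   # upstream (the proxy) fills this
outq = warcprox.TimestampedQueue(maxsize=500)  # the next processor drains this
processor = TracePostfetchProcessor(inq, outq)
processor.start()
# ... fetches happen ...
processor.stop.set()  # _run() exits after the next 0.5s queue.Empty timeout
processor.join()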

warcprox/controller.py

@@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
 sending heartbeats to the service registry if configured to do so; also has
 some memory profiling capabilities

-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -27,55 +27,186 @@ from __future__ import absolute_import
 import logging
 import threading
 import time
-import warcprox
 import sys
 import gc
 import datetime
+import importlib
+import warcprox
+import certauth
+
+class Factory:
+    @staticmethod
+    def dedup_db(options):
+        if options.rethinkdb_dedup_url:
+            dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
+        elif options.rethinkdb_big_table_url:
+            dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
+        elif options.rethinkdb_trough_db_url:
+            dedup_db = warcprox.dedup.TroughDedupDb(options)
+        elif options.cdxserver_dedup:
+            dedup_db = warcprox.dedup.CdxServerDedup(
+                    cdx_url=options.cdxserver_dedup)
+        elif options.dedup_db_file in (None, '', '/dev/null'):
+            logging.info('deduplication disabled')
+            dedup_db = None
+        else:
+            dedup_db = warcprox.dedup.DedupDb(options.dedup_db_file, options=options)
+        return dedup_db
+
+    @staticmethod
+    def stats_db(options):
+        if options.rethinkdb_stats_url:
+            stats_db = warcprox.stats.RethinkStatsDb(options=options)
+        elif options.stats_db_file in (None, '', '/dev/null'):
+            logging.info('statistics tracking disabled')
+            stats_db = None
+        else:
+            stats_db = warcprox.stats.StatsDb(options.stats_db_file, options=options)
+        return stats_db
+
+    # @staticmethod
+    # def certauth(options):
+    #     ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
+    #     ca = certauth.certauth.CertificateAuthority(
+    #             options.cacert, args.certs_dir, ca_name=ca_name)
+    #     return ca
+    @staticmethod
+    def playback_proxy(ca, options):
+        if options.playback_port is not None:
+            playback_index_db = warcprox.playback.PlaybackIndexDb(
+                    options.playback_index_db_file, options=options)
+            playback_proxy = warcprox.playback.PlaybackProxy(
+                    ca=ca, playback_index_db=playback_index_db, options=options)
+        else:
+            playback_index_db = None
+            playback_proxy = None
+        return playback_proxy
+
+    @staticmethod
+    def crawl_logger(options):
+        if options.crawl_log_dir:
+            return warcprox.crawl_log.CrawlLogger(
+                    options.crawl_log_dir, options=options)
+        else:
+            return None
+
+    @staticmethod
+    def plugin(qualname, inq, outq):
+        try:
+            (module_name, class_name) = qualname.rsplit('.', 1)
+            module_ = importlib.import_module(module_name)
+            class_ = getattr(module_, class_name)
+            instance = class_()
+            instance.notify  # make sure it has this method
+            return instance
+        except Exception as e:
+            logging.fatal('problem with plugin class %r: %s', qualname, e)
+            sys.exit(1)
+
+    # @staticmethod
+    # def plugins(options):
+    #     plugins = []
+    #     for qualname in options.plugins or []:
+    #         try:
+    #             (module_name, class_name) = qualname.rsplit('.', 1)
+    #             module_ = importlib.import_module(module_name)
+    #             class_ = getattr(module_, class_name)
+    #             plugin = class_()
+    #             plugin.notify  # make sure it has this method
+    #             plugins.append(plugin)
+    #         except Exception as e:
+    #             logging.fatal('problem with plugin class %r: %s', qualname, e)
+    #             sys.exit(1)
+    #     return plugins
+
+    # @staticmethod
+    # def service_registry(options):
+    #     if options.rethinkdb_services_url:
+    #         parsed = doublethink.parse_rethinkdb_url(
+    #                 options.rethinkdb_services_url)
+    #         rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
+    #         return doublethink.ServiceRegistry(rr, table=parsed.table)
+    #     else:
+    #         return None
+
 class WarcproxController(object):
     logger = logging.getLogger("warcprox.controller.WarcproxController")

     HEARTBEAT_INTERVAL = 20.0

-    def __init__(
-            self, proxy=None, warc_writer_threads=None, playback_proxy=None,
-            service_registry=None, options=warcprox.Options()):
+    def __init__(self, options=warcprox.Options()):
         """
-        Create warcprox controller.
-
-        If supplied, `proxy` should be an instance of WarcProxy, and
-        `warc_writer_threads` should be a list of WarcWriterThread instances.
-        If not supplied, they are created with default values.
-
-        If supplied, playback_proxy should be an instance of PlaybackProxy. If
-        not supplied, no playback proxy will run.
+        Create warcprox controller based on `options`.
         """
-        if proxy is not None:
-            self.proxy = proxy
-        else:
-            self.proxy = warcprox.warcproxy.WarcProxy(options=options)
-
-        if warc_writer_threads is not None:
-            self.warc_writer_threads = warc_writer_threads
-        else:
-            self.warc_writer_threads = [
-                    warcprox.writerthread.WarcWriterThread(
-                        name='WarcWriterThread%03d' % i,
-                        recorded_url_q=self.proxy.recorded_url_q,
-                        listeners=[self.proxy.running_stats], options=options)
-                    for i in range(int(self.proxy.max_threads ** 0.5))]
+        self.options = options

         self.proxy_thread = None
         self.playback_proxy_thread = None
-        self.playback_proxy = playback_proxy
-        self.service_registry = service_registry
-        self.options = options
         self._last_rss = None
         self.stop = threading.Event()
         self._start_stop_lock = threading.Lock()

+        self.proxy = warcprox.warcproxy.WarcProxy(options=options)
+        self.build_postfetch_chain(self.proxy.recorded_url_q)
+
+        # if warc_writer_threads is not None:
+        #     self.warc_writer_threads = warc_writer_threads
+        # else:
+        #     self.warc_writer_threads = [
+        #             warcprox.writerthread.WarcWriterThread(
+        #                 name='WarcWriterThread%03d' % i,
+        #                 recorded_url_q=self.proxy.recorded_url_q,
+        #                 listeners=[self.proxy.running_stats], options=options)
+        #             for i in range(int(self.proxy.max_threads ** 0.5))]
+
+        # self.playback_proxy = playback_proxy
+        # self.service_registry = service_registry
+    def build_postfetch_chain(self, inq):
+        self._postfetch_chain = []
+
+        outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
+
+        def maybe_add_to_chain(processor_init):
+            nonlocal inq, outq
+            processor = processor_init(inq, outq, self.options)
+            if processor:
+                self._postfetch_chain.append(processor)
+                inq = outq
+                outq = warcprox.TimestampedQueue(
+                        maxsize=self.options.queue_size)
+
+        self.dedup_db = Factory.dedup_db(self.options)
+
+        # dedup loader
+        if self.dedup_db:
+            maybe_add_to_chain(self.dedup_db.loader)
+
+        # warc writer
+        maybe_add_to_chain(Factory.warc_writer)
+
+        # dedup storer
+        if self.dedup_db:
+            maybe_add_to_chain(self.dedup_db.storer)
+
+        # playback index storer
+        # XXX XXX XXX FIXME
+        # self.playback_proxy = Factory.playback_proxy(options)
+        # if self.playback_proxy:
+        #     maybe_add_to_chain()
+        #     outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
+        #     processor = self.playback_proxy.playback_index_db(inq, outq)
+        #     self._postfetch_chain.append(processor)
+        #     inq = outq
+
+        # stats db
+        maybe_add_to_chain(Factory.stats_db)
+
+        # crawl logger
+        maybe_add_to_chain(Factory.crawl_logger)
+
+        for qualname in self.options.plugins or []:
+            maybe_add_to_chain(
+                    lambda inq, outq, options, qualname=qualname:
+                    Factory.plugin(qualname, inq, outq))
+
+        # self.plugins = Factory.plugins(options)
+
     def debug_mem(self):
         self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize())
         with open("/proc/self/status") as f:
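
To make the new wiring concrete (an illustration, not code from this commit): maybe_add_to_chain() links processors so that each stage's output queue becomes the next stage's input queue, and a recorded url traverses every stage in order. The no-op processor below is hypothetical; real stages would be the dedup loader, warc writer, dedup storer, and so on.

import warcprox

class NoopProcessor(warcprox.BaseStandardPostfetchProcessor):
    '''Hypothetical stand-in for a real postfetch stage.'''
    def _process_url(self, recorded_url):
        pass

q0 = warcprox.TimestampedQueue(maxsize=500)  # the proxy's recorded_url_q
q1 = warcprox.TimestampedQueue(maxsize=500)
q2 = warcprox.TimestampedQueue(maxsize=500)

# the same hand-off build_postfetch_chain() automates:
# q0 -> stage -> q1 -> stage -> q2
chain = [NoopProcessor(q0, q1), NoopProcessor(q1, q2)]
for processor in chain:
    processor.start()
for processor in chain:
    processor.stop.set()
    processor.join()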

warcprox/main.py

@@ -211,92 +211,6 @@ def init_controller(args):
         logging.fatal(e)
         exit(1)

-    listeners = []
-
-    running_stats = warcprox.stats.RunningStats()
-    listeners.append(running_stats)
-
-    if args.rethinkdb_dedup_url:
-        dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
-    elif args.rethinkdb_big_table_url:
-        dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
-    elif args.rethinkdb_trough_db_url:
-        dedup_db = warcprox.dedup.TroughDedupDb(options)
-    elif args.cdxserver_dedup:
-        dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
-    elif args.dedup_db_file in (None, '', '/dev/null'):
-        logging.info('deduplication disabled')
-        dedup_db = None
-    else:
-        dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
-    if dedup_db:
-        listeners.append(dedup_db)
-
-    if args.rethinkdb_stats_url:
-        stats_db = warcprox.stats.RethinkStatsDb(options=options)
-        listeners.append(stats_db)
-    elif args.stats_db_file in (None, '', '/dev/null'):
-        logging.info('statistics tracking disabled')
-        stats_db = None
-    else:
-        stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
-        listeners.append(stats_db)
-
-    recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size)
-
-    ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
-    ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir,
-            ca_name=ca_name)
-
-    proxy = warcprox.warcproxy.WarcProxy(
-            ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db,
-            running_stats=running_stats, options=options)
-
-    if args.playback_port is not None:
-        playback_index_db = warcprox.playback.PlaybackIndexDb(
-                args.playback_index_db_file, options=options)
-        playback_proxy = warcprox.playback.PlaybackProxy(
-                ca=ca, playback_index_db=playback_index_db, options=options)
-        listeners.append(playback_index_db)
-    else:
-        playback_index_db = None
-        playback_proxy = None
-
-    if args.crawl_log_dir:
-        listeners.append(warcprox.crawl_log.CrawlLogger(
-            args.crawl_log_dir, options=options))
-
-    for qualname in args.plugins or []:
-        try:
-            (module_name, class_name) = qualname.rsplit('.', 1)
-            module_ = importlib.import_module(module_name)
-            class_ = getattr(module_, class_name)
-            listener = class_()
-            listener.notify  # make sure it has this method
-            listeners.append(listener)
-        except Exception as e:
-            logging.fatal('problem with plugin class %r: %s', qualname, e)
-            sys.exit(1)
-
-    writer_pool = warcprox.writer.WarcWriterPool(options=options)
-    # number of warc writer threads = sqrt(proxy.max_threads)
-    # I came up with this out of thin air because it strikes me as reasonable
-    # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
-    num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5)
-    logging.debug('initializing %d warc writer threads', num_writer_threads)
-    warc_writer_threads = [
-            warcprox.writerthread.WarcWriterThread(
-                name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q,
-                writer_pool=writer_pool, dedup_db=dedup_db,
-                listeners=listeners, options=options)
-            for i in range(num_writer_threads)]
-
-    if args.rethinkdb_services_url:
-        parsed = doublethink.parse_rethinkdb_url(
-                options.rethinkdb_services_url)
-        rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
-        svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
-    else:
-        svcreg = None

     controller = warcprox.controller.WarcproxController(
             proxy, warc_writer_threads, playback_proxy,
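
The block removed above is the construction logic that the new Factory class in warcprox/controller.py takes over, keyed off options instead of args. A small sketch of the new style (hypothetical usage; it assumes warcprox.Options leaves unset attributes as None, as it does in this codebase):

import warcprox
import warcprox.stats
from warcprox.controller import Factory

options = warcprox.Options(stats_db_file='./warcprox-stats.db')
stats_db = Factory.stats_db(options)          # a warcprox.stats.StatsDb
crawl_logger = Factory.crawl_logger(options)  # None: crawl_log_dir not set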

warcprox/warcproxy.py

@@ -2,7 +2,7 @@
 warcprox/warcproxy.py - recording proxy, extends mitmproxy to record traffic,
 enqueue info on the recorded url queue

-Copyright (C) 2013-2016 Internet Archive
+Copyright (C) 2013-2018 Internet Archive

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License