diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 256ede9..c592eb6 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -1,7 +1,7 @@
 """
 warcprox/__init__.py - warcprox package main file, contains some utility code
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -19,6 +19,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
 USA.
 """
 
+import datetime
+import logging
+import sys
+import threading
+import time
 from argparse import Namespace as _Namespace
 from pkg_resources import get_distribution as _get_distribution
 __version__ = _get_distribution('warcprox').version
@@ -26,8 +31,6 @@ try:
     import queue
 except ImportError:
     import Queue as queue
-import datetime
-
 def digest_str(hash_obj, base32=False):
     import base64
     return hash_obj.name.encode('utf-8') + b':' + (
@@ -92,6 +95,146 @@ class RequestBlockedByRule(Exception):
     def __str__(self):
         return "%s: %s" % (self.__class__.__name__, self.msg)
 
+class BasePostfetchProcessor(threading.Thread):
+    '''
+    Base class for the worker threads of the postfetch chain.
+
+    Repeatedly gets url(s) from `self.inq`, processes them and queues them
+    to `self.outq`, until `self.stop` is set.
+    '''
+    logger = logging.getLogger('warcprox.BasePostfetchProcessor')
+
+    def __init__(self, inq, outq, profile=False):
+        '''
+        Args:
+            inq: queue to get url(s) from
+            outq: queue to put processed url(s) on; the subclasses in this
+                module skip the put when it is None
+            profile: if true, `run()` wraps `_run()` in a cProfile profiler,
+                which is kept in `self.profiler`
+        '''
+        # name the thread after the concrete class so that log messages
+        # using `self.name` identify the processor
+        threading.Thread.__init__(self, name=self.__class__.__name__)
+        self.inq = inq
+        self.outq = outq
+        self.stop = threading.Event()
+        self.profile = profile
+        self.profiler = None
+
+    def run(self):
+        '''Thread entry point: call `_run()`, profiled if requested.'''
+        if self.profile:
+            import cProfile
+            self.profiler = cProfile.Profile()
+            self.profiler.enable()
+            try:
+                self._run()
+            finally:
+                # stop profiling even if _run() raises
+                self.profiler.disable()
+        else:
+            self._run()
+
+    def _get_process_put(self):
+        '''
+        Get url(s) from `self.inq`, process url(s), queue to `self.outq`.
+
+        Subclasses must implement this.
+
+        May raise queue.Empty.
+        '''
+        raise NotImplementedError('not implemented')
+
+    def _run(self):
+        '''
+        Call `_get_process_put()` in a loop until it raises queue.Empty while
+        `self.stop` is set, then call `_shutdown()`.
+
+        An unexpected exception is logged and the loop resumes after half a
+        second, unless it is "no space left on device" (errno 28), in which
+        case `_shutdown()` and `sys.exit(1)` are called.
+        '''
+        while not self.stop.is_set():
+            try:
+                while True:
+                    try:
+                        self._get_process_put()
+                    except queue.Empty:
+                        # stop is only checked when _get_process_put()
+                        # reports an empty queue
+                        if self.stop.is_set():
+                            break
+                self._shutdown()
+            except Exception as e:
+                if isinstance(e, OSError) and e.errno == 28:
+                    # OSError: [Errno 28] No space left on device
+                    self.logger.critical(
+                            'shutting down due to fatal problem: %s: %s',
+                            e.__class__.__name__, e)
+                    self._shutdown()
+                    sys.exit(1)
+
+                self.logger.critical(
+                        '%s will try to continue after unexpected error',
+                        self.name, exc_info=True)
+                time.sleep(0.5)
+
+    def _shutdown(self):
+        '''Hook called when the processor stops; does nothing by default.'''
+        pass
+
+class BaseStandardPostfetchProcessor(BasePostfetchProcessor):
+    '''Postfetch processor that handles one url at a time.'''
+
+    def _get_process_put(self):
+        # get() raises queue.Empty if nothing arrives within half a second;
+        # _run() uses that to check `self.stop`
+        recorded_url = self.inq.get(block=True, timeout=0.5)
+        self._process_url(recorded_url)
+        if self.outq is not None:
+            self.outq.put(recorded_url)
+
+    def _process_url(self, recorded_url):
+        '''Process one url. Subclasses must implement this.'''
+        raise NotImplementedError('not implemented')
+
+class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
+    '''Postfetch processor that handles urls in batches.'''
+
+    # most urls passed to a single _process_batch() call
+    MAX_BATCH_SIZE = 500
+
+    def _get_process_put(self):
+        # wait (up to half a second) for the first url only, then take
+        # whatever else is immediately available, up to MAX_BATCH_SIZE
+        batch = [self.inq.get(block=True, timeout=0.5)]
+        try:
+            while len(batch) < self.MAX_BATCH_SIZE:
+                batch.append(self.inq.get(block=False))
+        except queue.Empty:
+            pass
+
+        self._process_batch(batch)
+
+        if self.outq is not None:
+            for recorded_url in batch:
+                self.outq.put(recorded_url)
+
+    def _process_batch(self, batch):
+        '''Process a list of urls. Subclasses must implement this.'''
+        raise NotImplementedError('not implemented')
+
+class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
+    '''Postfetch processor that passes each url to `listener.notify()`.'''
+
+    def __init__(self, listener, inq, outq, profile=False):
+        BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile)
+        self.listener = listener
+
+    def _process_url(self, recorded_url):
+        return self.listener.notify(recorded_url, recorded_url.warc_records)
+
 # monkey-patch log levels TRACE and NOTICE
 TRACE = 5
 import logging
diff --git a/warcprox/controller.py b/warcprox/controller.py
index 0bf8a4f..0f0c20a 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
 sending heartbeats to the service registry if configured to do so; also has
 some memory profiling capabilities
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -27,55 +27,239 @@ from __future__ import absolute_import
 import logging
 import threading
 import time
-import warcprox
 import sys
 import gc
 import datetime
+import importlib
+import warcprox
+import certauth
+
+class Factory:
+    '''Static factory methods that build warcprox components from options.'''
+
+    @staticmethod
+    def dedup_db(options):
+        '''Return the dedup db that `options` selects, or None if disabled.'''
+        if options.rethinkdb_dedup_url:
+            dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
+        elif options.rethinkdb_big_table_url:
+            dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
+        elif options.rethinkdb_trough_db_url:
+            dedup_db = warcprox.dedup.TroughDedupDb(options)
+        elif options.cdxserver_dedup:
+            dedup_db = warcprox.dedup.CdxServerDedup(
+                    cdx_url=options.cdxserver_dedup)
+        elif options.dedup_db_file in (None, '', '/dev/null'):
+            logging.info('deduplication disabled')
+            dedup_db = None
+        else:
+            dedup_db = warcprox.dedup.DedupDb(options.dedup_db_file, options=options)
+        return dedup_db
+
+    @staticmethod
+    def stats_db(options):
+        '''Return the stats db that `options` selects, or None if disabled.'''
+        if options.rethinkdb_stats_url:
+            stats_db = warcprox.stats.RethinkStatsDb(options=options)
+        elif options.stats_db_file in (None, '', '/dev/null'):
+            logging.info('statistics tracking disabled')
+            stats_db = None
+        else:
+            stats_db = warcprox.stats.StatsDb(options.stats_db_file, options=options)
+        return stats_db
+
+    # @staticmethod
+    # def certauth(options):
+    #     ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
+    #     ca = certauth.certauth.CertificateAuthority(
+    #             options.cacert, args.certs_dir, ca_name=ca_name)
+    #     return ca
+
+    @staticmethod
+    def playback_proxy(options, ca=None):
+        '''
+        Return a PlaybackProxy if `options.playback_port` is set, else None.
+
+        `ca` is handed to PlaybackProxy as its `ca` argument.
+
+        NOTE(review): nothing in this patch builds a certificate authority
+        yet (certauth() above is commented out), hence the None default --
+        confirm that PlaybackProxy can work with ca=None.
+        '''
+        if options.playback_port is None:
+            return None
+        playback_index_db = warcprox.playback.PlaybackIndexDb(
+                options.playback_index_db_file, options=options)
+        return warcprox.playback.PlaybackProxy(
+                ca=ca, playback_index_db=playback_index_db, options=options)
+
+    @staticmethod
+    def crawl_logger(options):
+        '''Return a CrawlLogger if `options.crawl_log_dir` is set, else None.'''
+        if options.crawl_log_dir:
+            return warcprox.crawl_log.CrawlLogger(
+                    options.crawl_log_dir, options=options)
+        else:
+            return None
+
+    @staticmethod
+    def plugin(qualname, inq, outq):
+        '''
+        Import the class named by `qualname` ("package.module.ClassName")
+        and return an instance of it.
+
+        Logs the problem and calls `sys.exit(1)` if the class cannot be
+        imported or instantiated, or the instance has no `notify` attribute.
+
+        NOTE(review): `inq` and `outq` are accepted but not used yet.
+        '''
+        try:
+            (module_name, class_name) = qualname.rsplit('.', 1)
+            module_ = importlib.import_module(module_name)
+            class_ = getattr(module_, class_name)
+            instance = class_()
+            instance.notify  # make sure it has this method
+            return instance
+        except Exception as e:
+            logging.fatal('problem with plugin class %r: %s', qualname, e)
+            sys.exit(1)
+
+    # @staticmethod
+    # def plugins(options):
+    #     plugins = []
+    #     for qualname in options.plugins or []:
+    #         try:
+    #             (module_name, class_name) = qualname.rsplit('.', 1)
+    #             module_ = importlib.import_module(module_name)
+    #             class_ = getattr(module_, class_name)
+    #             plugin = class_()
+    #             plugin.notify  # make sure it has this method
+    #             plugins.append(plugin)
+    #         except Exception as e:
+    #             logging.fatal('problem with plugin class %r: %s', qualname, e)
+    #             sys.exit(1)
+    #     return plugins
+
+    # @staticmethod
+    # def service_registry(options):
+    #     if options.rethinkdb_services_url:
+    #         parsed = doublethink.parse_rethinkdb_url(
+    #                 options.rethinkdb_services_url)
+    #         rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
+    #         return doublethink.ServiceRegistry(rr, table=parsed.table)
+    #     else:
+    #         return None
 
 class WarcproxController(object):
     logger = logging.getLogger("warcprox.controller.WarcproxController")
 
     HEARTBEAT_INTERVAL = 20.0
 
-    def __init__(
-            self, proxy=None, warc_writer_threads=None, playback_proxy=None,
-            service_registry=None, options=warcprox.Options()):
+    def __init__(self, options=warcprox.Options()):
         """
-        Create warcprox controller.
-
-        If supplied, `proxy` should be an instance of WarcProxy, and
-        `warc_writer_threads` should be a list of WarcWriterThread instances.
-        If not supplied, they are created with default values.
-
-        If supplied, playback_proxy should be an instance of PlaybackProxy. If
-        not supplied, no playback proxy will run.
+        Create warcprox controller based on `options`.
+
+        Creates the WarcProxy and builds the postfetch chain that reads
+        from its `recorded_url_q`.
         """
-        if proxy is not None:
-            self.proxy = proxy
-        else:
-            self.proxy = warcprox.warcproxy.WarcProxy(options=options)
-
-        if warc_writer_threads is not None:
-            self.warc_writer_threads = warc_writer_threads
-        else:
-            self.warc_writer_threads = [
-                    warcprox.writerthread.WarcWriterThread(
-                        name='WarcWriterThread%03d' % i,
-                        recorded_url_q=self.proxy.recorded_url_q,
-                        listeners=[self.proxy.running_stats], options=options)
-                    for i in range(int(self.proxy.max_threads ** 0.5))]
+        self.options = options
 
         self.proxy_thread = None
         self.playback_proxy_thread = None
-        self.playback_proxy = playback_proxy
-        self.service_registry = service_registry
-        self.options = options
-
         self._last_rss = None
-
         self.stop = threading.Event()
         self._start_stop_lock = threading.Lock()
 
+        self.proxy = warcprox.warcproxy.WarcProxy(options=options)
+
+        self.build_postfetch_chain(self.proxy.recorded_url_q)
+
+        # if warc_writer_threads is not None:
+        #     self.warc_writer_threads = warc_writer_threads
+        # else:
+        #     self.warc_writer_threads = [
+        #             warcprox.writerthread.WarcWriterThread(
+        #                 name='WarcWriterThread%03d' % i,
+        #                 recorded_url_q=self.proxy.recorded_url_q,
+        #                 listeners=[self.proxy.running_stats], options=options)
+        #             for i in range(int(self.proxy.max_threads ** 0.5))]
+        # self.playback_proxy = playback_proxy
+        # self.service_registry = service_registry
+
+    def build_postfetch_chain(self, inq):
+        '''
+        Build `self._postfetch_chain`, the list of postfetch processors.
+
+        Each processor is created by a callable taking `(inq, outq, options)`.
+        The first one is given `inq`; each later one is given, as its input,
+        the queue that was the previous processor's output.
+
+        NOTE(review): the last processor is left with an output queue that
+        nothing in this method reads -- confirm something drains it.
+        '''
+        self._postfetch_chain = []
+        self.dedup_db = Factory.dedup_db(self.options)
+
+        # callables taking (inq, outq, options), in chain order; a callable
+        # that returns a false value adds nothing to the chain
+        processor_inits = []
+
+        # dedup loader
+        if self.dedup_db:
+            processor_inits.append(self.dedup_db.loader)
+
+        # warc writer
+        # FIXME(review): Factory.warc_writer is not defined in this patch;
+        # this line raises AttributeError until it is added
+        processor_inits.append(Factory.warc_writer)
+
+        # dedup storer
+        if self.dedup_db:
+            processor_inits.append(self.dedup_db.storer)
+
+        # playback index storer
+        # XXX XXX XXX FIXME
+        # self.playback_proxy = Factory.playback_proxy(options)
+        # if self.playback_proxy:
+        #     maybe_add_to_chain()
+        #     outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
+        #     processor = self.playback_proxy.playback_index_db(inq, outq)
+        #     self._postfetch_chain.append(processor)
+        #     inq = outq
+
+        # Factory.stats_db() and Factory.crawl_logger() take only `options`,
+        # so adapt them to the (inq, outq, options) calling convention
+        # NOTE(review): these two and Factory.plugin() return the object
+        # itself, not a processor thread -- presumably each still needs
+        # wrapping, e.g. in warcprox.ListenerPostfetchProcessor; confirm
+
+        # stats db
+        processor_inits.append(
+                lambda inq, outq, options: Factory.stats_db(options))
+
+        # crawl logger
+        processor_inits.append(
+                lambda inq, outq, options: Factory.crawl_logger(options))
+
+        # plugins (main.py used `args.plugins or []`; tolerate None here too)
+        for qualname in self.options.plugins or []:
+            # bind qualname as a default: the lambda is only called later,
+            # after the loop variable has moved on
+            processor_inits.append(
+                    lambda inq, outq, options, qualname=qualname:
+                        Factory.plugin(qualname, inq, outq))
+        # self.plugins = Factory.plugins(options)
+
+        outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
+        for processor_init in processor_inits:
+            processor = processor_init(inq, outq, self.options)
+            if processor:
+                self._postfetch_chain.append(processor)
+                # the next processor reads what this one writes
+                inq = outq
+                outq = warcprox.TimestampedQueue(
+                        maxsize=self.options.queue_size)
+
     def debug_mem(self):
         self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize())
         with open("/proc/self/status") as f:
diff --git a/warcprox/main.py b/warcprox/main.py
index 348dfbf..d17e2be 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -211,92 +211,6 @@ def init_controller(args):
         logging.fatal(e)
         exit(1)
 
-    listeners = []
-    running_stats = warcprox.stats.RunningStats()
-    listeners.append(running_stats)
-
-    if args.rethinkdb_dedup_url:
-        dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
-    elif args.rethinkdb_big_table_url:
-        dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
-    elif args.rethinkdb_trough_db_url:
-        dedup_db = warcprox.dedup.TroughDedupDb(options)
-    elif args.cdxserver_dedup:
-        dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
-    elif args.dedup_db_file in (None, '', '/dev/null'):
-        logging.info('deduplication disabled')
-        dedup_db = None
-    else:
-        dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
-    if dedup_db:
-        listeners.append(dedup_db)
-
-    if args.rethinkdb_stats_url:
-        stats_db = warcprox.stats.RethinkStatsDb(options=options)
-        listeners.append(stats_db)
-    elif args.stats_db_file in (None, '', '/dev/null'):
-        logging.info('statistics tracking disabled')
-        stats_db = None
-    else:
-        stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
-        listeners.append(stats_db)
-
-    recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size)
-
-    ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
-    ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir,
-                                                ca_name=ca_name)
-
-    proxy = warcprox.warcproxy.WarcProxy(
-            ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db,
-            running_stats=running_stats, options=options)
-
-    if args.playback_port is not None:
-        playback_index_db = warcprox.playback.PlaybackIndexDb(
-                args.playback_index_db_file, options=options)
-        playback_proxy = warcprox.playback.PlaybackProxy(
-                ca=ca, playback_index_db=playback_index_db, options=options)
-        listeners.append(playback_index_db)
-    else:
-        playback_index_db = None
-        playback_proxy = None
-
-    if args.crawl_log_dir:
-        listeners.append(warcprox.crawl_log.CrawlLogger(
-            args.crawl_log_dir, options=options))
-
-    for qualname in args.plugins or []:
-        try:
-            (module_name, class_name) = qualname.rsplit('.', 1)
-            module_ = importlib.import_module(module_name)
-            class_ = getattr(module_, class_name)
-            listener = class_()
-            listener.notify  # make sure it has this method
-            listeners.append(listener)
-        except Exception as e:
-            logging.fatal('problem with plugin class %r: %s', qualname, e)
-            sys.exit(1)
-
-    writer_pool = warcprox.writer.WarcWriterPool(options=options)
-    # number of warc writer threads = sqrt(proxy.max_threads)
-    # I came up with this out of thin air because it strikes me as reasonable
-    # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
-    num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5)
-    logging.debug('initializing %d warc writer threads', num_writer_threads)
-    warc_writer_threads = [
-            warcprox.writerthread.WarcWriterThread(
-                name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q,
-                writer_pool=writer_pool, dedup_db=dedup_db,
-                listeners=listeners, options=options)
-            for i in range(num_writer_threads)]
-
-    if args.rethinkdb_services_url:
-        parsed = doublethink.parse_rethinkdb_url(
-                options.rethinkdb_services_url)
-        rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
-        svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
-    else:
-        svcreg = None
 
     controller = warcprox.controller.WarcproxController(
             proxy, warc_writer_threads, playback_proxy,
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 6f7dd34..60f79d3 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -2,7 +2,7 @@
 warcprox/warcproxy.py - recording proxy, extends mitmproxy to record traffic,
 enqueue info on the recorded url queue
 
-Copyright (C) 2013-2016 Internet Archive
+Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License