diff --git a/warcprox/__init__.py b/warcprox/__init__.py index c592eb6..7c628ad 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -19,8 +19,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. """ +import sys import datetime import threading +import time +import logging from argparse import Namespace as _Namespace from pkg_resources import get_distribution as _get_distribution __version__ = _get_distribution('warcprox').version @@ -28,6 +31,7 @@ try: import queue except ImportError: import Queue as queue + def digest_str(hash_obj, base32=False): import base64 return hash_obj.name.encode('utf-8') + b':' + ( @@ -93,15 +97,17 @@ class RequestBlockedByRule(Exception): return "%s: %s" % (self.__class__.__name__, self.msg) class BasePostfetchProcessor(threading.Thread): - def __init__(self, inq, outq, profile=False): + logger = logging.getLogger("warcprox.BasePostfetchProcessor") + + def __init__(self, inq, outq, options=Options()): threading.Thread.__init__(self, name='???') self.inq = inq self.outq = outq + self.options = options self.stop = threading.Event() - self.profile = profile def run(self): - if self.profile: + if self.options.profile: import cProfile self.profiler = cProfile.Profile() self.profiler.enable() @@ -186,9 +192,15 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): def _process_url(self, recorded_url): return self.listener.notify(recorded_url, recorded_url.warc_records) + # @classmethod + # def wrap(cls, listener, inq, outq, profile=False): + # if listener: + # return cls(listener, inq, outq, profile) + # else: + # return None + # monkey-patch log levels TRACE and NOTICE TRACE = 5 -import logging def _logger_trace(self, msg, *args, **kwargs): if self.isEnabledFor(TRACE): self._log(TRACE, msg, args, **kwargs) @@ -197,7 +209,6 @@ logging.trace = logging.root.trace logging.addLevelName(TRACE, 'TRACE') NOTICE = (logging.INFO + logging.WARN) // 2 -import logging def _logger_notice(self, msg, *args, **kwargs): if self.isEnabledFor(NOTICE): self._log(NOTICE, msg, args, **kwargs) diff --git a/warcprox/controller.py b/warcprox/controller.py index 0f0c20a..80b9c50 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -32,6 +32,7 @@ import gc import datetime import warcprox import certauth +import functools class Factory: @staticmethod @@ -60,7 +61,8 @@ class Factory: logging.info('statistics tracking disabled') stats_db = None else: - stats_db = warcprox.stats.StatsDb(options.stats_db_file, options=options) + stats_db = warcprox.stats.StatsDb( + options.stats_db_file, options=options) return stats_db # @staticmethod @@ -70,6 +72,10 @@ class Factory: # options.cacert, args.certs_dir, ca_name=ca_name) # return ca + @staticmethod + def warc_writer(inq, outq, options): + return warcprox.writerthread.WarcWriterThread(inq, outq, options) + @staticmethod def playback_proxy(options): if options.playback_port is not None: @@ -86,48 +92,32 @@ class Factory: def crawl_logger(options): if options.crawl_log_dir: return warcprox.crawl_log.CrawlLogger( - options.crawl_log_dir, options=options)) + options.crawl_log_dir, options=options) else: return None @staticmethod - def plugin(qualname, inq, outq): + def plugin(qualname): try: (module_name, class_name) = qualname.rsplit('.', 1) module_ = importlib.import_module(module_name) class_ = getattr(module_, class_name) - instance = class_() + listener = class_() plugin.notify # make sure it has this method - return instance + return plugin except Exception as e: logging.fatal('problem with plugin class %r: %s', qualname, e) sys.exit(1) - # @staticmethod - # def plugins(options): - # plugins = [] - # for qualname in options.plugins or []: - # try: - # (module_name, class_name) = qualname.rsplit('.', 1) - # module_ = importlib.import_module(module_name) - # class_ = getattr(module_, class_name) - # plugin = class_() - # plugin.notify # make sure it has this method - # plugins.append(plugin) - # except Exception as e: - # logging.fatal('problem with plugin class %r: %s', qualname, e) - # sys.exit(1) - # return plugins - - # @staticmethod - # def service_registry(options): - # if options.rethinkdb_services_url: - # parsed = doublethink.parse_rethinkdb_url( - # options.rethinkdb_services_url) - # rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database) - # return doublethink.ServiceRegistry(rr, table=parsed.table) - # else: - # return None + @staticmethod + def service_registry(options): + if options.rethinkdb_services_url: + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_services_url) + rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database) + return doublethink.ServiceRegistry(rr, table=parsed.table) + else: + return None class WarcproxController(object): logger = logging.getLogger("warcprox.controller.WarcproxController") @@ -148,65 +138,55 @@ class WarcproxController(object): self.proxy = warcprox.warcproxy.WarcProxy(options=options) - self.build_postfetch_chain(proxy.recorded_url_q) + self.build_postfetch_chain(self.proxy.recorded_url_q) - # if warc_writer_threads is not None: - # self.warc_writer_threads = warc_writer_threads - # else: - # self.warc_writer_threads = [ - # warcprox.writerthread.WarcWriterThread( - # name='WarcWriterThread%03d' % i, - # recorded_url_q=self.proxy.recorded_url_q, - # listeners=[self.proxy.running_stats], options=options) - # for i in range(int(self.proxy.max_threads ** 0.5))] - # self.playback_proxy = playback_proxy - # self.service_registry = service_registry + self.service_registry = Factory.service_registry(options) def build_postfetch_chain(self, inq): - outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size) + constructors = [] - def maybe_add_to_chain(processor_init): - processor = processor_init(inq, outq, self.options) - if processor: - self._postfetch_chain.append(processor) - inq = outq - outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size) + self.dedup_db = Factory.dedup_db(self.options) - self.dedup_db = Factory.dedup_db(options) - - # dedup loader if self.dedup_db: - maybe_add_to_chain(self.dedup_db.loader) + constructors.append(self.dedup_db.loader) - # warc writer - maybe_add_to_chain(Factory.warc_writer) + constructors.append(Factory.warc_writer) - # dedup storer if self.dedup_db: - maybe_add_to_chain(self.dedup_db.storer) + constructors.append(self.dedup_db.storer) - # playback index storer - # XXX XXX XXX FIXME - # self.playback_proxy = Factory.playback_proxy(options) - # if self.playback_proxy: - # maybe_add_to_chain() - # outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size) - # processor = self.playback_proxy.playback_index_db(inq, outq) - # self._postfetch_chain.append(processor) - # inq = outq + stats_db = Factory.stats_db(self.options) + if stats_db: + constructors.append(functools.partial( + warcprox.ListenerPostfetchProcessor, stats_db)) - # stats db - maybe_add_to_chain(Factory.stats_db) + self.playback_proxy = Factory.playback_proxy(self.options) + if self.playback_proxy: + constructors.append(functools.partial( + warcprox.ListenerPostfetchProcessor, + self.playback_proxy.playback_index_db)) - # crawl logger - maybe_add_to_chain(Factory.crawl_logger) - - for qualname in self.options.plugins: - maybe_add_to_chain( - lambda inq, outq, options: Factory.plugin(qualname, inq, outq)) - # self.plugins = Factory.plugins(options) + crawl_logger = Factory.crawl_logger(self.options) + if crawl_logger: + constructors.append(functools.partial( + warcprox.ListenerPostfetchProcessor, crawl_logger)) + for qualname in self.options.plugins or []: + plugin = Factory.plugin(qualname) + constructors.append(functools.partial( + warcprox.ListenerPostfetchProcessor, plugin)) + self._postfetch_chain = [] + for i, constructor in enumerate(constructors): + if i != len(constructors) - 1: + outq = warcprox.TimestampedQueue( + maxsize=self.options.queue_size) + else: + outq = None + processor = constructor(inq, outq, self.options) + self._postfetch_chain.append(processor) + inq = outq + def debug_mem(self): self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize()) with open("/proc/self/status") as f: @@ -293,26 +273,14 @@ class WarcproxController(object): self.logger.info('warcprox is already running') return - if self.proxy.stats_db: - self.proxy.stats_db.start() self.proxy_thread = threading.Thread( target=self.proxy.serve_forever, name='ProxyThread') self.proxy_thread.start() - assert(all( - wwt.dedup_db is self.warc_writer_threads[0].dedup_db - for wwt in self.warc_writer_threads)) - if any((t.dedup_db for t in self.warc_writer_threads)): - self.warc_writer_threads[0].dedup_db.start() - - for wwt in self.warc_writer_threads: - wwt.start() - - if self.playback_proxy is not None: - self.playback_proxy_thread = threading.Thread( - target=self.playback_proxy.serve_forever, - name='PlaybackProxyThread') - self.playback_proxy_thread.start() + for processor in self._postfetch_chain: + # logging.info('starting postfetch processor %r', processor) + processor.start() + logging.info('started postfetch processor %r', processor) def shutdown(self): with self._start_stop_lock: @@ -320,30 +288,34 @@ class WarcproxController(object): self.logger.info('warcprox is not running') return - for wwt in self.warc_writer_threads: - wwt.stop.set() + # for wwt in self.warc_writer_threads: + # wwt.stop.set() + for processor in self._postfetch_chain: + processor.stop.set() self.proxy.shutdown() self.proxy.server_close() - if self.playback_proxy is not None: - self.playback_proxy.shutdown() - self.playback_proxy.server_close() - if self.playback_proxy.playback_index_db is not None: - self.playback_proxy.playback_index_db.close() + for processor in self._postfetch_chain: + processor.join() + # if self.playback_proxy is not None: + # self.playback_proxy.shutdown() + # self.playback_proxy.server_close() + # if self.playback_proxy.playback_index_db is not None: + # self.playback_proxy.playback_index_db.close() - # wait for threads to finish - for wwt in self.warc_writer_threads: - wwt.join() + # # wait for threads to finish + # for wwt in self.warc_writer_threads: + # wwt.join() - if self.proxy.stats_db: - self.proxy.stats_db.stop() + # if self.proxy.stats_db: + # self.proxy.stats_db.stop() - self.proxy_thread.join() - if self.playback_proxy is not None: - self.playback_proxy_thread.join() + # self.proxy_thread.join() + # if self.playback_proxy is not None: + # self.playback_proxy_thread.join() - if self.service_registry and hasattr(self, "status_info"): - self.service_registry.unregister(self.status_info["id"]) + # if self.service_registry and hasattr(self, "status_info"): + # self.service_registry.unregister(self.status_info["id"]) def run_until_shutdown(self): """ diff --git a/warcprox/dedup.py b/warcprox/dedup.py index f21e1df..f62eccb 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -35,6 +35,14 @@ from urllib3.exceptions import HTTPError urllib3.disable_warnings() +class DedupLoader(warcprox.BaseStandardPostfetchProcessor): + def __init__(self, dedup_db, inq, outq, base32=False, profile=False): + warcprox.BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile) + self.dedup_db = dedup_db + self.base32 = base32 + def _process_url(self, recorded_url): + decorate_with_dedup_info(self.dedup_db, recorded_url, self.base32) + class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -61,6 +69,12 @@ class DedupDb(object): conn.commit() conn.close() + def loader(self, inq, outq, profile=False): + return DedupLoader(self, inq, outq, self.options.base32, profile) + + def storer(self, inq, outq, profile=False): + return warcprox.ListenerPostfetchProcessor(self, inq, outq, profile) + def save(self, digest_key, response_record, bucket=""): record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') @@ -106,20 +120,20 @@ class DedupDb(object): else: self.save(digest_key, records[0]) - def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): if (recorded_url.response_recorder and recorded_url.payload_digest and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url.url) + recorded_url.dedup_info = dedup_db.lookup( + digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url.url) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key, - url=recorded_url.url) + recorded_url.dedup_info = dedup_db.lookup( + digest_key, url=recorded_url.url) -class RethinkDedupDb: +class RethinkDedupDb(DedupDb): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, options=warcprox.Options()): @@ -181,7 +195,7 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) -class CdxServerDedup(object): +class CdxServerDedup(DedupDb): """Query a CDX server to perform deduplication. """ logger = logging.getLogger("warcprox.dedup.CdxServerDedup") @@ -244,7 +258,7 @@ class CdxServerDedup(object): """ pass -class TroughDedupDb(object): +class TroughDedupDb(DedupDb): ''' https://github.com/internetarchive/trough ''' diff --git a/warcprox/main.py b/warcprox/main.py index d17e2be..785c029 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -198,12 +198,12 @@ def dump_state(signum=None, frame=None): 'dumping state (caught signal %s)\n%s', signum, '\n'.join(state_strs)) -def init_controller(args): +def parse_args(argv): ''' - Creates a warcprox.controller.WarcproxController configured according to - the supplied arguments (normally the result of parse_args(sys.argv)). + Parses command line arguments with argparse. ''' - options = warcprox.Options(**vars(args)) + arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) + args = arg_parser.parse_args(args=argv[1:]) try: hashlib.new(args.digest_algorithm) @@ -211,19 +211,6 @@ def init_controller(args): logging.fatal(e) exit(1) - - controller = warcprox.controller.WarcproxController( - proxy, warc_writer_threads, playback_proxy, - service_registry=svcreg, options=options) - - return controller - -def parse_args(argv): - ''' - Parses command line arguments with argparse. - ''' - arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) - args = arg_parser.parse_args(args=argv[1:]) return args def main(argv=None): @@ -249,7 +236,8 @@ def main(argv=None): # see https://github.com/pyca/cryptography/issues/2911 cryptography.hazmat.backends.openssl.backend.activate_builtin_random() - controller = init_controller(args) + options = warcprox.Options(**vars(args)) + controller = warcprox.controller.WarcproxController(options) signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) signal.signal(signal.SIGINT, lambda a,b: controller.stop.set()) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 7ee9159..ae64484 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -2,7 +2,7 @@ warcprox/writerthread.py - warc writer thread, reads from the recorded url queue, writes warc records, runs final tasks after warc records are written -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -28,44 +28,34 @@ except ImportError: import Queue as queue import logging -import threading import time -from datetime import datetime -from hanzo import warctools import warcprox -import sys -class WarcWriterThread(threading.Thread): - logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread") +class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): + logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") - def __init__( - self, recorded_url_q, name='WarcWriterThread', writer_pool=None, - dedup_db=None, listeners=[], options=warcprox.Options()): - """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" - threading.Thread.__init__(self, name=name) - self.recorded_url_q = recorded_url_q - self.stop = threading.Event() - if writer_pool: - self.writer_pool = writer_pool - else: - self.writer_pool = warcprox.writer.WarcWriterPool() - self.dedup_db = dedup_db - self.listeners = listeners + def __init__(self, inq, outq, options=warcprox.Options()): + warcprox.BaseStandardPostfetchProcessor.__init__( + self, inq, outq, options=options) self.options = options - self.idle = None + self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) - def run(self): - if self.options.profile: - import cProfile - self.profiler = cProfile.Profile() - self.profiler.enable() - self._run() - self.profiler.disable() - else: - self._run() + def _get_process_put(self): + try: + warcprox.BaseStandardPostfetchProcessor._get_process_put(self) + finally: + self.writer_pool.maybe_idle_rollover() + + def _process_url(self, recorded_url): + if self._should_archive(recorded_url): + records = self.writer_pool.write_records(recorded_url) + recorded_url.warc_records = records + self._log(recorded_url, records) + # try to release resources in a timely fashion + if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() - _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'} def _filter_accepts(self, recorded_url): if not self.method_filter: return True @@ -81,68 +71,9 @@ class WarcWriterThread(threading.Thread): # special warc name prefix '-' means "don't archive" return prefix != '-' and self._filter_accepts(recorded_url) - def _run(self): - self.name = '%s(tid=%s)'% (self.name, warcprox.gettid()) - while not self.stop.is_set(): - try: - while True: - try: - if self.stop.is_set(): - qsize = self.recorded_url_q.qsize() - if qsize % 50 == 0: - self.logger.info("%s urls left to write", qsize) - - recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) - records = [] - self.idle = None - if self._should_archive(recorded_url): - if self.dedup_db: - warcprox.dedup.decorate_with_dedup_info(self.dedup_db, - recorded_url, base32=self.options.base32) - records = self.writer_pool.write_records(recorded_url) - - self._final_tasks(recorded_url, records) - - # try to release resources in a timely fashion - if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: - recorded_url.response_recorder.tempfile.close() - except queue.Empty: - if self.stop.is_set(): - break - self.idle = time.time() - finally: - self.writer_pool.maybe_idle_rollover() - - self.logger.info('WarcWriterThread shutting down') - self._shutdown() - except Exception as e: - if isinstance(e, OSError) and e.errno == 28: - # OSError: [Errno 28] No space left on device - self.logger.critical( - 'shutting down due to fatal problem: %s: %s', - e.__class__.__name__, e) - self._shutdown() - sys.exit(1) - - self.logger.critical( - 'WarcWriterThread will try to continue after unexpected ' - 'error', exc_info=True) - time.sleep(0.5) - - def _shutdown(self): - self.writer_pool.close_writers() - for listener in self.listeners: - if hasattr(listener, 'stop'): - try: - listener.stop() - except: - self.logger.error( - '%s raised exception', listener.stop, exc_info=True) - - # closest thing we have to heritrix crawl log at the moment def _log(self, recorded_url, records): try: - payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8") + payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8") except: payload_digest = "-" @@ -156,13 +87,3 @@ class WarcWriterThread(threading.Thread): recorded_url.method, recorded_url.url.decode("utf-8"), recorded_url.mimetype, recorded_url.size, payload_digest, type_, filename, offset) - - def _final_tasks(self, recorded_url, records): - if self.listeners: - for listener in self.listeners: - try: - listener.notify(recorded_url, records) - except: - self.logger.error('%s raised exception', - listener.notify, exc_info=True) - self._log(recorded_url, records)