slightly less incomplete work on new postfetch processor chain

This commit is contained in:
Noah Levitt 2018-01-12 14:58:26 -08:00
parent c715eaba4e
commit bd25991a0d
5 changed files with 146 additions and 240 deletions

View File

@ -19,8 +19,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
"""
import sys
import datetime
import threading
import time
import logging
from argparse import Namespace as _Namespace
from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('warcprox').version
@ -28,6 +31,7 @@ try:
import queue
except ImportError:
import Queue as queue
def digest_str(hash_obj, base32=False):
import base64
return hash_obj.name.encode('utf-8') + b':' + (
@ -93,15 +97,17 @@ class RequestBlockedByRule(Exception):
return "%s: %s" % (self.__class__.__name__, self.msg)
class BasePostfetchProcessor(threading.Thread):
def __init__(self, inq, outq, profile=False):
logger = logging.getLogger("warcprox.BasePostfetchProcessor")
def __init__(self, inq, outq, options=Options()):
threading.Thread.__init__(self, name='???')
self.inq = inq
self.outq = outq
self.options = options
self.stop = threading.Event()
self.profile = profile
def run(self):
if self.profile:
if self.options.profile:
import cProfile
self.profiler = cProfile.Profile()
self.profiler.enable()
@ -186,9 +192,15 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
def _process_url(self, recorded_url):
return self.listener.notify(recorded_url, recorded_url.warc_records)
# @classmethod
# def wrap(cls, listener, inq, outq, profile=False):
# if listener:
# return cls(listener, inq, outq, profile)
# else:
# return None
# monkey-patch log levels TRACE and NOTICE
TRACE = 5
import logging
def _logger_trace(self, msg, *args, **kwargs):
if self.isEnabledFor(TRACE):
self._log(TRACE, msg, args, **kwargs)
@ -197,7 +209,6 @@ logging.trace = logging.root.trace
logging.addLevelName(TRACE, 'TRACE')
NOTICE = (logging.INFO + logging.WARN) // 2
import logging
def _logger_notice(self, msg, *args, **kwargs):
if self.isEnabledFor(NOTICE):
self._log(NOTICE, msg, args, **kwargs)

View File

@ -32,6 +32,7 @@ import gc
import datetime
import warcprox
import certauth
import functools
class Factory:
@staticmethod
@ -60,7 +61,8 @@ class Factory:
logging.info('statistics tracking disabled')
stats_db = None
else:
stats_db = warcprox.stats.StatsDb(options.stats_db_file, options=options)
stats_db = warcprox.stats.StatsDb(
options.stats_db_file, options=options)
return stats_db
# @staticmethod
@ -70,6 +72,10 @@ class Factory:
# options.cacert, args.certs_dir, ca_name=ca_name)
# return ca
@staticmethod
def warc_writer(inq, outq, options):
return warcprox.writerthread.WarcWriterThread(inq, outq, options)
@staticmethod
def playback_proxy(options):
if options.playback_port is not None:
@ -86,48 +92,32 @@ class Factory:
def crawl_logger(options):
if options.crawl_log_dir:
return warcprox.crawl_log.CrawlLogger(
options.crawl_log_dir, options=options))
options.crawl_log_dir, options=options)
else:
return None
@staticmethod
def plugin(qualname, inq, outq):
def plugin(qualname):
try:
(module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name)
instance = class_()
listener = class_()
plugin.notify # make sure it has this method
return instance
return plugin
except Exception as e:
logging.fatal('problem with plugin class %r: %s', qualname, e)
sys.exit(1)
# @staticmethod
# def plugins(options):
# plugins = []
# for qualname in options.plugins or []:
# try:
# (module_name, class_name) = qualname.rsplit('.', 1)
# module_ = importlib.import_module(module_name)
# class_ = getattr(module_, class_name)
# plugin = class_()
# plugin.notify # make sure it has this method
# plugins.append(plugin)
# except Exception as e:
# logging.fatal('problem with plugin class %r: %s', qualname, e)
# sys.exit(1)
# return plugins
# @staticmethod
# def service_registry(options):
# if options.rethinkdb_services_url:
# parsed = doublethink.parse_rethinkdb_url(
# options.rethinkdb_services_url)
# rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
# return doublethink.ServiceRegistry(rr, table=parsed.table)
# else:
# return None
@staticmethod
def service_registry(options):
if options.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
return doublethink.ServiceRegistry(rr, table=parsed.table)
else:
return None
class WarcproxController(object):
logger = logging.getLogger("warcprox.controller.WarcproxController")
@ -148,65 +138,55 @@ class WarcproxController(object):
self.proxy = warcprox.warcproxy.WarcProxy(options=options)
self.build_postfetch_chain(proxy.recorded_url_q)
self.build_postfetch_chain(self.proxy.recorded_url_q)
# if warc_writer_threads is not None:
# self.warc_writer_threads = warc_writer_threads
# else:
# self.warc_writer_threads = [
# warcprox.writerthread.WarcWriterThread(
# name='WarcWriterThread%03d' % i,
# recorded_url_q=self.proxy.recorded_url_q,
# listeners=[self.proxy.running_stats], options=options)
# for i in range(int(self.proxy.max_threads ** 0.5))]
# self.playback_proxy = playback_proxy
# self.service_registry = service_registry
self.service_registry = Factory.service_registry(options)
def build_postfetch_chain(self, inq):
outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
constructors = []
def maybe_add_to_chain(processor_init):
processor = processor_init(inq, outq, self.options)
if processor:
self._postfetch_chain.append(processor)
inq = outq
outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
self.dedup_db = Factory.dedup_db(self.options)
self.dedup_db = Factory.dedup_db(options)
# dedup loader
if self.dedup_db:
maybe_add_to_chain(self.dedup_db.loader)
constructors.append(self.dedup_db.loader)
# warc writer
maybe_add_to_chain(Factory.warc_writer)
constructors.append(Factory.warc_writer)
# dedup storer
if self.dedup_db:
maybe_add_to_chain(self.dedup_db.storer)
constructors.append(self.dedup_db.storer)
# playback index storer
# XXX XXX XXX FIXME
# self.playback_proxy = Factory.playback_proxy(options)
# if self.playback_proxy:
# maybe_add_to_chain()
# outq = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
# processor = self.playback_proxy.playback_index_db(inq, outq)
# self._postfetch_chain.append(processor)
# inq = outq
stats_db = Factory.stats_db(self.options)
if stats_db:
constructors.append(functools.partial(
warcprox.ListenerPostfetchProcessor, stats_db))
# stats db
maybe_add_to_chain(Factory.stats_db)
self.playback_proxy = Factory.playback_proxy(self.options)
if self.playback_proxy:
constructors.append(functools.partial(
warcprox.ListenerPostfetchProcessor,
self.playback_proxy.playback_index_db))
# crawl logger
maybe_add_to_chain(Factory.crawl_logger)
for qualname in self.options.plugins:
maybe_add_to_chain(
lambda inq, outq, options: Factory.plugin(qualname, inq, outq))
# self.plugins = Factory.plugins(options)
crawl_logger = Factory.crawl_logger(self.options)
if crawl_logger:
constructors.append(functools.partial(
warcprox.ListenerPostfetchProcessor, crawl_logger))
for qualname in self.options.plugins or []:
plugin = Factory.plugin(qualname)
constructors.append(functools.partial(
warcprox.ListenerPostfetchProcessor, plugin))
self._postfetch_chain = []
for i, constructor in enumerate(constructors):
if i != len(constructors) - 1:
outq = warcprox.TimestampedQueue(
maxsize=self.options.queue_size)
else:
outq = None
processor = constructor(inq, outq, self.options)
self._postfetch_chain.append(processor)
inq = outq
def debug_mem(self):
self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize())
with open("/proc/self/status") as f:
@ -293,26 +273,14 @@ class WarcproxController(object):
self.logger.info('warcprox is already running')
return
if self.proxy.stats_db:
self.proxy.stats_db.start()
self.proxy_thread = threading.Thread(
target=self.proxy.serve_forever, name='ProxyThread')
self.proxy_thread.start()
assert(all(
wwt.dedup_db is self.warc_writer_threads[0].dedup_db
for wwt in self.warc_writer_threads))
if any((t.dedup_db for t in self.warc_writer_threads)):
self.warc_writer_threads[0].dedup_db.start()
for wwt in self.warc_writer_threads:
wwt.start()
if self.playback_proxy is not None:
self.playback_proxy_thread = threading.Thread(
target=self.playback_proxy.serve_forever,
name='PlaybackProxyThread')
self.playback_proxy_thread.start()
for processor in self._postfetch_chain:
# logging.info('starting postfetch processor %r', processor)
processor.start()
logging.info('started postfetch processor %r', processor)
def shutdown(self):
with self._start_stop_lock:
@ -320,30 +288,34 @@ class WarcproxController(object):
self.logger.info('warcprox is not running')
return
for wwt in self.warc_writer_threads:
wwt.stop.set()
# for wwt in self.warc_writer_threads:
# wwt.stop.set()
for processor in self._postfetch_chain:
processor.stop.set()
self.proxy.shutdown()
self.proxy.server_close()
if self.playback_proxy is not None:
self.playback_proxy.shutdown()
self.playback_proxy.server_close()
if self.playback_proxy.playback_index_db is not None:
self.playback_proxy.playback_index_db.close()
for processor in self._postfetch_chain:
processor.join()
# if self.playback_proxy is not None:
# self.playback_proxy.shutdown()
# self.playback_proxy.server_close()
# if self.playback_proxy.playback_index_db is not None:
# self.playback_proxy.playback_index_db.close()
# wait for threads to finish
for wwt in self.warc_writer_threads:
wwt.join()
# # wait for threads to finish
# for wwt in self.warc_writer_threads:
# wwt.join()
if self.proxy.stats_db:
self.proxy.stats_db.stop()
# if self.proxy.stats_db:
# self.proxy.stats_db.stop()
self.proxy_thread.join()
if self.playback_proxy is not None:
self.playback_proxy_thread.join()
# self.proxy_thread.join()
# if self.playback_proxy is not None:
# self.playback_proxy_thread.join()
if self.service_registry and hasattr(self, "status_info"):
self.service_registry.unregister(self.status_info["id"])
# if self.service_registry and hasattr(self, "status_info"):
# self.service_registry.unregister(self.status_info["id"])
def run_until_shutdown(self):
"""

View File

@ -35,6 +35,14 @@ from urllib3.exceptions import HTTPError
urllib3.disable_warnings()
class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
def __init__(self, dedup_db, inq, outq, base32=False, profile=False):
warcprox.BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile)
self.dedup_db = dedup_db
self.base32 = base32
def _process_url(self, recorded_url):
decorate_with_dedup_info(self.dedup_db, recorded_url, self.base32)
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@ -61,6 +69,12 @@ class DedupDb(object):
conn.commit()
conn.close()
def loader(self, inq, outq, profile=False):
return DedupLoader(self, inq, outq, self.options.base32, profile)
def storer(self, inq, outq, profile=False):
return warcprox.ListenerPostfetchProcessor(self, inq, outq, profile)
def save(self, digest_key, response_record, bucket=""):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
@ -106,20 +120,20 @@ class DedupDb(object):
else:
self.save(digest_key, records[0])
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if (recorded_url.response_recorder
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
recorded_url.dedup_info = dedup_db.lookup(
digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
else:
recorded_url.dedup_info = dedup_db.lookup(digest_key,
url=recorded_url.url)
recorded_url.dedup_info = dedup_db.lookup(
digest_key, url=recorded_url.url)
class RethinkDedupDb:
class RethinkDedupDb(DedupDb):
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, options=warcprox.Options()):
@ -181,7 +195,7 @@ class RethinkDedupDb:
else:
self.save(digest_key, records[0])
class CdxServerDedup(object):
class CdxServerDedup(DedupDb):
"""Query a CDX server to perform deduplication.
"""
logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
@ -244,7 +258,7 @@ class CdxServerDedup(object):
"""
pass
class TroughDedupDb(object):
class TroughDedupDb(DedupDb):
'''
https://github.com/internetarchive/trough
'''

View File

@ -198,12 +198,12 @@ def dump_state(signum=None, frame=None):
'dumping state (caught signal %s)\n%s',
signum, '\n'.join(state_strs))
def init_controller(args):
def parse_args(argv):
'''
Creates a warcprox.controller.WarcproxController configured according to
the supplied arguments (normally the result of parse_args(sys.argv)).
Parses command line arguments with argparse.
'''
options = warcprox.Options(**vars(args))
arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
args = arg_parser.parse_args(args=argv[1:])
try:
hashlib.new(args.digest_algorithm)
@ -211,19 +211,6 @@ def init_controller(args):
logging.fatal(e)
exit(1)
controller = warcprox.controller.WarcproxController(
proxy, warc_writer_threads, playback_proxy,
service_registry=svcreg, options=options)
return controller
def parse_args(argv):
'''
Parses command line arguments with argparse.
'''
arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
args = arg_parser.parse_args(args=argv[1:])
return args
def main(argv=None):
@ -249,7 +236,8 @@ def main(argv=None):
# see https://github.com/pyca/cryptography/issues/2911
cryptography.hazmat.backends.openssl.backend.activate_builtin_random()
controller = init_controller(args)
options = warcprox.Options(**vars(args))
controller = warcprox.controller.WarcproxController(options)
signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())

View File

@ -2,7 +2,7 @@
warcprox/writerthread.py - warc writer thread, reads from the recorded url
queue, writes warc records, runs final tasks after warc records are written
Copyright (C) 2013-2017 Internet Archive
Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -28,44 +28,34 @@ except ImportError:
import Queue as queue
import logging
import threading
import time
from datetime import datetime
from hanzo import warctools
import warcprox
import sys
class WarcWriterThread(threading.Thread):
logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread")
class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor):
logger = logging.getLogger("warcprox.writerthread.WarcWriterThread")
def __init__(
self, recorded_url_q, name='WarcWriterThread', writer_pool=None,
dedup_db=None, listeners=[], options=warcprox.Options()):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name=name)
self.recorded_url_q = recorded_url_q
self.stop = threading.Event()
if writer_pool:
self.writer_pool = writer_pool
else:
self.writer_pool = warcprox.writer.WarcWriterPool()
self.dedup_db = dedup_db
self.listeners = listeners
def __init__(self, inq, outq, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(
self, inq, outq, options=options)
self.options = options
self.idle = None
self.writer_pool = warcprox.writer.WarcWriterPool(options)
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
def run(self):
if self.options.profile:
import cProfile
self.profiler = cProfile.Profile()
self.profiler.enable()
self._run()
self.profiler.disable()
else:
self._run()
def _get_process_put(self):
try:
warcprox.BaseStandardPostfetchProcessor._get_process_put(self)
finally:
self.writer_pool.maybe_idle_rollover()
def _process_url(self, recorded_url):
if self._should_archive(recorded_url):
records = self.writer_pool.write_records(recorded_url)
recorded_url.warc_records = records
self._log(recorded_url, records)
# try to release resources in a timely fashion
if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
recorded_url.response_recorder.tempfile.close()
_ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'}
def _filter_accepts(self, recorded_url):
if not self.method_filter:
return True
@ -81,68 +71,9 @@ class WarcWriterThread(threading.Thread):
# special warc name prefix '-' means "don't archive"
return prefix != '-' and self._filter_accepts(recorded_url)
def _run(self):
self.name = '%s(tid=%s)'% (self.name, warcprox.gettid())
while not self.stop.is_set():
try:
while True:
try:
if self.stop.is_set():
qsize = self.recorded_url_q.qsize()
if qsize % 50 == 0:
self.logger.info("%s urls left to write", qsize)
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
records = []
self.idle = None
if self._should_archive(recorded_url):
if self.dedup_db:
warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
recorded_url, base32=self.options.base32)
records = self.writer_pool.write_records(recorded_url)
self._final_tasks(recorded_url, records)
# try to release resources in a timely fashion
if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
recorded_url.response_recorder.tempfile.close()
except queue.Empty:
if self.stop.is_set():
break
self.idle = time.time()
finally:
self.writer_pool.maybe_idle_rollover()
self.logger.info('WarcWriterThread shutting down')
self._shutdown()
except Exception as e:
if isinstance(e, OSError) and e.errno == 28:
# OSError: [Errno 28] No space left on device
self.logger.critical(
'shutting down due to fatal problem: %s: %s',
e.__class__.__name__, e)
self._shutdown()
sys.exit(1)
self.logger.critical(
'WarcWriterThread will try to continue after unexpected '
'error', exc_info=True)
time.sleep(0.5)
def _shutdown(self):
self.writer_pool.close_writers()
for listener in self.listeners:
if hasattr(listener, 'stop'):
try:
listener.stop()
except:
self.logger.error(
'%s raised exception', listener.stop, exc_info=True)
# closest thing we have to heritrix crawl log at the moment
def _log(self, recorded_url, records):
try:
payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8")
payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8")
except:
payload_digest = "-"
@ -156,13 +87,3 @@ class WarcWriterThread(threading.Thread):
recorded_url.method, recorded_url.url.decode("utf-8"),
recorded_url.mimetype, recorded_url.size, payload_digest,
type_, filename, offset)
def _final_tasks(self, recorded_url, records):
if self.listeners:
for listener in self.listeners:
try:
listener.notify(recorded_url, records)
except:
self.logger.error('%s raised exception',
listener.notify, exc_info=True)
self._log(recorded_url, records)