Merge branch 'warc-close-api' into qa

* warc-close-api:
  offer WarcproxController to plugin constructors
This commit is contained in:
Noah Levitt 2019-01-09 22:49:30 +00:00
commit 4085905a9a
2 changed files with 17 additions and 10 deletions

View File

@ -81,11 +81,14 @@ class RequestBlockedByRule(Exception):
class BasePostfetchProcessor(threading.Thread): class BasePostfetchProcessor(threading.Thread):
logger = logging.getLogger("warcprox.BasePostfetchProcessor") logger = logging.getLogger("warcprox.BasePostfetchProcessor")
def __init__(self, options=Options()): def __init__(self, options=Options(), controller=None, **kwargs):
threading.Thread.__init__(self, name=self.__class__.__name__) threading.Thread.__init__(self, name=self.__class__.__name__)
self.options = options self.options = options
self.controller = controller
self.stop = threading.Event() self.stop = threading.Event()
# these should be set before thread is started
# these should be set by the caller before thread is started
self.inq = None self.inq = None
self.outq = None self.outq = None
self.profiler = None self.profiler = None
@ -205,8 +208,8 @@ class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
raise Exception('not implemented') raise Exception('not implemented')
class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
def __init__(self, listener, options=Options()): def __init__(self, listener, options=Options(), controller=None, **kwargs):
BaseStandardPostfetchProcessor.__init__(self, options) BaseStandardPostfetchProcessor.__init__(self, options, controller, **kwargs)
self.listener = listener self.listener = listener
self.name = listener.__class__.__name__ self.name = listener.__class__.__name__

View File

@ -93,15 +93,19 @@ class Factory:
return None return None
@staticmethod @staticmethod
def plugin(qualname, options): def plugin(qualname, options, controller=None):
try: try:
(module_name, class_name) = qualname.rsplit('.', 1) (module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name) module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name) class_ = getattr(module_, class_name)
try: # new plugins take `options` argument try:
plugin = class_(options) # new plugins take `options` and `controller` arguments
except: # backward-compatibility plugin = class_(options, controller)
plugin = class_() except:
try: # medium plugins take `options` argument
plugin = class_(options)
except: # old plugins take no arguments
plugin = class_()
# check that this is either a listener or a batch processor # check that this is either a listener or a batch processor
assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup') assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
return plugin return plugin
@ -229,7 +233,7 @@ class WarcproxController(object):
crawl_logger, self.options)) crawl_logger, self.options))
for qualname in self.options.plugins or []: for qualname in self.options.plugins or []:
plugin = Factory.plugin(qualname, self.options) plugin = Factory.plugin(qualname, self.options, self)
if hasattr(plugin, 'notify'): if hasattr(plugin, 'notify'):
self._postfetch_chain.append( self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor( warcprox.ListenerPostfetchProcessor(