mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
offer WarcproxController to plugin constructors
because a plugin needs access to the controller's components, especially the warc writer processor, for the close api to be useful
This commit is contained in:
parent
150c1e67c6
commit
8fd1af1d04
@ -81,11 +81,14 @@ class RequestBlockedByRule(Exception):
|
|||||||
class BasePostfetchProcessor(threading.Thread):
|
class BasePostfetchProcessor(threading.Thread):
|
||||||
logger = logging.getLogger("warcprox.BasePostfetchProcessor")
|
logger = logging.getLogger("warcprox.BasePostfetchProcessor")
|
||||||
|
|
||||||
def __init__(self, options=Options()):
|
def __init__(self, options=Options(), controller=None, **kwargs):
|
||||||
threading.Thread.__init__(self, name=self.__class__.__name__)
|
threading.Thread.__init__(self, name=self.__class__.__name__)
|
||||||
self.options = options
|
self.options = options
|
||||||
|
self.controller = controller
|
||||||
|
|
||||||
self.stop = threading.Event()
|
self.stop = threading.Event()
|
||||||
# these should be set before thread is started
|
|
||||||
|
# these should be set by the caller before thread is started
|
||||||
self.inq = None
|
self.inq = None
|
||||||
self.outq = None
|
self.outq = None
|
||||||
self.profiler = None
|
self.profiler = None
|
||||||
@ -205,8 +208,8 @@ class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
|
|||||||
raise Exception('not implemented')
|
raise Exception('not implemented')
|
||||||
|
|
||||||
class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
|
class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
|
||||||
def __init__(self, listener, options=Options()):
|
def __init__(self, listener, options=Options(), controller=None, **kwargs):
|
||||||
BaseStandardPostfetchProcessor.__init__(self, options)
|
BaseStandardPostfetchProcessor.__init__(self, options, controller, **kwargs)
|
||||||
self.listener = listener
|
self.listener = listener
|
||||||
self.name = listener.__class__.__name__
|
self.name = listener.__class__.__name__
|
||||||
|
|
||||||
|
@ -93,15 +93,19 @@ class Factory:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def plugin(qualname, options):
|
def plugin(qualname, options, controller=None):
|
||||||
try:
|
try:
|
||||||
(module_name, class_name) = qualname.rsplit('.', 1)
|
(module_name, class_name) = qualname.rsplit('.', 1)
|
||||||
module_ = importlib.import_module(module_name)
|
module_ = importlib.import_module(module_name)
|
||||||
class_ = getattr(module_, class_name)
|
class_ = getattr(module_, class_name)
|
||||||
try: # new plugins take `options` argument
|
try:
|
||||||
plugin = class_(options)
|
# new plugins take `options` and `controller` arguments
|
||||||
except: # backward-compatibility
|
plugin = class_(options, controller)
|
||||||
plugin = class_()
|
except:
|
||||||
|
try: # medium plugins take `options` argument
|
||||||
|
plugin = class_(options)
|
||||||
|
except: # old plugins take no arguments
|
||||||
|
plugin = class_()
|
||||||
# check that this is either a listener or a batch processor
|
# check that this is either a listener or a batch processor
|
||||||
assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
|
assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
|
||||||
return plugin
|
return plugin
|
||||||
@ -229,7 +233,7 @@ class WarcproxController(object):
|
|||||||
crawl_logger, self.options))
|
crawl_logger, self.options))
|
||||||
|
|
||||||
for qualname in self.options.plugins or []:
|
for qualname in self.options.plugins or []:
|
||||||
plugin = Factory.plugin(qualname, self.options)
|
plugin = Factory.plugin(qualname, self.options, self)
|
||||||
if hasattr(plugin, 'notify'):
|
if hasattr(plugin, 'notify'):
|
||||||
self._postfetch_chain.append(
|
self._postfetch_chain.append(
|
||||||
warcprox.ListenerPostfetchProcessor(
|
warcprox.ListenerPostfetchProcessor(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user