diff --git a/setup.py b/setup.py
index 147cb35..4eeb13d 100755
--- a/setup.py
+++ b/setup.py
@@ -51,7 +51,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.0b2.dev27',
+        version='2.0b2.dev28',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/warcprox/kafkafeed.py b/warcprox/kafkafeed.py
index 683e925..e17d2c4 100644
--- a/warcprox/kafkafeed.py
+++ b/warcprox/kafkafeed.py
@@ -32,9 +32,24 @@ class CaptureFeed:
     def __init__(self, broker_list, topic=None):
         self.broker_list = broker_list
         self.topic = topic
-        # acks=0 to avoid ever blocking
-        self._producer = kafka.KafkaProducer(
-                bootstrap_servers=broker_list, acks=0)
+        self.__producer = None
+        self._connection_exception = None
+
+    def _producer(self):
+        if not self.__producer:
+            try:
+                # acks=0 to avoid ever blocking
+                self.__producer = kafka.KafkaProducer(
+                        bootstrap_servers=self.broker_list, acks=0)
+                if self._connection_exception:
+                    logging.info('connected to kafka successfully!')
+                    self._connection_exception = None
+            except Exception as e:
+                if not self._connection_exception:
+                    self._connection_exception = e
+                    logging.error('problem connecting to kafka', exc_info=True)
+
+        return self.__producer
 
     def notify(self, recorded_url, records):
         if records[0].type not in (b'revisit', b'response'):
@@ -80,5 +95,7 @@ class CaptureFeed:
 
         msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
         self.logger.debug('feeding kafka topic=%s msg=%s', repr(topic), msg)
-        self._producer.send(topic, msg)
+        p = self._producer()
+        if p:
+            p.send(topic, msg)
 