keep trying to connect to kafka and don't let connection failure interfere with other warcprox operations

This commit is contained in:
Noah Levitt 2016-09-07 13:43:01 -07:00
parent 504af2fb0f
commit 5d44859ba8
2 changed files with 22 additions and 5 deletions

View File

@ -51,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.0b2.dev27',
version='2.0b2.dev28',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -32,9 +32,24 @@ class CaptureFeed:
def __init__(self, broker_list, topic=None):
self.broker_list = broker_list
self.topic = topic
# acks=0 to avoid ever blocking
self._producer = kafka.KafkaProducer(
bootstrap_servers=broker_list, acks=0)
self.__producer = None
self._connection_exception = None
def _producer(self):
if not self.__producer:
try:
# acks=0 to avoid ever blocking
self.__producer = kafka.KafkaProducer(
bootstrap_servers=self.broker_list, acks=0)
if self._connection_exception:
logging.info('connected to kafka successfully!')
self._connection_exception = None
except Exception as e:
if not self._connection_exception:
self._connection_exception = e
logging.error('problem connecting to kafka', exc_info=True)
return self.__producer
def notify(self, recorded_url, records):
if records[0].type not in (b'revisit', b'response'):
@ -80,5 +95,7 @@ class CaptureFeed:
msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
self.logger.debug('feeding kafka topic=%s msg=%s', repr(topic), msg)
self._producer.send(topic, msg)
p = self._producer()
if p:
p.send(topic, msg)