diff --git a/setup.py b/setup.py index 19d2fe3..8f0c08e 100755 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ class PyTest(TestCommand): deps = [ 'certauth>=1.1.0', 'warctools', - 'kafka-python', + 'kafka-python>=1.0.1', 'surt>=0.3b4', 'rethinkstuff', 'PySocks', diff --git a/warcprox/kafkafeed.py b/warcprox/kafkafeed.py index dd8a0a8..8f8aea1 100644 --- a/warcprox/kafkafeed.py +++ b/warcprox/kafkafeed.py @@ -9,8 +9,8 @@ class CaptureFeed: def __init__(self, broker_list, topic): self.broker_list = broker_list - self.topic = topic.encode('utf-8') - self._producer = kafka.SimpleProducer(kafka.KafkaClient(broker_list)) + self.topic = topic + self._producer = kafka.KafkaProducer(bootstrap_servers=broker_list) def notify(self, recorded_url, records): if records[0].type not in (b'revisit', b'response'): @@ -37,7 +37,7 @@ class CaptureFeed: 'annotations': 'duplicate:digest' if records[0].type == 'revisit' else '', 'content_length': recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset, 'start_time_plus_duration': '{:%Y%m%d%H%M%S}{:03d}+{}'.format( - recorded_url.timestamp, recorded_url.timestamp.microsecond//1000, + recorded_url.timestamp, recorded_url.timestamp.microsecond//1000, int(recorded_url.duration.total_seconds() * 1000)), # 'hop_path': ? # only used for seed redirects, which are n/a to brozzler (?) # 'via': ? @@ -50,7 +50,9 @@ class CaptureFeed: for (k,v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items(): d[k] = v - msg = json.dumps(d, separators=(',', ':')).encode('utf-8') - self.logger.debug('feeding kafka %s', msg) - self._producer.send_messages(self.topic, msg) + topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic) + + msg = json.dumps(d, separators=(',', ':')).encode('utf-8') + self.logger.debug('feeding kafka topic=%s msg=%s', repr(topic), msg) + self._producer.send(topic, msg)