use kafka-python 1.0 recommended api; use kafka capture feed specified in warcprox-meta header, if any

This commit is contained in:
Noah Levitt 2016-03-03 18:58:52 +00:00
parent ee3ee5d621
commit 89f965d1d3
2 changed files with 9 additions and 7 deletions

View File

@ -19,7 +19,7 @@ class PyTest(TestCommand):
deps = [
'certauth>=1.1.0',
'warctools',
'kafka-python',
'kafka-python>=1.0.1',
'surt>=0.3b4',
'rethinkstuff',
'PySocks',

View File

@ -9,8 +9,8 @@ class CaptureFeed:
def __init__(self, broker_list, topic):
self.broker_list = broker_list
self.topic = topic.encode('utf-8')
self._producer = kafka.SimpleProducer(kafka.KafkaClient(broker_list))
self.topic = topic
self._producer = kafka.KafkaProducer(bootstrap_servers=broker_list)
def notify(self, recorded_url, records):
if records[0].type not in (b'revisit', b'response'):
@ -37,7 +37,7 @@ class CaptureFeed:
'annotations': 'duplicate:digest' if records[0].type == 'revisit' else '',
'content_length': recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset,
'start_time_plus_duration': '{:%Y%m%d%H%M%S}{:03d}+{}'.format(
recorded_url.timestamp, recorded_url.timestamp.microsecond//1000,
recorded_url.timestamp, recorded_url.timestamp.microsecond//1000,
int(recorded_url.duration.total_seconds() * 1000)),
# 'hop_path': ? # only used for seed redirects, which are n/a to brozzler (?)
# 'via': ?
@ -50,7 +50,9 @@ class CaptureFeed:
for (k,v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items():
d[k] = v
msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
self.logger.debug('feeding kafka %s', msg)
self._producer.send_messages(self.topic, msg)
topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic)
msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
self.logger.debug('feeding kafka topic=%s msg=%s', repr(topic), msg)
self._producer.send(topic, msg)