do not require --kafka-capture-feed-topic to make the kafka capture feed work (it can be configured per job or per site)

This commit is contained in:
Noah Levitt 2016-07-05 11:51:56 -05:00
parent b82d82b5f1
commit 5eed7061b1
4 changed files with 35 additions and 34 deletions

View File

@ -51,7 +51,7 @@ except:
setuptools.setup( setuptools.setup(
name='warcprox', name='warcprox',
version='2.0.dev22', version='2.0.dev23',
description='WARC writing MITM HTTP/S proxy', description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox', url='https://github.com/internetarchive/warcprox',
author='Noah Levitt', author='Noah Levitt',

View File

@ -1,24 +1,24 @@
# '''
# warcprox/kafkafeed.py - support for publishing information about archived warcprox/kafkafeed.py - support for publishing information about archived
# urls to apache kafka urls to apache kafka
#
# Copyright (C) 2015-2016 Internet Archive Copyright (C) 2015-2016 Internet Archive
#
# This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2 as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version. of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details. GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA. USA.
# '''
import kafka import kafka
import datetime import datetime
@ -29,7 +29,7 @@ from hanzo import warctools
class CaptureFeed: class CaptureFeed:
logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed') logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed')
def __init__(self, broker_list, topic): def __init__(self, broker_list, topic=None):
self.broker_list = broker_list self.broker_list = broker_list
self.topic = topic self.topic = topic
self._producer = kafka.KafkaProducer(bootstrap_servers=broker_list) self._producer = kafka.KafkaProducer(bootstrap_servers=broker_list)
@ -38,6 +38,10 @@ class CaptureFeed:
if records[0].type not in (b'revisit', b'response'): if records[0].type not in (b'revisit', b'response'):
return return
topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic)
if not topic:
return
try: try:
payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('utf-8') payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('utf-8')
except: except:
@ -72,8 +76,6 @@ class CaptureFeed:
for (k,v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items(): for (k,v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items():
d[k] = v d[k] = v
topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic)
msg = json.dumps(d, separators=(',', ':')).encode('utf-8') msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
self.logger.debug('feeding kafka topic=%s msg=%s', repr(topic), msg) self.logger.debug('feeding kafka topic=%s msg=%s', repr(topic), msg)
self._producer.send(topic, msg) self._producer.send(topic, msg)

View File

@ -34,7 +34,6 @@ import hashlib
import argparse import argparse
import os import os
import socket import socket
import pprint
import traceback import traceback
import signal import signal
import threading import threading
@ -118,18 +117,19 @@ def dump_state(signum=None, frame=None):
''' '''
Signal handler, logs stack traces of active threads. Signal handler, logs stack traces of active threads.
''' '''
pp = pprint.PrettyPrinter(indent=4)
state_strs = [] state_strs = []
for th in threading.enumerate(): for th in threading.enumerate():
try: try:
state_strs.append(str(th)) state_strs.append(str(th))
except AssertionError: except AssertionError:
state_strs.append("<n/a:AssertionError>") state_strs.append('<n/a:AssertionError>')
stack = traceback.format_stack(sys._current_frames()[th.ident]) stack = traceback.format_stack(sys._current_frames()[th.ident])
state_strs.append("".join(stack)) state_strs.append(''.join(stack))
logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) logging.warn(
'dumping state (caught signal %s)\n%s',
signum, '\n'.join(state_strs))
def init_controller(args): def init_controller(args):
''' '''
@ -171,8 +171,9 @@ def init_controller(args):
stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options) stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
listeners.append(stats_db) listeners.append(stats_db)
if args.kafka_broker_list and args.kafka_capture_feed_topic: if args.kafka_broker_list:
kafka_capture_feed = warcprox.kafkafeed.CaptureFeed(args.kafka_broker_list, args.kafka_capture_feed_topic) kafka_capture_feed = warcprox.kafkafeed.CaptureFeed(
args.kafka_broker_list, args.kafka_capture_feed_topic)
listeners.append(kafka_capture_feed) listeners.append(kafka_capture_feed)
recorded_url_q = queue.Queue(maxsize=args.queue_size) recorded_url_q = queue.Queue(maxsize=args.queue_size)

View File

@ -132,8 +132,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
# to apply this rule if the requested url is within domain # to apply this rule if the requested url is within domain
bucket0_fields = bucket0.split(':') bucket0_fields = bucket0.split(':')
if len(bucket0_fields) == 2: if len(bucket0_fields) == 2:
self.logger.info(
'checking %s:%s', repr(limit_key), repr(limit_value))
if not warcprox.host_matches_ip_or_domain( if not warcprox.host_matches_ip_or_domain(
self.hostname, bucket0_fields[1]): self.hostname, bucket0_fields[1]):
return # else host matches, go ahead and enforce the limit return # else host matches, go ahead and enforce the limit