diff --git a/setup.py b/setup.py index 0062bac..d785cbc 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.0.dev22', + version='2.0.dev23', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/kafkafeed.py b/warcprox/kafkafeed.py index e9d2176..8b2dbf9 100644 --- a/warcprox/kafkafeed.py +++ b/warcprox/kafkafeed.py @@ -1,24 +1,24 @@ -# -# warcprox/kafkafeed.py - support for publishing information about archived -# urls to apache kafka -# -# Copyright (C) 2015-2016 Internet Archive -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# +''' +warcprox/kafkafeed.py - support for publishing information about archived +urls to apache kafka + +Copyright (C) 2015-2016 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +''' import kafka import datetime @@ -29,7 +29,7 @@ from hanzo import warctools class CaptureFeed: logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed') - def __init__(self, broker_list, topic): + def __init__(self, broker_list, topic=None): self.broker_list = broker_list self.topic = topic self._producer = kafka.KafkaProducer(bootstrap_servers=broker_list) @@ -38,6 +38,10 @@ class CaptureFeed: if records[0].type not in (b'revisit', b'response'): return + topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic) + if not topic: + return + try: payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('utf-8') except: @@ -72,8 +76,6 @@ class CaptureFeed: for (k,v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items(): d[k] = v - topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic) - msg = json.dumps(d, separators=(',', ':')).encode('utf-8') self.logger.debug('feeding kafka topic=%s msg=%s', repr(topic), msg) self._producer.send(topic, msg) diff --git a/warcprox/main.py b/warcprox/main.py index c4c3006..18a316e 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -34,7 +34,6 @@ import hashlib import argparse import os import socket -import pprint import traceback import signal import threading @@ -118,18 +117,19 @@ def dump_state(signum=None, frame=None): ''' Signal handler, logs stack traces of active threads. ''' - pp = pprint.PrettyPrinter(indent=4) state_strs = [] for th in threading.enumerate(): try: state_strs.append(str(th)) except AssertionError: - state_strs.append("") + state_strs.append('') stack = traceback.format_stack(sys._current_frames()[th.ident]) - state_strs.append("".join(stack)) + state_strs.append(''.join(stack)) - logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) + logging.warn( + 'dumping state (caught signal %s)\n%s', + signum, '\n'.join(state_strs)) def init_controller(args): ''' @@ -171,8 +171,9 @@ def init_controller(args): stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options) listeners.append(stats_db) - if args.kafka_broker_list and args.kafka_capture_feed_topic: - kafka_capture_feed = warcprox.kafkafeed.CaptureFeed(args.kafka_broker_list, args.kafka_capture_feed_topic) + if args.kafka_broker_list: + kafka_capture_feed = warcprox.kafkafeed.CaptureFeed( + args.kafka_broker_list, args.kafka_capture_feed_topic) listeners.append(kafka_capture_feed) recorded_url_q = queue.Queue(maxsize=args.queue_size) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 0dc736e..4bf693f 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -132,8 +132,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): # to apply this rule if the requested url is within domain bucket0_fields = bucket0.split(':') if len(bucket0_fields) == 2: - self.logger.info( - 'checking %s:%s', repr(limit_key), repr(limit_value)) if not warcprox.host_matches_ip_or_domain( self.hostname, bucket0_fields[1]): return # else host matches, go ahead and enforce the limit