From 2c95a1f2eefe96e188a31fa1483ba0e658f7fb44 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 28 Jun 2017 13:12:30 -0700
Subject: [PATCH] remove kafka feed code

---
 README.rst                   |  55 +++++++++---------
 benchmarks/run-benchmarks.py |   6 --
 setup.py                     |   1 -
 warcprox/__init__.py         |   1 -
 warcprox/kafkafeed.py        | 104 -----------------------------------
 warcprox/main.py             |  12 ----
 6 files changed, 29 insertions(+), 150 deletions(-)
 delete mode 100644 warcprox/kafkafeed.py

diff --git a/README.rst b/README.rst
index 2e0edb2..b9c1c5f 100644
--- a/README.rst
+++ b/README.rst
@@ -48,10 +48,8 @@ Usage
                     [--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
                     [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
                     [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
-                    [--kafka-broker-list KAFKA_BROKER_LIST]
-                    [--kafka-capture-feed-topic KAFKA_CAPTURE_FEED_TOPIC]
                     [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
-                    [--version] [-v] [--trace] [-q]
+                    [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
 
     warcprox - WARC writing MITM HTTP/S proxy
 
@@ -62,35 +60,35 @@ Usage
   -b ADDRESS, --address ADDRESS
                         address to listen on (default: localhost)
   -c CACERT, --cacert CACERT
                         CA certificate file; if file does not exist, it
-                        will be created (default: ./MacBook-Pro.local-
-                        warcprox-ca.pem)
+                        will be created (default:
+                        ./ayutla.monkeybrains.net-warcprox-ca.pem)
   --certs-dir CERTS_DIR
                         where to store and load generated certificates
-                        (default: ./MacBook-Pro.local-warcprox-ca)
+                        (default: ./ayutla.monkeybrains.net-warcprox-ca)
   -d DIRECTORY, --dir DIRECTORY
                         where to write warcs (default: ./warcs)
-  -z, --gzip            write gzip-compressed warc records (default:
-                        False)
+  -z, --gzip            write gzip-compressed warc records
   -n PREFIX, --prefix PREFIX
                         WARC filename prefix (default: WARCPROX)
   -s SIZE, --size SIZE  WARC file rollover size threshold in bytes
                         (default: 1000000000)
   --rollover-idle-time ROLLOVER_IDLE_TIME
                         WARC file rollover idle time threshold in seconds
-                        (so that Friday's last open WARC doesn't sit there
-                        all weekend waiting for more data) (default: None)
+                        (so that Friday's last open WARC doesn't sit
+                        there all weekend waiting for more data)
+                        (default: None)
   -g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM
-                        digest algorithm, one of sha1, md5, sha512,
-                        sha224, sha384, sha256 (default: sha1)
-  --base32              write digests in Base32 instead of hex (default:
-                        False)
+                        digest algorithm, one of sha1, sha384, sha512,
+                        md5, sha224, sha256 (default: sha1)
+  --base32              write digests in Base32 instead of hex
   --method-filter HTTP_METHOD
-                        only record requests with the given http method(s)
-                        (can be used more than once) (default: None)
+                        only record requests with the given http
+                        method(s) (can be used more than once) (default:
+                        None)
   --stats-db-file STATS_DB_FILE
                         persistent statistics database file; empty string
                         or /dev/null disables statistics tracking
-                        (default: ./warcprox-stats.db)
+                        (default: ./warcprox.sqlite)
   -P PLAYBACK_PORT, --playback-port PLAYBACK_PORT
                         port to listen on for instant playback (default:
                         None)
@@ -101,7 +99,7 @@ Usage
   -j DEDUP_DB_FILE, --dedup-db-file DEDUP_DB_FILE
                         persistent deduplication database file; empty string
                         or /dev/null disables deduplication
-                        (default: ./warcprox-dedup.db)
+                        (default: ./warcprox.sqlite)
   --rethinkdb-servers RETHINKDB_SERVERS
                         rethinkdb servers, used for dedup and stats if
                         specified; e.g.
@@ -115,15 +113,20 @@ Usage
                        use a big rethinkdb table called "captures",
                        instead of a small table called "dedup"; table is
                        suitable for use as index for playback (ignored
-                        unless --rethinkdb-servers is specified) (default:
-                        False)
-  --kafka-broker-list KAFKA_BROKER_LIST
-                        kafka broker list for capture feed (default: None)
-  --kafka-capture-feed-topic KAFKA_CAPTURE_FEED_TOPIC
-                        kafka capture feed topic (default: None)
+                        unless --rethinkdb-servers is specified)
   --onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY
-                        host:port of tor socks proxy, used only to connect
-                        to .onion sites (default: None)
+                        host:port of tor socks proxy, used only to
+                        connect to .onion sites (default: None)
+  --plugin PLUGIN_CLASS
+                        Qualified name of plugin class, e.g.
+                        "mypkg.mymod.MyClass". May be used multiple times
+                        to register multiple plugins. Plugin classes are
+                        loaded from the regular python module search
+                        path. They will be instantiated with no arguments
+                        and must have a method `notify(self,
+                        recorded_url, records)` which will be called for
+                        each url, after warc records have been written.
+                        (default: None)
   --version             show program's version number and exit
   -v, --verbose
   --trace
diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py
index 9ab6c58..f595f8b 100755
--- a/benchmarks/run-benchmarks.py
+++ b/benchmarks/run-benchmarks.py
@@ -215,12 +215,6 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
             'small table called "dedup"; table is suitable for use as '
             'index for playback (ignored unless --rethinkdb-servers is '
             'specified)'))
-    arg_parser.add_argument(
-            '--kafka-broker-list', dest='kafka_broker_list', default=None,
-            help='kafka broker list for capture feed')
-    arg_parser.add_argument(
-            '--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
-            default=None, help='kafka capture feed topic')
     arg_parser.add_argument(
             '--queue-size', dest='queue_size', type=int, default=1, help=(
                 'max size of the queue of urls waiting to be processed by '
diff --git a/setup.py b/setup.py
index c06fed7..dc59ca8 100755
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,6 @@ class PyTest(setuptools.command.test.test):
 deps = [
     'certauth==1.1.6',
     'warctools',
-    'kafka-python>=1.0.1',
     'urlcanon>=0.1.dev16',
     'doublethink>=0.2.0.dev81',
     'PySocks',
diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index fade9a0..5564ff3 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -114,4 +114,3 @@ import warcprox.warc as warc
 import warcprox.writerthread as writerthread
 import warcprox.stats as stats
 import warcprox.bigtable as bigtable
-import warcprox.kafkafeed as kafkafeed
diff --git a/warcprox/kafkafeed.py b/warcprox/kafkafeed.py
deleted file mode 100644
index 64f8594..0000000
--- a/warcprox/kafkafeed.py
+++ /dev/null
@@ -1,104 +0,0 @@
-'''
-warcprox/kafkafeed.py - support for publishing information about archived
-urls to apache kafka
-
-Copyright (C) 2015-2016 Internet Archive
-
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 2
-of the License, or (at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-USA.
-'''
-
-import kafka
-import datetime
-import json
-import logging
-from hanzo import warctools
-import threading
-
-class CaptureFeed:
-    logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed')
-
-    def __init__(self, broker_list, topic=None):
-        self.broker_list = broker_list
-        self.topic = topic
-        self.__producer = None
-        self._connection_exception = None
-        self._lock = threading.RLock()
-
-    def _producer(self):
-        with self._lock:
-            if not self.__producer:
-                try:
-                    # acks=0 to avoid ever blocking
-                    self.__producer = kafka.KafkaProducer(
-                            bootstrap_servers=self.broker_list, acks=0)
-                    if self._connection_exception:
-                        logging.info('connected to kafka successfully!')
-                        self._connection_exception = None
-                except Exception as e:
-                    if not self._connection_exception:
-                        self._connection_exception = e
-                        logging.error('problem connecting to kafka', exc_info=1)
-
-        return self.__producer
-
-    def notify(self, recorded_url, records):
-        if records[0].type not in (b'revisit', b'response'):
-            return
-
-        topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic)
-        if not topic:
-            return
-
-        try:
-            payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('utf-8')
-        except:
-            payload_digest = '-'
-
-        # {"status_code":200,"content_digest":"sha1:3VU56HI3BTMDZBL2TP7SQYXITT7VEAJQ","host":"www.kaosgl.com","via":"http://www.kaosgl.com/sayfa.php?id=4427","account_id":"877","seed":"http://www.kaosgl.com/","warc_filename":"ARCHIVEIT-6003-WEEKLY-JOB171310-20150903100014694-00002.warc.gz","url":"http://www.kaosgl.com/resim/HomofobiKarsitiBulusma/trabzon05.jpg","size":29700,"start_time_plus_duration":"20150903175709637+1049","timestamp":"2015-09-03T17:57:10.707Z","mimetype":"image/jpeg","collection_id":"6003","is_test_crawl":"false","job_name":"6003-20150902172136074","warc_offset":856320200,"thread":6,"hop_path":"RLLLLLE","extra_info":{},"annotations":"duplicate:digest","content_length":29432}
-
-        now = datetime.datetime.utcnow()
-        d = {
-            'timestamp': '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
-            'size': recorded_url.size,
-            'status_code': recorded_url.status,
-            'url': recorded_url.url.decode('utf-8'),
-            'mimetype': recorded_url.mimetype,
-            'content_digest': payload_digest,
-            'warc_filename': records[0].warc_filename,
-            'warc_offset': records[0].offset,
-            'host': recorded_url.host,
-            'annotations': 'duplicate:digest' if records[0].type == 'revisit' else '',
-            'content_length': recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset,
-            'start_time_plus_duration': '{:%Y%m%d%H%M%S}{:03d}+{}'.format(
-                recorded_url.timestamp, recorded_url.timestamp.microsecond//1000,
-                int(recorded_url.duration.total_seconds() * 1000)),
-            # 'hop_path': ?  # only used for seed redirects, which are n/a to brozzler (?)
-            # 'via': ?
-            # 'thread': ? # not needed
-        }
-
-        # fields expected to be populated here are (for archive-it):
-        # account_id, collection_id, is_test_crawl, seed, job_name
-        if recorded_url.warcprox_meta and 'capture-feed-extra-fields' in recorded_url.warcprox_meta:
-            for (k,v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items():
-                d[k] = v
-
-        msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
-        self.logger.debug('feeding kafka topic=%r msg=%r', topic, msg)
-        p = self._producer()
-        if p:
-            p.send(topic, msg)
-
diff --git a/warcprox/main.py b/warcprox/main.py
index df56f1b..ca6b305 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -114,12 +114,6 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     arg_parser.add_argument('--rethinkdb-big-table',
             dest='rethinkdb_big_table', action='store_true', default=False,
             help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
-    arg_parser.add_argument(
-            '--kafka-broker-list', dest='kafka_broker_list', default=None,
-            help=argparse.SUPPRESS)
-    arg_parser.add_argument(
-            '--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
-            default=None, help=argparse.SUPPRESS)
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
             default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
@@ -209,11 +203,6 @@ def init_controller(args):
         stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
         listeners.append(stats_db)
 
-    if args.kafka_broker_list:
-        kafka_capture_feed = warcprox.kafkafeed.CaptureFeed(
-                args.kafka_broker_list, args.kafka_capture_feed_topic)
-        listeners.append(kafka_capture_feed)
-
     recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size)
 
     ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
@@ -309,7 +298,6 @@ def main(argv=sys.argv):
             format=(
                 '%(asctime)s %(process)d %(levelname)s %(threadName)s '
                 '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
-    logging.getLogger('kafka').setLevel(loglevel + 5)
 
     real_main(args)
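Note: the --plugin option documented in the README hunk above is the replacement path for the feature this patch removes. The sketch below is not part of the patch; it is a hypothetical out-of-tree plugin that rebuilds a rough equivalent of the deleted CaptureFeed against the documented plugin interface (a class instantiated with no arguments, with a notify(self, recorded_url, records) method called for each url after warc records are written). The module and class name (captureplugin.KafkaCaptureFeed), the environment variables, and the reduced set of feed fields are invented for illustration, and it assumes the kafka-python package (dropped from setup.py by this patch) is installed separately:

    # captureplugin.py - hypothetical replacement for the deleted
    # warcprox/kafkafeed.py, written against the --plugin interface
    # described in the README above. KAFKA_BROKER_LIST and
    # KAFKA_CAPTURE_FEED_TOPIC are made-up environment variables, used
    # because plugin classes are instantiated with no constructor arguments.
    import datetime
    import json
    import logging
    import os

    import kafka  # kafka-python, no longer a warcprox dependency after this patch


    class KafkaCaptureFeed:
        logger = logging.getLogger('captureplugin.KafkaCaptureFeed')

        def __init__(self):
            broker_list = os.environ.get('KAFKA_BROKER_LIST', 'localhost:9092')
            self.topic = os.environ.get('KAFKA_CAPTURE_FEED_TOPIC', 'captures')
            # acks=0 to avoid ever blocking, same as the deleted CaptureFeed
            self.producer = kafka.KafkaProducer(
                    bootstrap_servers=broker_list, acks=0)

        def notify(self, recorded_url, records):
            # called by warcprox for each url, after warc records have been written
            if records[0].type not in (b'revisit', b'response'):
                return
            now = datetime.datetime.utcnow()
            d = {
                'timestamp': '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(
                    now, now.microsecond // 1000),
                'url': recorded_url.url.decode('utf-8'),
                'status_code': recorded_url.status,
                'size': recorded_url.size,
                'warc_filename': records[0].warc_filename,
                'warc_offset': records[0].offset,
            }
            msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
            self.logger.debug('feeding kafka topic=%r msg=%r', self.topic, msg)
            self.producer.send(self.topic, msg)

Started with something like warcprox --plugin captureplugin.KafkaCaptureFeed (the module only needs to be importable from the regular python module search path), this approximates the old capture feed behavior without warcprox itself depending on kafka-python.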