remove kafka feed code

Noah Levitt 2017-06-28 13:12:30 -07:00
parent 4c32394256
commit 2c95a1f2ee
6 changed files with 29 additions and 150 deletions


@@ -48,10 +48,8 @@ Usage
                 [--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
                 [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
                 [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
-                [--kafka-broker-list KAFKA_BROKER_LIST]
-                [--kafka-capture-feed-topic KAFKA_CAPTURE_FEED_TOPIC]
                 [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
-                [--version] [-v] [--trace] [-q] [--plugin PLUGIN_CLASS]
+                [--version] [-v] [--trace] [-q]
 warcprox - WARC writing MITM HTTP/S proxy
@@ -62,35 +60,35 @@ Usage
                         address to listen on (default: localhost)
   -c CACERT, --cacert CACERT
                         CA certificate file; if file does not exist, it
-                        will be created (default: ./MacBook-Pro.local-
-                        warcprox-ca.pem)
+                        will be created (default:
+                        ./ayutla.monkeybrains.net-warcprox-ca.pem)
   --certs-dir CERTS_DIR
                         where to store and load generated certificates
-                        (default: ./MacBook-Pro.local-warcprox-ca)
+                        (default: ./ayutla.monkeybrains.net-warcprox-ca)
   -d DIRECTORY, --dir DIRECTORY
                         where to write warcs (default: ./warcs)
-  -z, --gzip            write gzip-compressed warc records (default:
-                        False)
+  -z, --gzip            write gzip-compressed warc records
   -n PREFIX, --prefix PREFIX
                         WARC filename prefix (default: WARCPROX)
   -s SIZE, --size SIZE  WARC file rollover size threshold in bytes
                         (default: 1000000000)
   --rollover-idle-time ROLLOVER_IDLE_TIME
                         WARC file rollover idle time threshold in seconds
-                        (so that Friday's last open WARC doesn't sit there
-                        all weekend waiting for more data) (default: None)
+                        (so that Friday's last open WARC doesn't sit
+                        there all weekend waiting for more data)
+                        (default: None)
   -g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM
-                        digest algorithm, one of sha1, md5, sha512,
-                        sha224, sha384, sha256 (default: sha1)
-  --base32              write digests in Base32 instead of hex (default:
-                        False)
+                        digest algorithm, one of sha1, sha384, sha512,
+                        md5, sha224, sha256 (default: sha1)
+  --base32              write digests in Base32 instead of hex
   --method-filter HTTP_METHOD
-                        only record requests with the given http method(s)
-                        (can be used more than once) (default: None)
+                        only record requests with the given http
+                        method(s) (can be used more than once) (default:
+                        None)
   --stats-db-file STATS_DB_FILE
                         persistent statistics database file; empty string
                         or /dev/null disables statistics tracking
-                        (default: ./warcprox-stats.db)
+                        (default: ./warcprox.sqlite)
   -P PLAYBACK_PORT, --playback-port PLAYBACK_PORT
                         port to listen on for instant playback (default:
                         None)
@@ -101,7 +99,7 @@ Usage
   -j DEDUP_DB_FILE, --dedup-db-file DEDUP_DB_FILE
                         persistent deduplication database file; empty
                         string or /dev/null disables deduplication
-                        (default: ./warcprox-dedup.db)
+                        (default: ./warcprox.sqlite)
   --rethinkdb-servers RETHINKDB_SERVERS
                         rethinkdb servers, used for dedup and stats if
                         specified; e.g.
@@ -115,15 +113,20 @@ Usage
                         use a big rethinkdb table called "captures",
                         instead of a small table called "dedup"; table is
                         suitable for use as index for playback (ignored
-                        unless --rethinkdb-servers is specified) (default:
-                        False)
-  --kafka-broker-list KAFKA_BROKER_LIST
-                        kafka broker list for capture feed (default: None)
-  --kafka-capture-feed-topic KAFKA_CAPTURE_FEED_TOPIC
-                        kafka capture feed topic (default: None)
+                        unless --rethinkdb-servers is specified)
   --onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY
-                        host:port of tor socks proxy, used only to connect
-                        to .onion sites (default: None)
+                        host:port of tor socks proxy, used only to
+                        connect to .onion sites (default: None)
+  --plugin PLUGIN_CLASS
+                        Qualified name of plugin class, e.g.
+                        "mypkg.mymod.MyClass". May be used multiple times
+                        to register multiple plugins. Plugin classes are
+                        loaded from the regular python module search
+                        path. They will be instantiated with no arguments
+                        and must have a method `notify(self,
+                        recorded_url, records)` which will be called for
+                        each url, after warc records have been written.
+                        (default: None)
   --version             show program's version number and exit
   -v, --verbose
   --trace
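
The --plugin help text added above describes warcprox's generic listener interface: a plugin class is loaded from the regular python module search path, instantiated with no arguments, and must provide notify(self, recorded_url, records), called for each url after its warc records have been written. A minimal sketch of such a plugin follows; the module and class names (myplugins.CaptureLogger) are hypothetical examples, not part of this commit:

    # myplugins.py -- hypothetical example module somewhere on the python path
    import logging

    class CaptureLogger:
        '''Toy warcprox plugin: logs each url after its warc records are written.'''
        logger = logging.getLogger('myplugins.CaptureLogger')

        def notify(self, recorded_url, records):
            # recorded_url.url and recorded_url.size are the same attributes the
            # built-in listeners use
            self.logger.info(
                    'archived %s (%s bytes)', recorded_url.url, recorded_url.size)

It would be registered on the command line with --plugin myplugins.CaptureLogger.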


@@ -215,12 +215,6 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
             'small table called "dedup"; table is suitable for use as '
             'index for playback (ignored unless --rethinkdb-servers is '
             'specified)'))
-    arg_parser.add_argument(
-        '--kafka-broker-list', dest='kafka_broker_list', default=None,
-        help='kafka broker list for capture feed')
-    arg_parser.add_argument(
-        '--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
-        default=None, help='kafka capture feed topic')
     arg_parser.add_argument(
         '--queue-size', dest='queue_size', type=int, default=1, help=(
             'max size of the queue of urls waiting to be processed by '


@@ -38,7 +38,6 @@ class PyTest(setuptools.command.test.test):
 deps = [
     'certauth==1.1.6',
     'warctools',
-    'kafka-python>=1.0.1',
     'urlcanon>=0.1.dev16',
     'doublethink>=0.2.0.dev81',
     'PySocks',


@@ -114,4 +114,3 @@ import warcprox.warc as warc
 import warcprox.writerthread as writerthread
 import warcprox.stats as stats
 import warcprox.bigtable as bigtable
-import warcprox.kafkafeed as kafkafeed


@@ -1,104 +0,0 @@
-'''
-warcprox/kafkafeed.py - support for publishing information about archived
-urls to apache kafka
-
-Copyright (C) 2015-2016 Internet Archive
-
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 2
-of the License, or (at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-USA.
-'''
-
-import kafka
-import datetime
-import json
-import logging
-from hanzo import warctools
-import threading
-
-class CaptureFeed:
-    logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed')
-
-    def __init__(self, broker_list, topic=None):
-        self.broker_list = broker_list
-        self.topic = topic
-        self.__producer = None
-        self._connection_exception = None
-        self._lock = threading.RLock()
-
-    def _producer(self):
-        with self._lock:
-            if not self.__producer:
-                try:
-                    # acks=0 to avoid ever blocking
-                    self.__producer = kafka.KafkaProducer(
-                            bootstrap_servers=self.broker_list, acks=0)
-                    if self._connection_exception:
-                        logging.info('connected to kafka successfully!')
-                        self._connection_exception = None
-                except Exception as e:
-                    if not self._connection_exception:
-                        self._connection_exception = e
-                        logging.error('problem connecting to kafka', exc_info=1)
-        return self.__producer
-
-    def notify(self, recorded_url, records):
-        if records[0].type not in (b'revisit', b'response'):
-            return
-
-        topic = recorded_url.warcprox_meta.get('capture-feed-topic', self.topic)
-        if not topic:
-            return
-
-        try:
-            payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('utf-8')
-        except:
-            payload_digest = '-'
-
-        # {"status_code":200,"content_digest":"sha1:3VU56HI3BTMDZBL2TP7SQYXITT7VEAJQ","host":"www.kaosgl.com","via":"http://www.kaosgl.com/sayfa.php?id=4427","account_id":"877","seed":"http://www.kaosgl.com/","warc_filename":"ARCHIVEIT-6003-WEEKLY-JOB171310-20150903100014694-00002.warc.gz","url":"http://www.kaosgl.com/resim/HomofobiKarsitiBulusma/trabzon05.jpg","size":29700,"start_time_plus_duration":"20150903175709637+1049","timestamp":"2015-09-03T17:57:10.707Z","mimetype":"image/jpeg","collection_id":"6003","is_test_crawl":"false","job_name":"6003-20150902172136074","warc_offset":856320200,"thread":6,"hop_path":"RLLLLLE","extra_info":{},"annotations":"duplicate:digest","content_length":29432}
-        now = datetime.datetime.utcnow()
-        d = {
-            'timestamp': '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
-            'size': recorded_url.size,
-            'status_code': recorded_url.status,
-            'url': recorded_url.url.decode('utf-8'),
-            'mimetype': recorded_url.mimetype,
-            'content_digest': payload_digest,
-            'warc_filename': records[0].warc_filename,
-            'warc_offset': records[0].offset,
-            'host': recorded_url.host,
-            'annotations': 'duplicate:digest' if records[0].type == 'revisit' else '',
-            'content_length': recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset,
-            'start_time_plus_duration': '{:%Y%m%d%H%M%S}{:03d}+{}'.format(
-                recorded_url.timestamp, recorded_url.timestamp.microsecond//1000,
-                int(recorded_url.duration.total_seconds() * 1000)),
-            # 'hop_path': ?  # only used for seed redirects, which are n/a to brozzler (?)
-            # 'via': ?
-            # 'thread': ?  # not needed
-        }
-
-        # fields expected to be populated here are (for archive-it):
-        # account_id, collection_id, is_test_crawl, seed, job_name
-        if recorded_url.warcprox_meta and 'capture-feed-extra-fields' in recorded_url.warcprox_meta:
-            for (k, v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items():
-                d[k] = v
-
-        msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
-        self.logger.debug('feeding kafka topic=%r msg=%r', topic, msg)
-        p = self._producer()
-        if p:
-            p.send(topic, msg)
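
The deleted CaptureFeed above already has the notify(self, recorded_url, records) shape that the --plugin mechanism expects, so comparable behavior could in principle be re-attached as an external plugin rather than built-in code. Plugins are instantiated with no arguments, though, so a thin adapter would need to get the broker list and topic from somewhere else. The sketch below is speculative: it assumes the removed module has been kept around as an importable module named kafkafeed (with kafka-python installed separately, since it is also dropped from setup.py in this commit) and that configuration comes from environment variables:

    # kafka_feed_plugin.py -- hypothetical adapter, not part of this commit
    import os
    import kafkafeed  # assumed: a copy of the removed warcprox/kafkafeed.py

    class KafkaCaptureFeedPlugin(kafkafeed.CaptureFeed):
        '''Wraps the old CaptureFeed so it satisfies the no-argument plugin contract.'''
        def __init__(self):
            # read the settings the removed --kafka-* options used to supply
            kafkafeed.CaptureFeed.__init__(
                    self,
                    os.environ['KAFKA_BROKER_LIST'],
                    os.environ.get('KAFKA_CAPTURE_FEED_TOPIC'))

It would then be loaded with --plugin kafka_feed_plugin.KafkaCaptureFeedPlugin.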


@@ -114,12 +114,6 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     arg_parser.add_argument('--rethinkdb-big-table',
         dest='rethinkdb_big_table', action='store_true', default=False,
         help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
-    arg_parser.add_argument(
-        '--kafka-broker-list', dest='kafka_broker_list', default=None,
-        help=argparse.SUPPRESS)
-    arg_parser.add_argument(
-        '--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
-        default=None, help=argparse.SUPPRESS)
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
         default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
@@ -209,11 +203,6 @@ def init_controller(args):
         stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
         listeners.append(stats_db)
 
-    if args.kafka_broker_list:
-        kafka_capture_feed = warcprox.kafkafeed.CaptureFeed(
-                args.kafka_broker_list, args.kafka_capture_feed_topic)
-        listeners.append(kafka_capture_feed)
-
     recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size)
 
     ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
@@ -309,7 +298,6 @@ def main(argv=sys.argv):
         format=(
             '%(asctime)s %(process)d %(levelname)s %(threadName)s '
             '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
-    logging.getLogger('kafka').setLevel(loglevel + 5)
     real_main(args)