kafka capture feed, for druid

Noah Levitt 2015-09-04 01:33:19 +00:00
parent b30218027e
commit d98f03012b
4 changed files with 67 additions and 5 deletions

requirements.txt

@@ -2,3 +2,4 @@ certauth>=1.1.0
 rethinkdb
 git+https://github.com/internetarchive/warctools.git
 git+https://github.com/nlevitt/surt.git@py3
+kafka-python

warcprox/__init__.py

@@ -64,3 +64,4 @@ import warcprox.warc as warc
 import warcprox.writerthread as writerthread
 import warcprox.stats as stats
 import warcprox.bigtable as bigtable
+import warcprox.kafkafeed as kafkafeed

warcprox/kafkafeed.py (new file)

@@ -0,0 +1,56 @@
+import kafka
+import datetime
+import json
+import logging
+from hanzo import warctools
+
+class CaptureFeed:
+    logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed')
+
+    def __init__(self, broker_list, topic):
+        self.broker_list = broker_list
+        self.topic = topic.encode('utf-8')
+        self._producer = kafka.SimpleProducer(kafka.KafkaClient(broker_list))
+
+    def notify(self, recorded_url, records):
+        if records[0].type not in ('revisit', 'response'):
+            return
+
+        try:
+            payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('utf-8')
+        except:
+            payload_digest = '-'
+
+        # {"status_code":200,"content_digest":"sha1:3VU56HI3BTMDZBL2TP7SQYXITT7VEAJQ","host":"www.kaosgl.com","via":"http://www.kaosgl.com/sayfa.php?id=4427","account_id":"877","seed":"http://www.kaosgl.com/","warc_filename":"ARCHIVEIT-6003-WEEKLY-JOB171310-20150903100014694-00002.warc.gz","url":"http://www.kaosgl.com/resim/HomofobiKarsitiBulusma/trabzon05.jpg","size":29700,"start_time_plus_duration":"20150903175709637+1049","timestamp":"2015-09-03T17:57:10.707Z","mimetype":"image/jpeg","collection_id":"6003","is_test_crawl":"false","job_name":"6003-20150902172136074","warc_offset":856320200,"thread":6,"hop_path":"RLLLLLE","extra_info":{},"annotations":"duplicate:digest","content_length":29432}
+        now = datetime.datetime.utcnow()
+        d = {
+            'timestamp': '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
+            'size': recorded_url.size,
+            'status_code': recorded_url.status,
+            'url': recorded_url.url.decode('utf-8'),
+            'mimetype': recorded_url.mimetype,
+            'content_digest': payload_digest,
+            'warc_filename': records[0].warc_filename,
+            'warc_offset': records[0].offset,
+            'host': recorded_url.host,
+            'annotations': 'duplicate:digest' if records[0].type == 'revisit' else '',
+            'content_length': recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset,
+            'start_time_plus_duration': '{:%Y%m%d%H%M%S}{:03d}+{}'.format(
+                recorded_url.timestamp, recorded_url.timestamp.microsecond//1000,
+                int(recorded_url.duration.total_seconds() * 1000)),
+            # 'hop_path': ?  # only used for seed redirects, which are n/a to brozzler (?)
+            # 'via': ?
+            # 'thread': ?  # not needed
+        }
+
+        # fields expected to be populated here are (for archive-it):
+        # account_id, collection_id, is_test_crawl, seed, job_name
+        if recorded_url.warcprox_meta and 'capture-feed-extra-fields' in recorded_url.warcprox_meta:
+            for (k, v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items():
+                d[k] = v
+
+        msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
+        self.logger.debug('feeding kafka %s', msg)
+        self._producer.send_messages(self.topic, msg)
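
Each message on the feed is one compact JSON object per response or revisit record, in the shape of the example comment above. A minimal sketch of a downstream consumer, assuming a recent kafka-python (whose KafkaConsumer API postdates the SimpleProducer used above) and placeholder topic/broker names:

import json
from kafka import KafkaConsumer

# Topic and broker are placeholders; match the --kafka-capture-feed-topic
# and --kafka-broker-list options added to warcprox below.
consumer = KafkaConsumer('capture-feed', bootstrap_servers='localhost:9092')
for message in consumer:
    capture = json.loads(message.value.decode('utf-8'))
    print(capture['timestamp'], capture['status_code'], capture['url'])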

warcprox/main.py

@@ -68,19 +68,19 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
         default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
     group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
         help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
-    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="warcprox",
+    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
         help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
     arg_parser.add_argument('--rethinkdb-big-table',
         dest='rethinkdb_big_table', action='store_true', default=False,
         help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
+    arg_parser.add_argument('--kafka-broker-list', dest='kafka_broker_list',
+        default=None, help='kafka broker list for capture feed')
+    arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
+        default=None, help='kafka capture feed topic')
     arg_parser.add_argument('--version', action='version',
         version="warcprox {}".format(warcprox.version_str))
     arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
     arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
-    # [--ispartof=warcinfo ispartof]
-    # [--description=warcinfo description]
-    # [--operator=warcinfo operator]
-    # [--httpheader=warcinfo httpheader]
     return arg_parser
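
Both new options default to None, so the capture feed is opt-in; main() below only attaches the listener when both are supplied. A quick sanity-check sketch against the parser, assuming the module imports as warcprox.main and using placeholder broker/topic values:

from warcprox.main import _build_arg_parser

args = _build_arg_parser().parse_args([
    '--kafka-broker-list', 'kafka0.example.org:9092,kafka1.example.org:9092',
    '--kafka-capture-feed-topic', 'capture-feed',
])
# main() requires both values before it creates the CaptureFeed listener.
print(args.kafka_broker_list, args.kafka_capture_feed_topic)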
@@ -144,6 +144,10 @@ def main(argv=sys.argv):
         stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
         listeners.append(stats_db)
 
+    if args.kafka_broker_list and args.kafka_capture_feed_topic:
+        kafka_capture_feed = warcprox.kafkafeed.CaptureFeed(args.kafka_broker_list, args.kafka_capture_feed_topic)
+        listeners.append(kafka_capture_feed)
+
     recorded_url_q = queue.Queue()
 
     ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
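
The capture-feed-extra-fields consumed in notify() arrive through warcprox's Warcprox-Meta request header, which a crawler sends alongside each proxied request. A sketch of supplying the Archive-It-style fields, assuming warcprox is proxying on localhost:8000 and reusing values from the example message in kafkafeed.py; everything here is illustrative, not part of this commit:

import json
import requests

# Hypothetical extra fields; each one is copied verbatim into the
# capture feed message for this request.
meta = {'capture-feed-extra-fields': {
    'account_id': '877',
    'collection_id': '6003',
    'is_test_crawl': 'false',
    'seed': 'http://example.com/',
    'job_name': '6003-20150902172136074',
}}

requests.get('http://example.com/',
        proxies={'http': 'http://localhost:8000'},
        headers={'Warcprox-Meta': json.dumps(meta)})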