From d98f03012b99cdd224a0881c4891fcfdc8fbeb4a Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Fri, 4 Sep 2015 01:33:19 +0000
Subject: [PATCH] kafka capture feed, for druid

---
 requirements.txt      |  1 +
 warcprox/__init__.py  |  1 +
 warcprox/kafkafeed.py | 56 +++++++++++++++++++++++++++++++++++++++++++
 warcprox/main.py      | 14 +++++++----
 4 files changed, 67 insertions(+), 5 deletions(-)
 create mode 100644 warcprox/kafkafeed.py

diff --git a/requirements.txt b/requirements.txt
index dcc1f62..a320b31 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ certauth>=1.1.0
 rethinkdb
 git+https://github.com/internetarchive/warctools.git
 git+https://github.com/nlevitt/surt.git@py3
+kafka-python
diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 5dc9073..ce56e59 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -64,3 +64,4 @@ import warcprox.warc as warc
 import warcprox.writerthread as writerthread
 import warcprox.stats as stats
 import warcprox.bigtable as bigtable
+import warcprox.kafkafeed as kafkafeed
diff --git a/warcprox/kafkafeed.py b/warcprox/kafkafeed.py
new file mode 100644
index 0000000..65f3f1d
--- /dev/null
+++ b/warcprox/kafkafeed.py
@@ -0,0 +1,56 @@
+import kafka
+import datetime
+import json
+import logging
+from hanzo import warctools
+
+class CaptureFeed:
+    logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed')
+
+    def __init__(self, broker_list, topic):
+        self.broker_list = broker_list
+        self.topic = topic.encode('utf-8')
+        self._producer = kafka.SimpleProducer(kafka.KafkaClient(broker_list))
+
+    def notify(self, recorded_url, records):
+        if records[0].type not in (b'revisit', b'response'):
+            return
+
+        try:
+            payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('utf-8')
+        except Exception:
+            payload_digest = '-'
+
+        # {"status_code":200,"content_digest":"sha1:3VU56HI3BTMDZBL2TP7SQYXITT7VEAJQ","host":"www.kaosgl.com","via":"http://www.kaosgl.com/sayfa.php?id=4427","account_id":"877","seed":"http://www.kaosgl.com/","warc_filename":"ARCHIVEIT-6003-WEEKLY-JOB171310-20150903100014694-00002.warc.gz","url":"http://www.kaosgl.com/resim/HomofobiKarsitiBulusma/trabzon05.jpg","size":29700,"start_time_plus_duration":"20150903175709637+1049","timestamp":"2015-09-03T17:57:10.707Z","mimetype":"image/jpeg","collection_id":"6003","is_test_crawl":"false","job_name":"6003-20150902172136074","warc_offset":856320200,"thread":6,"hop_path":"RLLLLLE","extra_info":{},"annotations":"duplicate:digest","content_length":29432}
+
+        now = datetime.datetime.utcnow()
+        d = {
+            'timestamp': '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
+            'size': recorded_url.size,
+            'status_code': recorded_url.status,
+            'url': recorded_url.url.decode('utf-8'),
+            'mimetype': recorded_url.mimetype,
+            'content_digest': payload_digest,
+            'warc_filename': records[0].warc_filename,
+            'warc_offset': records[0].offset,
+            'host': recorded_url.host,
+            'annotations': 'duplicate:digest' if records[0].type == b'revisit' else '',
+            'content_length': recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset,
+            'start_time_plus_duration': '{:%Y%m%d%H%M%S}{:03d}+{}'.format(
+                recorded_url.timestamp, recorded_url.timestamp.microsecond//1000,
+                int(recorded_url.duration.total_seconds() * 1000)),
+            # 'hop_path': ?  # only used for seed redirects, which are n/a to brozzler (?)
+            # 'via': ?
+            # 'thread': ?  # not needed
+        }
+
+        # fields expected to be populated here are (for archive-it):
+        # account_id, collection_id, is_test_crawl, seed, job_name
+        if recorded_url.warcprox_meta and 'capture-feed-extra-fields' in recorded_url.warcprox_meta:
+            for (k, v) in recorded_url.warcprox_meta['capture-feed-extra-fields'].items():
+                d[k] = v
+
+        msg = json.dumps(d, separators=(',', ':')).encode('utf-8')
+        self.logger.debug('feeding kafka %s', msg)
+        self._producer.send_messages(self.topic, msg)
+
diff --git a/warcprox/main.py b/warcprox/main.py
index 397f4db..cd0cbbe 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -68,19 +68,19 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
     group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
             help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
-    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="warcprox",
+    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
             help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
     arg_parser.add_argument('--rethinkdb-big-table',
             dest='rethinkdb_big_table', action='store_true', default=False,
             help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
+    arg_parser.add_argument('--kafka-broker-list', dest='kafka_broker_list',
+            default=None, help='kafka broker list for capture feed')
+    arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
+            default=None, help='kafka capture feed topic')
     arg_parser.add_argument('--version', action='version',
             version="warcprox {}".format(warcprox.version_str))
     arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
     arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
-    # [--ispartof=warcinfo ispartof]
-    # [--description=warcinfo description]
-    # [--operator=warcinfo operator]
-    # [--httpheader=warcinfo httpheader]
 
     return arg_parser
 
@@ -144,6 +144,10 @@ def main(argv=sys.argv):
         stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
         listeners.append(stats_db)
 
+    if args.kafka_broker_list and args.kafka_capture_feed_topic:
+        kafka_capture_feed = warcprox.kafkafeed.CaptureFeed(args.kafka_broker_list, args.kafka_capture_feed_topic)
+        listeners.append(kafka_capture_feed)
+
     recorded_url_q = queue.Queue()
 
     ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
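
Note on the extra fields: the archive-it fields called out in the kafkafeed.py
comment (account_id, collection_id, is_test_crawl, seed, job_name) are not
filled in by warcprox itself; the client supplies them per request through the
"capture-feed-extra-fields" key of the Warcprox-Meta request header, which
warcprox parses as JSON into recorded_url.warcprox_meta. A hypothetical header,
with values borrowed from the sample message in the code comment above:

    Warcprox-Meta: {"capture-feed-extra-fields":{"account_id":"877","collection_id":"6003","is_test_crawl":"false","seed":"http://www.kaosgl.com/","job_name":"6003-20150902172136074"}}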
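
Usage note: with both new options set, e.g.

    warcprox --kafka-broker-list=localhost:9092 --kafka-capture-feed-topic=capture-feed

(broker address and topic name here are placeholders), main() registers a
CaptureFeed listener and one JSON message is published per response or revisit
record. A minimal consumer sketch to eyeball the feed, using the same
kafka-python library added to requirements.txt and assuming the same
placeholder broker and topic:

    import json
    import kafka

    # subscribe to the capture feed topic (placeholder names)
    consumer = kafka.KafkaConsumer('capture-feed',
            bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # each message is the compact JSON produced by CaptureFeed.notify()
        record = json.loads(message.value.decode('utf-8'))
        print(record['timestamp'], record['status_code'], record['url'])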