diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 2cd62cd..1934895 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -106,6 +106,7 @@ class BasePostfetchProcessor(threading.Thread): # these should be set before thread is started self.inq = None self.outq = None + self.profiler = None def run(self): if self.options.profile: @@ -129,6 +130,7 @@ class BasePostfetchProcessor(threading.Thread): raise Exception('not implemented') def _run(self): + logging.info('%s starting up', self) self._startup() while not self.stop.is_set(): try: diff --git a/warcprox/controller.py b/warcprox/controller.py index 9902cb5..4a0c09a 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -167,20 +167,22 @@ class WarcproxController(object): if self.playback_proxy: self._postfetch_chain.append( warcprox.ListenerPostfetchProcessor( - self.playback_proxy.playback_index_db)) + self.playback_proxy.playback_index_db, self.options)) crawl_logger = Factory.crawl_logger(self.options) if crawl_logger: self._postfetch_chain.append( - warcprox.ListenerPostfetchProcessor(crawl_logger)) + warcprox.ListenerPostfetchProcessor( + crawl_logger, self.options)) self._postfetch_chain.append( - warcprox.ListenerPostfetchProcessor(self.proxy.running_stats)) + warcprox.ListenerPostfetchProcessor( + self.proxy.running_stats, self.options)) for qualname in self.options.plugins or []: plugin = Factory.plugin(qualname) self._postfetch_chain.append( - warcprox.ListenerPostfetchProcessor(plugin)) + warcprox.ListenerPostfetchProcessor(plugin, self.options)) # chain them all up self._postfetch_chain[0].inq = inq @@ -285,7 +287,6 @@ class WarcproxController(object): for processor in self._postfetch_chain: processor.start() - logging.info('started postfetch processor %r', processor) def shutdown(self): with self._start_stop_lock: @@ -390,6 +391,9 @@ class WarcproxController(object): # postfetch processors for processor in self._postfetch_chain: + if not processor.profiler: + 
self.logger.notice('%s has no profiling data', processor) + continue file = os.path.join(tmpdir, '%s.dat' % processor.ident) processor.profiler.dump_stats(file) buf = io.StringIO() diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 0b52ffb..950c110 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -260,11 +260,42 @@ class CdxServerDedup(DedupDb): """ pass +class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): + def __init__(self, trough_dedup_db, options=warcprox.Options()): + warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + self.trough_dedup_db = trough_dedup_db + + def _filter_and_bucketize(self, batch): + ''' + Returns `{bucket: [recorded_url, ...]}`, excluding urls that should + not have dedup info stored. + ''' + buckets = collections.defaultdict(list) + for recorded_url in batch: + if (recorded_url.warc_records + and recorded_url.warc_records[0].type == b'response' + and recorded_url.response_recorder.payload_size() > 0): + if (recorded_url.warcprox_meta + and 'captures-bucket' in recorded_url.warcprox_meta): + bucket = recorded_url.warcprox_meta['captures-bucket'] + else: + bucket = '__unspecified__' + buckets[bucket].append(recorded_url) + return buckets + + def _process_batch(self, batch): + buckets = self._filter_and_bucketize(batch) + for bucket in buckets: + self.trough_dedup_db.batch_save(buckets[bucket], bucket) + class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) self.trough_dedup_db = trough_dedup_db + def _startup(self): + self.trough_dedup_db.start() + def _filter_and_bucketize(self, batch): ''' Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not @@ -275,7 +306,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): if (recorded_url.response_recorder and recorded_url.payload_digest and recorded_url.response_recorder.payload_size() > 0): - if 
recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: + if (recorded_url.warcprox_meta + and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] else: bucket = '__unspecified__' @@ -315,7 +347,8 @@ class TroughDedupDb(DedupDb): ' url varchar(2100) not null,\n' ' date datetime not null,\n' ' id varchar(100));\n') # warc record id - WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) ' + WRITE_SQL_TMPL = ('insert or ignore into dedup\n' + '(digest_key, url, date, id)\n' 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): @@ -323,8 +356,11 @@ class TroughDedupDb(DedupDb): self._trough_cli = warcprox.trough.TroughClient( options.rethinkdb_trough_db_url, promotion_interval=60*60) - def loader(self, options=warcprox.Options()): - return BatchTroughLoader(self, options) + def loader(self, *args, **kwargs): + return BatchTroughLoader(self, self.options) + + def storer(self, *args, **kwargs): + return BatchTroughStorer(self, self.options) def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) @@ -337,6 +373,21 @@ class TroughDedupDb(DedupDb): bucket, self.WRITE_SQL_TMPL, (digest_key, url, warc_date, record_id), self.SCHEMA_ID) + def batch_save(self, batch, bucket='__unspecified__'): + sql_tmpl = ('insert or ignore into dedup\n' + '(digest_key, url, date, id)\n' + 'values %s;' % ','.join( + '(%s,%s,%s,%s)' for i in range(len(batch)))) + values = [] + for recorded_url in batch: + values.extend([ + warcprox.digest_str( + recorded_url.payload_digest, self.options.base32), + recorded_url.url, + recorded_url.warc_records[0].date, + recorded_url.warc_records[0].id,]) + self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID) + def lookup(self, digest_key, bucket='__unspecified__', url=None): results = self._trough_cli.read( bucket, 'select * from dedup where digest_key=%s;',