diff --git a/pywb/apps/frontendapp.py b/pywb/apps/frontendapp.py index 69382df2..25d5e4ab 100644 --- a/pywb/apps/frontendapp.py +++ b/pywb/apps/frontendapp.py @@ -11,7 +11,7 @@ from wsgiprox.wsgiprox import WSGIProxMiddleware from pywb.recorder.multifilewarcwriter import MultiFileWARCWriter from pywb.recorder.recorderapp import RecorderApp from pywb.recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy -from pywb.recorder.redisindexer import WritableRedisIndexer +from pywb.recorder.redisindexer import WritableRedisIndexer, RedisPendingCounterTempBuffer from pywb.utils.loaders import load_yaml_config from pywb.utils.geventserver import GeventServer @@ -244,8 +244,16 @@ class FrontEndApp(object): dedup_index=dedup_index, dedup_by_url=dedup_by_url) + if dedup_policy: + pending_counter = self.warcserver.dedup_index_url.replace(':cdxj', ':pending') + pending_timeout = recorder_config.get('pending_timeout', 30) + create_buff_func = lambda params, name: RedisPendingCounterTempBuffer(512 * 1024, pending_counter, params, name, pending_timeout) + else: + create_buff_func = None + self.recorder = RecorderApp(self.RECORD_SERVER % str(self.warcserver_server.port), warc_writer, - accept_colls=recorder_config.get('source_filter')) + accept_colls=recorder_config.get('source_filter'), + create_buff_func=create_buff_func) recorder_server = GeventServer(self.recorder, port=0) diff --git a/pywb/recorder/recorderapp.py b/pywb/recorder/recorderapp.py index 689d4171..83fa40ab 100644 --- a/pywb/recorder/recorderapp.py +++ b/pywb/recorder/recorderapp.py @@ -24,8 +24,7 @@ class RecorderApp(object): self.rec_source_name = kwargs.get('name', 'recorder') - self.create_buff_func = kwargs.get('create_buff_func', - self.default_create_buffer) + self.create_buff_func = kwargs.get('create_buff_func') or self.default_create_buffer self.write_queue = gevent.queue.Queue() gevent.spawn(self._write_loop) diff --git a/pywb/recorder/redisindexer.py b/pywb/recorder/redisindexer.py index 7fbb7d6f..52408c25 100644 --- a/pywb/recorder/redisindexer.py +++ b/pywb/recorder/redisindexer.py @@ -2,6 +2,7 @@ from warcio.timeutils import iso_date_to_timestamp from io import BytesIO import os +import tempfile from pywb.utils.canonicalize import calc_search_range from pywb.utils.format import res_template @@ -101,3 +102,29 @@ class WritableRedisIndexer(RedisIndexSource): return res return None + + +# ============================================================================ +class RedisPendingCounterTempBuffer(tempfile.SpooledTemporaryFile): + def __init__(self, max_size, redis_url, params, name, timeout=30): + redis_url = res_template(redis_url, params) + super(RedisPendingCounterTempBuffer, self).__init__(max_size=max_size) + self.redis, self.key = RedisIndexSource.parse_redis_url(redis_url) + self.timeout = timeout + + self.redis.incrby(self.key, 1) + self.redis.expire(self.key, self.timeout) + + def write(self, buf): + super(RedisPendingCounterTempBuffer, self).write(buf) + self.redis.expire(self.key, self.timeout) + + def close(self): + try: + super(RedisPendingCounterTempBuffer, self).close() + except: + traceback.print_exc() + + self.redis.incrby(self.key, -1) + self.redis.expire(self.key, self.timeout) + diff --git a/tests/test_record_dedup.py b/tests/test_record_dedup.py index 724a32cd..9b01e9e3 100644 --- a/tests/test_record_dedup.py +++ b/tests/test_record_dedup.py @@ -50,3 +50,7 @@ class TestRecordDedup(HttpBinLiveTests, CollsDirMixin, BaseConfigTest, FakeRedis # ensure only one response/request pair written assert records == ['response', 'request'] + + def test_redis_pending_count(self): + res = self.redis.get("pywb:test-dedup:pending") + assert res == b'0'