mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 00:03:28 +01:00
Recorder Pending count (#637)
* recorder: add a pending counter (in Redis) when using the Redis-based dedup system; supports webrecorder/browsertrix#44
This commit is contained in:
parent
626da99899
commit
abb76911f5
@ -11,7 +11,7 @@ from wsgiprox.wsgiprox import WSGIProxMiddleware
|
|||||||
from pywb.recorder.multifilewarcwriter import MultiFileWARCWriter
|
from pywb.recorder.multifilewarcwriter import MultiFileWARCWriter
|
||||||
from pywb.recorder.recorderapp import RecorderApp
|
from pywb.recorder.recorderapp import RecorderApp
|
||||||
from pywb.recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy
|
from pywb.recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy
|
||||||
from pywb.recorder.redisindexer import WritableRedisIndexer
|
from pywb.recorder.redisindexer import WritableRedisIndexer, RedisPendingCounterTempBuffer
|
||||||
|
|
||||||
from pywb.utils.loaders import load_yaml_config
|
from pywb.utils.loaders import load_yaml_config
|
||||||
from pywb.utils.geventserver import GeventServer
|
from pywb.utils.geventserver import GeventServer
|
||||||
@ -244,8 +244,16 @@ class FrontEndApp(object):
|
|||||||
dedup_index=dedup_index,
|
dedup_index=dedup_index,
|
||||||
dedup_by_url=dedup_by_url)
|
dedup_by_url=dedup_by_url)
|
||||||
|
|
||||||
|
if dedup_policy:
|
||||||
|
pending_counter = self.warcserver.dedup_index_url.replace(':cdxj', ':pending')
|
||||||
|
pending_timeout = recorder_config.get('pending_timeout', 30)
|
||||||
|
create_buff_func = lambda params, name: RedisPendingCounterTempBuffer(512 * 1024, pending_counter, params, name, pending_timeout)
|
||||||
|
else:
|
||||||
|
create_buff_func = None
|
||||||
|
|
||||||
self.recorder = RecorderApp(self.RECORD_SERVER % str(self.warcserver_server.port), warc_writer,
|
self.recorder = RecorderApp(self.RECORD_SERVER % str(self.warcserver_server.port), warc_writer,
|
||||||
accept_colls=recorder_config.get('source_filter'))
|
accept_colls=recorder_config.get('source_filter'),
|
||||||
|
create_buff_func=create_buff_func)
|
||||||
|
|
||||||
recorder_server = GeventServer(self.recorder, port=0)
|
recorder_server = GeventServer(self.recorder, port=0)
|
||||||
|
|
||||||
|
@ -24,8 +24,7 @@ class RecorderApp(object):
|
|||||||
|
|
||||||
self.rec_source_name = kwargs.get('name', 'recorder')
|
self.rec_source_name = kwargs.get('name', 'recorder')
|
||||||
|
|
||||||
self.create_buff_func = kwargs.get('create_buff_func',
|
self.create_buff_func = kwargs.get('create_buff_func') or self.default_create_buffer
|
||||||
self.default_create_buffer)
|
|
||||||
|
|
||||||
self.write_queue = gevent.queue.Queue()
|
self.write_queue = gevent.queue.Queue()
|
||||||
gevent.spawn(self._write_loop)
|
gevent.spawn(self._write_loop)
|
||||||
|
@ -2,6 +2,7 @@ from warcio.timeutils import iso_date_to_timestamp
|
|||||||
|
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import os
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
from pywb.utils.canonicalize import calc_search_range
|
from pywb.utils.canonicalize import calc_search_range
|
||||||
from pywb.utils.format import res_template
|
from pywb.utils.format import res_template
|
||||||
@ -101,3 +102,29 @@ class WritableRedisIndexer(RedisIndexSource):
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
class RedisPendingCounterTempBuffer(tempfile.SpooledTemporaryFile):
    """Spooled temporary buffer that is tracked by a "pending" counter in redis.

    Creating the buffer increments the integer stored at the redis key derived
    from ``redis_url``; closing it decrements that key again. Every
    increment, write and decrement also resets the key's expiry to
    ``timeout`` seconds.
    """

    def __init__(self, max_size, redis_url, params, name, timeout=30):
        """Create the buffer and register it in the pending counter.

        :param max_size: passed to ``SpooledTemporaryFile`` as ``max_size``.
        :param redis_url: redis url template; expanded with ``params`` via
            ``res_template`` and then split into a client and a key by
            ``RedisIndexSource.parse_redis_url``.
        :param params: values used to expand the ``redis_url`` template.
        :param name: accepted so the constructor matches the
            ``(params, name)`` buffer-factory call; not used here.
        :param timeout: expiry, in seconds, applied to the counter key.
        """
        redis_url = res_template(redis_url, params)
        super(RedisPendingCounterTempBuffer, self).__init__(max_size=max_size)
        self.redis, self.key = RedisIndexSource.parse_redis_url(redis_url)
        self.timeout = timeout

        self.redis.incrby(self.key, 1)
        # Set the flag only after the increment succeeded, so close() never
        # decrements a count this buffer did not add.
        self._pending = True
        self.redis.expire(self.key, self.timeout)

    def write(self, buf):
        """Write ``buf`` to the buffer and refresh the counter key's expiry.

        :return: whatever the underlying ``write`` returns.
        """
        written = super(RedisPendingCounterTempBuffer, self).write(buf)
        self.redis.expire(self.key, self.timeout)
        return written

    def close(self):
        """Close the buffer and release its slot in the pending counter.

        An error raised while closing the underlying file is printed and not
        propagated. The counter is decremented at most once, however many
        times ``close()`` is called.
        """
        # Imported locally so this method does not depend on a module-level
        # ``import traceback`` being present in the file.
        import traceback

        try:
            super(RedisPendingCounterTempBuffer, self).close()
        except Exception:
            traceback.print_exc()
        finally:
            # close() may run more than once (explicitly and again at
            # finalization); only the first call may decrement the counter.
            if getattr(self, '_pending', False):
                self._pending = False
                self.redis.incrby(self.key, -1)
                self.redis.expire(self.key, self.timeout)
|
||||||
|
|
||||||
|
@ -50,3 +50,7 @@ class TestRecordDedup(HttpBinLiveTests, CollsDirMixin, BaseConfigTest, FakeRedis
|
|||||||
|
|
||||||
# ensure only one response/request pair written
|
# ensure only one response/request pair written
|
||||||
assert records == ['response', 'request']
|
assert records == ['response', 'request']
|
||||||
|
|
||||||
|
def test_redis_pending_count(self):
    """The value stored at the test collection's pending key must be b'0'."""
    assert self.redis.get("pywb:test-dedup:pending") == b'0'
|
||||||
|
Loading…
x
Reference in New Issue
Block a user