mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 08:04:49 +01:00
recorder: redis indexer accepts arg list, supports separate redis and key_template args
add length param to add_urls_to_index() in redis indexer, return cdx list
This commit is contained in:
parent
6157cebcc9
commit
01c21d3a43
@ -15,14 +15,21 @@ from recorder.filters import WriteRevisitDupePolicy
|
||||
|
||||
#==============================================================================
|
||||
class WritableRedisIndexer(RedisIndexSource):
|
||||
def __init__(self, redis_url, rel_path_template='',
|
||||
file_key_template='', name='recorder',
|
||||
dupe_policy=WriteRevisitDupePolicy()):
|
||||
super(WritableRedisIndexer, self).__init__(redis_url)
|
||||
def __init__(self, *args, **kwargs):
|
||||
redis_url = kwargs.get('redis_url')
|
||||
redis = kwargs.get('redis')
|
||||
cdx_key_template = kwargs.get('cdx_key_template')
|
||||
|
||||
super(WritableRedisIndexer, self).__init__(redis_url,
|
||||
redis,
|
||||
cdx_key_template)
|
||||
|
||||
name = kwargs.get('name', 'recorder')
|
||||
self.cdx_lookup = SimpleAggregator({name: self})
|
||||
self.rel_path_template = rel_path_template
|
||||
self.file_key_template = file_key_template
|
||||
self.dupe_policy = dupe_policy
|
||||
|
||||
self.rel_path_template = kwargs.get('rel_path_template', '')
|
||||
self.file_key_template = kwargs.get('file_key_template', '')
|
||||
self.dupe_policy = kwargs.get('dupe_policy', WriteRevisitDupePolicy())
|
||||
|
||||
def add_warc_file(self, full_filename, params):
|
||||
rel_path = res_template(self.rel_path_template, params)
|
||||
@ -32,7 +39,7 @@ class WritableRedisIndexer(RedisIndexSource):
|
||||
|
||||
self.redis.hset(file_key, filename, full_filename)
|
||||
|
||||
def add_urls_to_index(self, stream, params, filename=None):
|
||||
def add_urls_to_index(self, stream, params, filename, length):
|
||||
rel_path = res_template(self.rel_path_template, params)
|
||||
filename = os.path.relpath(filename, rel_path)
|
||||
|
||||
@ -42,12 +49,13 @@ class WritableRedisIndexer(RedisIndexSource):
|
||||
|
||||
z_key = res_template(self.redis_key_template, params)
|
||||
|
||||
cdxes = cdxout.getvalue()
|
||||
for cdx in cdxes.split(b'\n'):
|
||||
cdx_list = cdxout.getvalue().rstrip().split(b'\n')
|
||||
|
||||
for cdx in cdx_list:
|
||||
if cdx:
|
||||
self.redis.zadd(z_key, 0, cdx)
|
||||
|
||||
return cdx
|
||||
return cdx_list
|
||||
|
||||
def lookup_revisit(self, params, digest, url, iso_dt):
|
||||
params['url'] = url
|
||||
|
@ -42,14 +42,14 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
|
||||
def setup_class(cls):
|
||||
super(TestRecorder, cls).setup_class()
|
||||
|
||||
warcs = to_path(cls.root_dir + '/warcs')
|
||||
cls.warcs_dir = to_path(cls.root_dir + '/warcs')
|
||||
|
||||
os.makedirs(warcs)
|
||||
os.makedirs(cls.warcs_dir)
|
||||
|
||||
cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port)
|
||||
|
||||
def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy()):
|
||||
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
|
||||
dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{user}:{coll}:cdxj',
|
||||
file_key_template='{user}:{coll}:warc',
|
||||
rel_path_template=self.root_dir + '/warcs/',
|
||||
dupe_policy=dupe_policy)
|
||||
@ -335,7 +335,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
|
||||
|
||||
rel_path = self.root_dir + '/warcs/'
|
||||
|
||||
dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj',
|
||||
dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj',
|
||||
file_key_template='{coll}:warc',
|
||||
rel_path_template=rel_path)
|
||||
|
||||
@ -383,7 +383,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
|
||||
|
||||
assert len(writer.fh_cache) == 1
|
||||
|
||||
writer.close_file({'param.recorder.coll': 'FOO'})
|
||||
writer.close_file(self.root_dir + '/warcs/FOO/')
|
||||
#writer.close_file({'param.recorder.coll': 'FOO'})
|
||||
|
||||
assert len(writer.fh_cache) == 0
|
||||
|
||||
|
@ -275,8 +275,8 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
||||
fcntl.flock(fh, fcntl.LOCK_UN)
|
||||
fh.close()
|
||||
|
||||
def close_file(self, params):
|
||||
full_dir = res_template(self.dir_template, params)
|
||||
def close_file(self, full_dir):
|
||||
#full_dir = res_template(self.dir_template, params)
|
||||
result = self.fh_cache.pop(full_dir, None)
|
||||
if not result:
|
||||
return
|
||||
@ -315,7 +315,9 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
||||
out.seek(start)
|
||||
|
||||
if self.dedup_index:
|
||||
self.dedup_index.add_urls_to_index(out, params, filename=filename)
|
||||
self.dedup_index.add_urls_to_index(out, params,
|
||||
filename,
|
||||
new_size - start)
|
||||
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
|
Loading…
x
Reference in New Issue
Block a user