From 01c21d3a436a9535a4ecd07b718201beb943b3ff Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 2 Apr 2016 21:36:36 -0700 Subject: [PATCH] recorder: redis indexer accepts arg list, supports separate redis and key_template args add length param to add_urls_to_index() in redis indexer, return cdx list --- recorder/redisindexer.py | 30 +++++++++++++++++++----------- recorder/test/test_recorder.py | 11 ++++++----- recorder/warcwriter.py | 8 +++++--- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/recorder/redisindexer.py b/recorder/redisindexer.py index 33d83692..886cb62a 100644 --- a/recorder/redisindexer.py +++ b/recorder/redisindexer.py @@ -15,14 +15,21 @@ from recorder.filters import WriteRevisitDupePolicy #============================================================================== class WritableRedisIndexer(RedisIndexSource): - def __init__(self, redis_url, rel_path_template='', - file_key_template='', name='recorder', - dupe_policy=WriteRevisitDupePolicy()): - super(WritableRedisIndexer, self).__init__(redis_url) + def __init__(self, *args, **kwargs): + redis_url = kwargs.get('redis_url') + redis = kwargs.get('redis') + cdx_key_template = kwargs.get('cdx_key_template') + + super(WritableRedisIndexer, self).__init__(redis_url, + redis, + cdx_key_template) + + name = kwargs.get('name', 'recorder') self.cdx_lookup = SimpleAggregator({name: self}) - self.rel_path_template = rel_path_template - self.file_key_template = file_key_template - self.dupe_policy = dupe_policy + + self.rel_path_template = kwargs.get('rel_path_template', '') + self.file_key_template = kwargs.get('file_key_template', '') + self.dupe_policy = kwargs.get('dupe_policy', WriteRevisitDupePolicy()) def add_warc_file(self, full_filename, params): rel_path = res_template(self.rel_path_template, params) @@ -32,7 +39,7 @@ class WritableRedisIndexer(RedisIndexSource): self.redis.hset(file_key, filename, full_filename) - def add_urls_to_index(self, stream, params, filename=None): + def add_urls_to_index(self, stream, params, filename, length): rel_path = res_template(self.rel_path_template, params) filename = os.path.relpath(filename, rel_path) @@ -42,12 +49,13 @@ class WritableRedisIndexer(RedisIndexSource): z_key = res_template(self.redis_key_template, params) - cdxes = cdxout.getvalue() - for cdx in cdxes.split(b'\n'): + cdx_list = cdxout.getvalue().rstrip().split(b'\n') + + for cdx in cdx_list: if cdx: self.redis.zadd(z_key, 0, cdx) - return cdx + return cdx_list def lookup_revisit(self, params, digest, url, iso_dt): params['url'] = url diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index 707f2e95..d763953f 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -42,14 +42,14 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) def setup_class(cls): super(TestRecorder, cls).setup_class() - warcs = to_path(cls.root_dir + '/warcs') + cls.warcs_dir = to_path(cls.root_dir + '/warcs') - os.makedirs(warcs) + os.makedirs(cls.warcs_dir) cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port) def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy()): - dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', + dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{user}:{coll}:cdxj', file_key_template='{user}:{coll}:warc', rel_path_template=self.root_dir + '/warcs/', dupe_policy=dupe_policy) @@ -335,7 +335,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) rel_path = self.root_dir + '/warcs/' - dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj', + dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj', file_key_template='{coll}:warc', rel_path_template=rel_path) @@ -383,7 +383,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert len(writer.fh_cache) == 1 - writer.close_file({'param.recorder.coll': 'FOO'}) + writer.close_file(self.root_dir + '/warcs/FOO/') + #writer.close_file({'param.recorder.coll': 'FOO'}) assert len(writer.fh_cache) == 0 diff --git a/recorder/warcwriter.py b/recorder/warcwriter.py index e0d51154..db88e45e 100644 --- a/recorder/warcwriter.py +++ b/recorder/warcwriter.py @@ -275,8 +275,8 @@ class MultiFileWARCWriter(BaseWARCWriter): fcntl.flock(fh, fcntl.LOCK_UN) fh.close() - def close_file(self, params): - full_dir = res_template(self.dir_template, params) + def close_file(self, full_dir): + #full_dir = res_template(self.dir_template, params) result = self.fh_cache.pop(full_dir, None) if not result: return @@ -315,7 +315,9 @@ class MultiFileWARCWriter(BaseWARCWriter): out.seek(start) if self.dedup_index: - self.dedup_index.add_urls_to_index(out, params, filename=filename) + self.dedup_index.add_urls_to_index(out, params, + filename, + new_size - start) except Exception as e: traceback.print_exc()