diff --git a/recorder/redisindexer.py b/recorder/redisindexer.py index f864b51a..afa8a305 100644 --- a/recorder/redisindexer.py +++ b/recorder/redisindexer.py @@ -15,13 +15,23 @@ from recorder.filters import WriteRevisitDupePolicy #============================================================================== class WritableRedisIndexer(RedisIndexSource): - def __init__(self, redis_url, rel_path_template='', name='recorder', + def __init__(self, redis_url, rel_path_template='', + file_key_template='', name='recorder', dupe_policy=WriteRevisitDupePolicy()): super(WritableRedisIndexer, self).__init__(redis_url) self.cdx_lookup = SimpleAggregator({name: self}) self.rel_path_template = rel_path_template + self.file_key_template = file_key_template self.dupe_policy = dupe_policy + def add_warc_file(self, full_filename, params): + rel_path = res_template(self.rel_path_template, params) + filename = os.path.relpath(full_filename, rel_path) + + file_key = res_template(self.file_key_template, params) + + self.redis.hset(file_key, filename, full_filename) + def index_records(self, stream, params, filename=None): rel_path = res_template(self.rel_path_template, params) filename = os.path.relpath(filename, rel_path) diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index 1941860e..d83f3375 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -13,7 +13,8 @@ from pytest import raises from recorder.recorderapp import RecorderApp from recorder.redisindexer import WritableRedisIndexer from recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter -from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy +from recorder.filters import ExcludeSpecificHeaders +from recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy from webagg.utils import MementoUtils @@ -47,6 +48,13 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port) + def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy()): + dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', + file_key_template='{user}:{coll}:warc', + rel_path_template=self.root_dir + '/warcs/', + dupe_policy=dupe_policy) + + return dedup_index def _test_warc_write(self, recorder_app, host, path, other_params=''): url = 'http://' + host + path @@ -176,9 +184,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') - - dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', - rel_path_template=self.root_dir + '/warcs/') + dedup_index = self._get_dedup_index() recorder_app = RecorderApp(self.upstream_url, PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) @@ -204,14 +210,16 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): assert cdx['filename'].startswith('USER/COLL/') assert cdx['filename'].endswith('.warc.gz') + warcs = r.hgetall('USER:COLL:warc') + full_path = self.root_dir + '/warcs/' + cdx['filename'] + assert warcs == {cdx['filename'].encode('utf-8'): full_path.encode('utf-8')} + @patch('redis.StrictRedis', FakeStrictRedis) def test_record_param_user_coll_revisit(self): warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') - - dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', - rel_path_template=self.root_dir + '/warcs/') + dedup_index = self._get_dedup_index() recorder_app = RecorderApp(self.upstream_url, PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) @@ -240,6 +248,10 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): fullwarc = os.path.join(self.root_dir, 'warcs', cdx['filename']) + warcs = r.hgetall('USER:COLL:warc') + assert len(warcs) == 2 + assert warcs[cdx['filename'].encode('utf-8')] == fullwarc.encode('utf-8') + with open(fullwarc, 'rb') as fh: decomp = DecompressingBufferedReader(fh) # Test refers-to headers @@ -254,10 +266,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): def test_record_param_user_coll_skip(self): warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') - - dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', - rel_path_template=self.root_dir + '/warcs/', - dupe_policy=SkipDupePolicy()) + dedup_index = self._get_dedup_index(dupe_policy=SkipDupePolicy()) recorder_app = RecorderApp(self.upstream_url, PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) @@ -283,10 +292,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') - - dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', - rel_path_template=self.root_dir + '/warcs/', - dupe_policy=WriteDupePolicy()) + dedup_index = self._get_dedup_index(dupe_policy=WriteDupePolicy()) writer = PerRecordWARCWriter(warc_path, dedup_index=dedup_index) recorder_app = RecorderApp(self.upstream_url, writer) @@ -329,8 +335,10 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): rel_path = self.root_dir + '/warcs/' dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj', + file_key_template='{coll}:warc', rel_path_template=rel_path) + writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index) recorder_app = RecorderApp(self.upstream_url, writer) @@ -382,3 +390,8 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?boo=far', '¶m.recorder.coll=FOO') + + self._test_all_warcs('/warcs/FOO/', 2) + + warcs = r.hgetall('FOO:warc') + assert len(warcs) == 2 diff --git a/recorder/warcwriter.py b/recorder/warcwriter.py index 9ae4bf9e..99c93e06 100644 --- a/recorder/warcwriter.py +++ b/recorder/warcwriter.py @@ -20,7 +20,7 @@ from pywb.utils.loaders import LimitReader, to_native_str from pywb.utils.bufferedreaders import BufferedReader from pywb.utils.timeutils import timestamp20_now -from webagg.utils import ParamFormatter +from webagg.utils import ParamFormatter, res_template from recorder.filters import ExcludeNone @@ -107,8 +107,8 @@ class BaseWARCWriter(object): print('Skipping due to dedup') return - formatter = ParamFormatter(params, name=self.rec_source_name) - self._do_write_req_resp(req, resp, params, formatter) + params['_formatter'] = ParamFormatter(params, name=self.rec_source_name) + self._do_write_req_resp(req, resp, params) def _check_revisit(self, record, params): if not self.dedup_index: @@ -248,18 +248,24 @@ class MultiFileWARCWriter(BaseWARCWriter): self.fh_cache = {} - def _open_file(self, dir_): + def _open_file(self, dir_, params): timestamp = timestamp20_now() filename = dir_ + self.filename_template.format(hostname=self.hostname, timestamp=timestamp) + path, name = os.path.split(filename) + try: - os.makedirs(os.path.dirname(filename)) + os.makedirs(path) except: pass fh = open(filename, 'a+b') + + if self.dedup_index: + self.dedup_index.add_warc_file(filename, params) + return fh, filename def _close_file(self, fh): @@ -272,8 +278,8 @@ class MultiFileWARCWriter(BaseWARCWriter): out, filename = result self._close_file(out) - def _do_write_req_resp(self, req, resp, params, formatter): - full_dir = formatter.format(self.dir_template) + def _do_write_req_resp(self, req, resp, params): + full_dir = res_template(self.dir_template, params) result = self.fh_cache.get(full_dir) @@ -283,7 +289,7 @@ class MultiFileWARCWriter(BaseWARCWriter): out, filename = result is_new = False else: - out, filename = self._open_file(full_dir) + out, filename = self._open_file(full_dir, params) is_new = True try: