From 45c8fcddbdac25c53889bccc77632b4a1e262ced Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 11 May 2016 21:40:02 -0700 Subject: [PATCH] recorder: add max_idle_secs / close_idle_files() to close any open files that have not been modified longer than set threshold, in prep for webrecorder/webrecorder#92 indexer: add 'full_warc_prefix' for setting full path prefix in add_warc_file() (eg. for http load) for webrecorder/webrecorder#95 --- recorder/redisindexer.py | 7 ++++-- recorder/test/test_recorder.py | 43 +++++++++++++++++++++++++++++++++- recorder/warcwriter.py | 29 +++++++++++++++++++---- setup.py | 1 + 4 files changed, 73 insertions(+), 7 deletions(-) diff --git a/recorder/redisindexer.py b/recorder/redisindexer.py index c3fa1c93..577bf036 100644 --- a/recorder/redisindexer.py +++ b/recorder/redisindexer.py @@ -29,15 +29,18 @@ class WritableRedisIndexer(RedisIndexSource): self.rel_path_template = kwargs.get('rel_path_template', '') self.file_key_template = kwargs.get('file_key_template', '') + self.full_warc_prefix = kwargs.get('full_warc_prefix', '') self.dupe_policy = kwargs.get('dupe_policy', WriteRevisitDupePolicy()) def add_warc_file(self, full_filename, params): rel_path = res_template(self.rel_path_template, params) - filename = os.path.relpath(full_filename, rel_path) + rel_filename = os.path.relpath(full_filename, rel_path) file_key = res_template(self.file_key_template, params) - self.redis.hset(file_key, filename, full_filename) + full_load_path = self.full_warc_prefix + full_filename + + self.redis.hset(file_key, rel_filename, full_load_path) def add_urls_to_index(self, stream, params, filename, length): rel_path = res_template(self.rel_path_template, params) diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index 97a47f46..1283e022 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -365,6 +365,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) 
self._test_all_warcs('/warcs/FOO/', 1) + # Check two records in WARC r = FakeStrictRedis.from_url('redis://localhost/2') res = r.zrangebylex('FOO:cdxj', '[org,httpbin)/', '(org,httpbin,') assert len(res) == 2 @@ -388,7 +389,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert len(writer.fh_cache) == 1 - writer.close_file(self.root_dir + '/warcs/FOO/') + writer.close_dir(self.root_dir + '/warcs/FOO/') #writer.close_file({'param.recorder.coll': 'FOO'}) assert len(writer.fh_cache) == 0 @@ -403,6 +404,46 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) warcs = r.hgetall('FOO:warc') assert len(warcs) == 2 + def test_record_multiple_writes_rollover_idle(self): + warc_path = to_path(self.root_dir + '/warcs/GOO/ABC-{hostname}-{timestamp}.warc.gz') + + rel_path = self.root_dir + '/warcs/' + + dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj', + file_key_template='{coll}:warc', + rel_path_template=rel_path) + + writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index, max_idle_secs=0.9) + recorder_app = RecorderApp(self.upstream_url, writer) + + # First Record + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?foo=bar', '&param.recorder.coll=GOO') + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + # Second Record + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?boo=far', '&param.recorder.coll=GOO') + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"boo": "far"' in resp.body + + self._test_all_warcs('/warcs/GOO/', 1) + + time.sleep(1.0) + writer.close_idle_files() + + # Third Record + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?goo=bar', '&param.recorder.coll=GOO') + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"goo": "bar"' in resp.body + + self._test_all_warcs('/warcs/GOO/', 2) + def test_warcinfo_record(self): simplewriter = SimpleTempWARCWriter(gzip=False) params = 
{'software': 'recorder test', diff --git a/recorder/warcwriter.py b/recorder/warcwriter.py index 896ff626..3008d9e9 100644 --- a/recorder/warcwriter.py +++ b/recorder/warcwriter.py @@ -259,7 +259,7 @@ class Digester(object): # ============================================================================ class MultiFileWARCWriter(BaseWARCWriter): def __init__(self, dir_template, filename_template=None, max_size=0, - *args, **kwargs): + max_idle_secs=1800, *args, **kwargs): super(MultiFileWARCWriter, self).__init__(*args, **kwargs) if not filename_template: @@ -272,6 +272,10 @@ class MultiFileWARCWriter(BaseWARCWriter): self.dir_template = dir_template self.filename_template = filename_template self.max_size = max_size + if max_idle_secs > 0: + self.max_idle_time = datetime.timedelta(seconds=max_idle_secs) + else: + self.max_idle_time = None self.fh_cache = {} @@ -300,7 +304,7 @@ class MultiFileWARCWriter(BaseWARCWriter): fcntl.flock(fh, fcntl.LOCK_UN) fh.close() - def close_file(self, full_dir): + def close_dir(self, full_dir): #full_dir = res_template(self.dir_template, params) result = self.fh_cache.pop(full_dir, None) if not result: @@ -371,13 +375,30 @@ class MultiFileWARCWriter(BaseWARCWriter): fcntl.flock(out, fcntl.LOCK_EX | fcntl.LOCK_NB) self.fh_cache[full_dir] = (out, filename) - def close(self): - for n, v in self.fh_cache.items(): + def iter_open_files(self): + for n, v in list(self.fh_cache.items()): out, filename = v + yield n, out, filename + + def close(self): + for dirname, out, filename in self.iter_open_files(): self._close_file(out) self.fh_cache = {} + def close_idle_files(self): + if not self.max_idle_time: + return + + now = datetime.datetime.now() + + for dirname, out, filename in self.iter_open_files(): + mtime = os.path.getmtime(filename) + mtime = datetime.datetime.fromtimestamp(mtime) + if (now - mtime) > self.max_idle_time: + print('Closing idle ' + filename) + self.close_dir(dirname) + # 
============================================================================ class PerRecordWARCWriter(MultiFileWARCWriter): diff --git a/setup.py b/setup.py index 6bc77d7a..16d913e6 100755 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ setup( ], install_requires=[ 'pywb>=0.30.0', + 'werkzeug', ], dependency_links=[ #'git+https://github.com/ikreymer/pywb.git@develop#egg=pywb-0.30.0-develop',