mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-24 06:59:52 +01:00
recorder: add max_idle_secs / close_idle_files() to close any open files that have not been modified for longer than the set threshold, in preparation for webrecorder/webrecorder#92
indexer: add 'full_warc_prefix' for setting full path prefix in add_warc_file() (eg. for http load) for webrecorder/webrecorder#95
This commit is contained in:
parent
94d6098238
commit
45c8fcddbd
@ -29,15 +29,18 @@ class WritableRedisIndexer(RedisIndexSource):
|
|||||||
|
|
||||||
self.rel_path_template = kwargs.get('rel_path_template', '')
|
self.rel_path_template = kwargs.get('rel_path_template', '')
|
||||||
self.file_key_template = kwargs.get('file_key_template', '')
|
self.file_key_template = kwargs.get('file_key_template', '')
|
||||||
|
self.full_warc_prefix = kwargs.get('full_warc_prefix', '')
|
||||||
self.dupe_policy = kwargs.get('dupe_policy', WriteRevisitDupePolicy())
|
self.dupe_policy = kwargs.get('dupe_policy', WriteRevisitDupePolicy())
|
||||||
|
|
||||||
def add_warc_file(self, full_filename, params):
|
def add_warc_file(self, full_filename, params):
|
||||||
rel_path = res_template(self.rel_path_template, params)
|
rel_path = res_template(self.rel_path_template, params)
|
||||||
filename = os.path.relpath(full_filename, rel_path)
|
rel_filename = os.path.relpath(full_filename, rel_path)
|
||||||
|
|
||||||
file_key = res_template(self.file_key_template, params)
|
file_key = res_template(self.file_key_template, params)
|
||||||
|
|
||||||
self.redis.hset(file_key, filename, full_filename)
|
full_load_path = self.full_warc_prefix + full_filename
|
||||||
|
|
||||||
|
self.redis.hset(file_key, rel_filename, full_load_path)
|
||||||
|
|
||||||
def add_urls_to_index(self, stream, params, filename, length):
|
def add_urls_to_index(self, stream, params, filename, length):
|
||||||
rel_path = res_template(self.rel_path_template, params)
|
rel_path = res_template(self.rel_path_template, params)
|
||||||
|
@ -365,6 +365,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
|
|||||||
|
|
||||||
self._test_all_warcs('/warcs/FOO/', 1)
|
self._test_all_warcs('/warcs/FOO/', 1)
|
||||||
|
|
||||||
|
# Check two records in WARC
|
||||||
r = FakeStrictRedis.from_url('redis://localhost/2')
|
r = FakeStrictRedis.from_url('redis://localhost/2')
|
||||||
res = r.zrangebylex('FOO:cdxj', '[org,httpbin)/', '(org,httpbin,')
|
res = r.zrangebylex('FOO:cdxj', '[org,httpbin)/', '(org,httpbin,')
|
||||||
assert len(res) == 2
|
assert len(res) == 2
|
||||||
@ -388,7 +389,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
|
|||||||
|
|
||||||
assert len(writer.fh_cache) == 1
|
assert len(writer.fh_cache) == 1
|
||||||
|
|
||||||
writer.close_file(self.root_dir + '/warcs/FOO/')
|
writer.close_dir(self.root_dir + '/warcs/FOO/')
|
||||||
#writer.close_file({'param.recorder.coll': 'FOO'})
|
#writer.close_file({'param.recorder.coll': 'FOO'})
|
||||||
|
|
||||||
assert len(writer.fh_cache) == 0
|
assert len(writer.fh_cache) == 0
|
||||||
@ -403,6 +404,46 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
|
|||||||
warcs = r.hgetall('FOO:warc')
|
warcs = r.hgetall('FOO:warc')
|
||||||
assert len(warcs) == 2
|
assert len(warcs) == 2
|
||||||
|
|
||||||
|
def test_record_multiple_writes_rollover_idle(self):
|
||||||
|
warc_path = to_path(self.root_dir + '/warcs/GOO/ABC-{hostname}-{timestamp}.warc.gz')
|
||||||
|
|
||||||
|
rel_path = self.root_dir + '/warcs/'
|
||||||
|
|
||||||
|
dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj',
|
||||||
|
file_key_template='{coll}:warc',
|
||||||
|
rel_path_template=rel_path)
|
||||||
|
|
||||||
|
writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index, max_idle_secs=0.9)
|
||||||
|
recorder_app = RecorderApp(self.upstream_url, writer)
|
||||||
|
|
||||||
|
# First Record
|
||||||
|
resp = self._test_warc_write(recorder_app, 'httpbin.org',
|
||||||
|
'/get?foo=bar', '&param.recorder.coll=GOO')
|
||||||
|
|
||||||
|
assert b'HTTP/1.1 200 OK' in resp.body
|
||||||
|
assert b'"foo": "bar"' in resp.body
|
||||||
|
|
||||||
|
# Second Record
|
||||||
|
resp = self._test_warc_write(recorder_app, 'httpbin.org',
|
||||||
|
'/get?boo=far', '&param.recorder.coll=GOO')
|
||||||
|
|
||||||
|
assert b'HTTP/1.1 200 OK' in resp.body
|
||||||
|
assert b'"boo": "far"' in resp.body
|
||||||
|
|
||||||
|
self._test_all_warcs('/warcs/GOO/', 1)
|
||||||
|
|
||||||
|
time.sleep(1.0)
|
||||||
|
writer.close_idle_files()
|
||||||
|
|
||||||
|
# Third Record
|
||||||
|
resp = self._test_warc_write(recorder_app, 'httpbin.org',
|
||||||
|
'/get?goo=bar', '&param.recorder.coll=GOO')
|
||||||
|
|
||||||
|
assert b'HTTP/1.1 200 OK' in resp.body
|
||||||
|
assert b'"goo": "bar"' in resp.body
|
||||||
|
|
||||||
|
self._test_all_warcs('/warcs/GOO/', 2)
|
||||||
|
|
||||||
def test_warcinfo_record(self):
|
def test_warcinfo_record(self):
|
||||||
simplewriter = SimpleTempWARCWriter(gzip=False)
|
simplewriter = SimpleTempWARCWriter(gzip=False)
|
||||||
params = {'software': 'recorder test',
|
params = {'software': 'recorder test',
|
||||||
|
@ -259,7 +259,7 @@ class Digester(object):
|
|||||||
# ============================================================================
|
# ============================================================================
|
||||||
class MultiFileWARCWriter(BaseWARCWriter):
|
class MultiFileWARCWriter(BaseWARCWriter):
|
||||||
def __init__(self, dir_template, filename_template=None, max_size=0,
|
def __init__(self, dir_template, filename_template=None, max_size=0,
|
||||||
*args, **kwargs):
|
max_idle_secs=1800, *args, **kwargs):
|
||||||
super(MultiFileWARCWriter, self).__init__(*args, **kwargs)
|
super(MultiFileWARCWriter, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
if not filename_template:
|
if not filename_template:
|
||||||
@ -272,6 +272,10 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
self.dir_template = dir_template
|
self.dir_template = dir_template
|
||||||
self.filename_template = filename_template
|
self.filename_template = filename_template
|
||||||
self.max_size = max_size
|
self.max_size = max_size
|
||||||
|
if max_idle_secs > 0:
|
||||||
|
self.max_idle_time = datetime.timedelta(seconds=max_idle_secs)
|
||||||
|
else:
|
||||||
|
self.max_idle_time = None
|
||||||
|
|
||||||
self.fh_cache = {}
|
self.fh_cache = {}
|
||||||
|
|
||||||
@ -300,7 +304,7 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
fcntl.flock(fh, fcntl.LOCK_UN)
|
fcntl.flock(fh, fcntl.LOCK_UN)
|
||||||
fh.close()
|
fh.close()
|
||||||
|
|
||||||
def close_file(self, full_dir):
|
def close_dir(self, full_dir):
|
||||||
#full_dir = res_template(self.dir_template, params)
|
#full_dir = res_template(self.dir_template, params)
|
||||||
result = self.fh_cache.pop(full_dir, None)
|
result = self.fh_cache.pop(full_dir, None)
|
||||||
if not result:
|
if not result:
|
||||||
@ -371,13 +375,30 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
fcntl.flock(out, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
fcntl.flock(out, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
self.fh_cache[full_dir] = (out, filename)
|
self.fh_cache[full_dir] = (out, filename)
|
||||||
|
|
||||||
def close(self):
|
def iter_open_files(self):
|
||||||
for n, v in self.fh_cache.items():
|
for n, v in list(self.fh_cache.items()):
|
||||||
out, filename = v
|
out, filename = v
|
||||||
|
yield n, out, filename
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
for dirname, out, filename in self.iter_open_files():
|
||||||
self._close_file(out)
|
self._close_file(out)
|
||||||
|
|
||||||
self.fh_cache = {}
|
self.fh_cache = {}
|
||||||
|
|
||||||
|
def close_idle_files(self):
|
||||||
|
if not self.max_idle_time:
|
||||||
|
return
|
||||||
|
|
||||||
|
now = datetime.datetime.now()
|
||||||
|
|
||||||
|
for dirname, out, filename in self.iter_open_files():
|
||||||
|
mtime = os.path.getmtime(filename)
|
||||||
|
mtime = datetime.datetime.fromtimestamp(mtime)
|
||||||
|
if (now - mtime) > self.max_idle_time:
|
||||||
|
print('Closing idle ' + filename)
|
||||||
|
self.close_dir(dirname)
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
class PerRecordWARCWriter(MultiFileWARCWriter):
|
class PerRecordWARCWriter(MultiFileWARCWriter):
|
||||||
|
1
setup.py
1
setup.py
@ -36,6 +36,7 @@ setup(
|
|||||||
],
|
],
|
||||||
install_requires=[
|
install_requires=[
|
||||||
'pywb>=0.30.0',
|
'pywb>=0.30.0',
|
||||||
|
'werkzeug',
|
||||||
],
|
],
|
||||||
dependency_links=[
|
dependency_links=[
|
||||||
#'git+https://github.com/ikreymer/pywb.git@develop#egg=pywb-0.30.0-develop',
|
#'git+https://github.com/ikreymer/pywb.git@develop#egg=pywb-0.30.0-develop',
|
||||||
|
Loading…
x
Reference in New Issue
Block a user