diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index 01648a82..1941860e 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -12,7 +12,7 @@ from pytest import raises from recorder.recorderapp import RecorderApp from recorder.redisindexer import WritableRedisIndexer -from recorder.warcwriter import PerRecordWARCWriter, SingleFileWARCWriter +from recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy from webagg.utils import MementoUtils @@ -288,8 +288,8 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): rel_path_template=self.root_dir + '/warcs/', dupe_policy=WriteDupePolicy()) - recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) + writer = PerRecordWARCWriter(warc_path, dedup_index=dedup_index) + recorder_app = RecorderApp(self.upstream_url, writer) resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') @@ -307,29 +307,31 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): assert sorted(mimes) == ['application/json', 'application/json', 'warc/revisit'] - # Single File - def test_record_single_file_warc_1(self): + assert len(writer.fh_cache) == 0 + + # Keep Open + def test_record_file_warc_keep_open(self): path = to_path(self.root_dir + '/warcs/A.warc.gz') - recorder_app = RecorderApp(self.upstream_url, - SingleFileWARCWriter(path)) + writer = MultiFileWARCWriter(path) + recorder_app = RecorderApp(self.upstream_url, writer) resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body assert os.path.isfile(path) - + assert len(writer.fh_cache) == 1 @patch('redis.StrictRedis', FakeStrictRedis) - def test_record_single_file_multiple_writes(self): - warc_path = to_path(self.root_dir + '/warcs/FOO/rec-{hostname}-{timestamp}.warc.gz') + def test_record_multiple_writes_keep_open(self): + warc_path = to_path(self.root_dir + '/warcs/FOO/ABC-{hostname}-{timestamp}.warc.gz') rel_path = self.root_dir + '/warcs/' dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj', rel_path_template=rel_path) - writer = SingleFileWARCWriter(warc_path, dedup_index=dedup_index) + writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index) recorder_app = RecorderApp(self.upstream_url, writer) # First Record @@ -370,10 +372,13 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): assert cdxres == res - # close this file + assert len(writer.fh_cache) == 1 + + writer.remove_file(self.root_dir + '/warcs/FOO/') + + assert len(writer.fh_cache) == 0 + writer.close() resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?boo=far', '¶m.recorder.coll=FOO') - - self._test_all_warcs('/warcs/FOO/', 2) diff --git a/recorder/warcwriter.py b/recorder/warcwriter.py index 790c9faf..9ae4bf9e 100644 --- a/recorder/warcwriter.py +++ b/recorder/warcwriter.py @@ -8,7 +8,6 @@ import sys import os import six - import traceback from collections import OrderedDict @@ -39,6 +38,8 @@ class BaseWARCWriter(object): BUFF_SIZE = 8192 + FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz' + def __init__(self, gzip=True, dedup_index=None, name='recorder', header_filter=ExcludeNone()): self.gzip = gzip @@ -228,105 +229,111 @@ class Digester(object): # ============================================================================ -class PerRecordWARCWriter(BaseWARCWriter): - DEF_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz' +class MultiFileWARCWriter(BaseWARCWriter): + + def __init__(self, dir_template, filename_template=None, max_size=0, + *args, **kwargs): + super(MultiFileWARCWriter, self).__init__(*args, **kwargs) - def __init__(self, warcdir, filename_template=None, *args, **kwargs): - super(PerRecordWARCWriter, self).__init__(*args, **kwargs) if not filename_template: - filename_template = self.DEF_TEMPLATE - self.filename_template = warcdir + filename_template + dir_template, filename_template = os.path.split(dir_template) + dir_template += os.path.sep - def _do_write_req_resp(self, req, resp, params, formatter): - #resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') - #req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') + if not filename_template: + filename_template = self.FILE_TEMPLATE + + self.dir_template = dir_template + self.filename_template = filename_template + self.max_size = max_size + + self.fh_cache = {} + + def _open_file(self, dir_): timestamp = timestamp20_now() - filename = formatter.format(self.filename_template, - hostname=self.hostname, - timestamp=timestamp) - - path, name = os.path.split(filename) + filename = dir_ + self.filename_template.format(hostname=self.hostname, + timestamp=timestamp) try: - os.makedirs(path) + os.makedirs(os.path.dirname(filename)) except: pass - url = resp.rec_headers.get('WARC-Target-Uri') - print('Writing resp/req for {0} to {1}'.format(url, filename)) + fh = open(filename, 'a+b') + return fh, filename + + def _close_file(self, fh): + fcntl.flock(fh, fcntl.LOCK_UN) + fh.close() + + def remove_file(self, full_dir): + result = self.fh_cache.pop(full_dir, None) + if result: + out, filename = result + self._close_file(out) + + def _do_write_req_resp(self, req, resp, params, formatter): + full_dir = formatter.format(self.dir_template) + + result = self.fh_cache.get(full_dir) + + close_file = False + + if result: + out, filename = result + is_new = False + else: + out, filename = self._open_file(full_dir) + is_new = True + + try: + url = resp.rec_headers.get('WARC-Target-Uri') + print('Writing req/resp {0} to {1} '.format(url, filename)) - with open(filename, 'a+b') as out: start = out.tell() self._write_warc_record(out, resp) self._write_warc_record(out, req) out.flush() + + new_size = out.tell() + out.seek(start) if self.dedup_index: self.dedup_index.index_records(out, params, filename=filename) + except Exception as e: + traceback.print_exc() + close_file = True -# ============================================================================ -class SingleFileWARCWriter(BaseWARCWriter): - def __init__(self, filename_template, dir_prefix='', max_size=0, *args, **kwargs): - super(SingleFileWARCWriter, self).__init__(*args, **kwargs) - self.dir_prefix = dir_prefix - self.filename_template = filename_template - self.max_size = max_size - self._open_file() + finally: + # check for rollover + if self.max_size and new_size > self.max_size: + close_file = True - def _open_file(self): - timestamp = timestamp20_now() + if close_file: + if is_new: + self._close_file(out) + else: + self.remove_file(full_dir) - filename = self.filename_template.format(hostname=self.hostname, - timestamp=timestamp) - - self.filename = self.dir_prefix + filename - - try: - os.makedirs(os.path.dirname(self.filename)) - except: - pass - - self._fh = open(self.filename, 'a+b') - - fcntl.flock(self._fh, fcntl.LOCK_EX | fcntl.LOCK_NB) - - def _do_write_req_resp(self, req, resp, params, formatter): - url = resp.rec_headers.get('WARC-Target-Uri') - print('Writing {0} to {1} '.format(url, self.filename)) - - if not self._fh: - self._open_file() - - out = self._fh - - start = out.tell() - - self._write_warc_record(out, resp) - self._write_warc_record(out, req) - - out.flush() - - new_size = out.tell() - - out.seek(start) - - if self.dedup_index: - self.dedup_index.index_records(out, params, filename=self.filename) - - # check for rollover - if self.max_size and new_size > self.max_size: - self.close() + elif is_new: + fcntl.flock(out, fcntl.LOCK_EX | fcntl.LOCK_NB) + self.fh_cache[full_dir] = (out, filename) def close(self): - if not self._fh: - return None + for n, v in self.fh_cache.items(): + out, filename = v + self._close_file(out) - fcntl.flock(self._fh, fcntl.LOCK_UN) + self.fh_cache = {} + + +# ============================================================================ +class PerRecordWARCWriter(MultiFileWARCWriter): + def __init__(self, *args, **kwargs): + kwargs['max_size'] = 1 + super(PerRecordWARCWriter, self).__init__(*args, **kwargs) - self._fh.close() - self._fh = None