1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-24 06:59:52 +01:00

recorder: use more general MultiFileWARCWriter, supporting both the keep-file-open

and one-WARC-per-record use cases
This commit is contained in:
Ilya Kreymer 2016-03-18 21:40:41 -07:00
parent e81457df5f
commit 3452cf39e0
2 changed files with 100 additions and 88 deletions

View File

@ -12,7 +12,7 @@ from pytest import raises
from recorder.recorderapp import RecorderApp from recorder.recorderapp import RecorderApp
from recorder.redisindexer import WritableRedisIndexer from recorder.redisindexer import WritableRedisIndexer
from recorder.warcwriter import PerRecordWARCWriter, SingleFileWARCWriter from recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter
from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy
from webagg.utils import MementoUtils from webagg.utils import MementoUtils
@ -288,8 +288,8 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
rel_path_template=self.root_dir + '/warcs/', rel_path_template=self.root_dir + '/warcs/',
dupe_policy=WriteDupePolicy()) dupe_policy=WriteDupePolicy())
recorder_app = RecorderApp(self.upstream_url, writer = PerRecordWARCWriter(warc_path, dedup_index=dedup_index)
PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) recorder_app = RecorderApp(self.upstream_url, writer)
resp = self._test_warc_write(recorder_app, 'httpbin.org', resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?foo=bar', '&param.recorder.user=USER&param.recorder.coll=COLL') '/get?foo=bar', '&param.recorder.user=USER&param.recorder.coll=COLL')
@ -307,29 +307,31 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
assert sorted(mimes) == ['application/json', 'application/json', 'warc/revisit'] assert sorted(mimes) == ['application/json', 'application/json', 'warc/revisit']
# Single File assert len(writer.fh_cache) == 0
def test_record_single_file_warc_1(self):
# Keep Open
def test_record_file_warc_keep_open(self):
path = to_path(self.root_dir + '/warcs/A.warc.gz') path = to_path(self.root_dir + '/warcs/A.warc.gz')
recorder_app = RecorderApp(self.upstream_url, writer = MultiFileWARCWriter(path)
SingleFileWARCWriter(path)) recorder_app = RecorderApp(self.upstream_url, writer)
resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar')
assert b'HTTP/1.1 200 OK' in resp.body assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body assert b'"foo": "bar"' in resp.body
assert os.path.isfile(path) assert os.path.isfile(path)
assert len(writer.fh_cache) == 1
@patch('redis.StrictRedis', FakeStrictRedis) @patch('redis.StrictRedis', FakeStrictRedis)
def test_record_single_file_multiple_writes(self): def test_record_multiple_writes_keep_open(self):
warc_path = to_path(self.root_dir + '/warcs/FOO/rec-{hostname}-{timestamp}.warc.gz') warc_path = to_path(self.root_dir + '/warcs/FOO/ABC-{hostname}-{timestamp}.warc.gz')
rel_path = self.root_dir + '/warcs/' rel_path = self.root_dir + '/warcs/'
dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj', dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj',
rel_path_template=rel_path) rel_path_template=rel_path)
writer = SingleFileWARCWriter(warc_path, dedup_index=dedup_index) writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index)
recorder_app = RecorderApp(self.upstream_url, writer) recorder_app = RecorderApp(self.upstream_url, writer)
# First Record # First Record
@ -370,10 +372,13 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
assert cdxres == res assert cdxres == res
# close this file assert len(writer.fh_cache) == 1
writer.remove_file(self.root_dir + '/warcs/FOO/')
assert len(writer.fh_cache) == 0
writer.close() writer.close()
resp = self._test_warc_write(recorder_app, 'httpbin.org', resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?boo=far', '&param.recorder.coll=FOO') '/get?boo=far', '&param.recorder.coll=FOO')
self._test_all_warcs('/warcs/FOO/', 2)

View File

@ -8,7 +8,6 @@ import sys
import os import os
import six import six
import traceback import traceback
from collections import OrderedDict from collections import OrderedDict
@ -39,6 +38,8 @@ class BaseWARCWriter(object):
BUFF_SIZE = 8192 BUFF_SIZE = 8192
FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'
def __init__(self, gzip=True, dedup_index=None, name='recorder', def __init__(self, gzip=True, dedup_index=None, name='recorder',
header_filter=ExcludeNone()): header_filter=ExcludeNone()):
self.gzip = gzip self.gzip = gzip
@ -228,105 +229,111 @@ class Digester(object):
# ============================================================================ # ============================================================================
class PerRecordWARCWriter(BaseWARCWriter): class MultiFileWARCWriter(BaseWARCWriter):
DEF_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'
def __init__(self, dir_template, filename_template=None, max_size=0,
*args, **kwargs):
super(MultiFileWARCWriter, self).__init__(*args, **kwargs)
def __init__(self, warcdir, filename_template=None, *args, **kwargs):
super(PerRecordWARCWriter, self).__init__(*args, **kwargs)
if not filename_template: if not filename_template:
filename_template = self.DEF_TEMPLATE dir_template, filename_template = os.path.split(dir_template)
self.filename_template = warcdir + filename_template dir_template += os.path.sep
def _do_write_req_resp(self, req, resp, params, formatter): if not filename_template:
#resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') filename_template = self.FILE_TEMPLATE
#req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ')
self.dir_template = dir_template
self.filename_template = filename_template
self.max_size = max_size
self.fh_cache = {}
def _open_file(self, dir_):
timestamp = timestamp20_now() timestamp = timestamp20_now()
filename = formatter.format(self.filename_template, filename = dir_ + self.filename_template.format(hostname=self.hostname,
hostname=self.hostname, timestamp=timestamp)
timestamp=timestamp)
path, name = os.path.split(filename)
try: try:
os.makedirs(path) os.makedirs(os.path.dirname(filename))
except: except:
pass pass
url = resp.rec_headers.get('WARC-Target-Uri') fh = open(filename, 'a+b')
print('Writing resp/req for {0} to {1}'.format(url, filename)) return fh, filename
def _close_file(self, fh):
fcntl.flock(fh, fcntl.LOCK_UN)
fh.close()
def remove_file(self, full_dir):
result = self.fh_cache.pop(full_dir, None)
if result:
out, filename = result
self._close_file(out)
def _do_write_req_resp(self, req, resp, params, formatter):
full_dir = formatter.format(self.dir_template)
result = self.fh_cache.get(full_dir)
close_file = False
if result:
out, filename = result
is_new = False
else:
out, filename = self._open_file(full_dir)
is_new = True
try:
url = resp.rec_headers.get('WARC-Target-Uri')
print('Writing req/resp {0} to {1} '.format(url, filename))
with open(filename, 'a+b') as out:
start = out.tell() start = out.tell()
self._write_warc_record(out, resp) self._write_warc_record(out, resp)
self._write_warc_record(out, req) self._write_warc_record(out, req)
out.flush() out.flush()
new_size = out.tell()
out.seek(start) out.seek(start)
if self.dedup_index: if self.dedup_index:
self.dedup_index.index_records(out, params, filename=filename) self.dedup_index.index_records(out, params, filename=filename)
except Exception as e:
traceback.print_exc()
close_file = True
# ============================================================================ finally:
class SingleFileWARCWriter(BaseWARCWriter): # check for rollover
def __init__(self, filename_template, dir_prefix='', max_size=0, *args, **kwargs): if self.max_size and new_size > self.max_size:
super(SingleFileWARCWriter, self).__init__(*args, **kwargs) close_file = True
self.dir_prefix = dir_prefix
self.filename_template = filename_template
self.max_size = max_size
self._open_file()
def _open_file(self): if close_file:
timestamp = timestamp20_now() if is_new:
self._close_file(out)
else:
self.remove_file(full_dir)
filename = self.filename_template.format(hostname=self.hostname, elif is_new:
timestamp=timestamp) fcntl.flock(out, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.fh_cache[full_dir] = (out, filename)
self.filename = self.dir_prefix + filename
try:
os.makedirs(os.path.dirname(self.filename))
except:
pass
self._fh = open(self.filename, 'a+b')
fcntl.flock(self._fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
def _do_write_req_resp(self, req, resp, params, formatter):
url = resp.rec_headers.get('WARC-Target-Uri')
print('Writing {0} to {1} '.format(url, self.filename))
if not self._fh:
self._open_file()
out = self._fh
start = out.tell()
self._write_warc_record(out, resp)
self._write_warc_record(out, req)
out.flush()
new_size = out.tell()
out.seek(start)
if self.dedup_index:
self.dedup_index.index_records(out, params, filename=self.filename)
# check for rollover
if self.max_size and new_size > self.max_size:
self.close()
def close(self): def close(self):
if not self._fh: for n, v in self.fh_cache.items():
return None out, filename = v
self._close_file(out)
fcntl.flock(self._fh, fcntl.LOCK_UN) self.fh_cache = {}
# ============================================================================
class PerRecordWARCWriter(MultiFileWARCWriter):
def __init__(self, *args, **kwargs):
kwargs['max_size'] = 1
super(PerRecordWARCWriter, self).__init__(*args, **kwargs)
self._fh.close()
self._fh = None