From e81457df5fb4a0d42b8335f125988ff76dda757e Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Fri, 18 Mar 2016 19:49:14 -0700 Subject: [PATCH] rename WARCRecorder -> WARCWriter, add optional max_size to single warc recorder per-record recorder combines http response/req into single file --- recorder/recorderapp.py | 3 - recorder/test/test_recorder.py | 78 ++++++------- recorder/{warcrecorder.py => warcwriter.py} | 118 ++++++++++++-------- 3 files changed, 111 insertions(+), 88 deletions(-) rename recorder/{warcrecorder.py => warcwriter.py} (79%) diff --git a/recorder/recorderapp.py b/recorder/recorderapp.py index fc16ac9f..818b0eed 100644 --- a/recorder/recorderapp.py +++ b/recorder/recorderapp.py @@ -6,9 +6,6 @@ from pywb.utils.statusandheaders import StatusAndHeadersParser from pywb.warc.recordloader import ArcWarcRecord from pywb.warc.recordloader import ArcWarcRecordLoader -from recorder.warcrecorder import SingleFileWARCRecorder, PerRecordWARCRecorder -from recorder.redisindexer import WritableRedisIndexer - from six.moves.urllib.parse import parse_qsl import json diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index d83f8773..01648a82 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -12,7 +12,7 @@ from pytest import raises from recorder.recorderapp import RecorderApp from recorder.redisindexer import WritableRedisIndexer -from recorder.warcrecorder import PerRecordWARCRecorder, SingleFileWARCRecorder +from recorder.warcwriter import PerRecordWARCWriter, SingleFileWARCWriter from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy from webagg.utils import MementoUtils @@ -70,10 +70,21 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): files = [x for x in os.listdir(coll_dir) if os.path.isfile(os.path.join(coll_dir, x))] assert len(files) == num assert all(x.endswith('.warc.gz') for x in files) + return files, coll_dir def test_record_warc_1(self): recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/'))) + PerRecordWARCWriter(to_path(self.root_dir + '/warcs/'))) + + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/', 1) + + def test_record_warc_2(self): + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCWriter(to_path(self.root_dir + '/warcs/')), accept_colls='live') resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') assert b'HTTP/1.1 200 OK' in resp.body @@ -81,19 +92,9 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): self._test_all_warcs('/warcs/', 2) - def test_record_warc_2(self): - recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='live') - - resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') - assert b'HTTP/1.1 200 OK' in resp.body - assert b'"foo": "bar"' in resp.body - - self._test_all_warcs('/warcs/', 4) - def test_error_url(self): recorder_app = RecorderApp(self.upstream_url + '01', - PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='live') + PerRecordWARCWriter(to_path(self.root_dir + '/warcs/')), accept_colls='live') testapp = webtest.TestApp(recorder_app) @@ -101,12 +102,12 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): assert resp.json['error'] != '' - self._test_all_warcs('/warcs/', 4) + self._test_all_warcs('/warcs/', 2) def test_record_cookies_header(self): base_path = to_path(self.root_dir + '/warcs/cookiecheck/') recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(base_path), accept_colls='live') + PerRecordWARCWriter(base_path), accept_colls='live') resp = self._test_warc_write(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') assert b'HTTP/1.1 302' in resp.body @@ -134,7 +135,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): base_path = to_path(self.root_dir + '/warcs/cookieskip/') header_filter = ExcludeSpecificHeaders(['Set-Cookie', 'Cookie']) recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(base_path, header_filter=header_filter), + PerRecordWARCWriter(base_path, header_filter=header_filter), accept_colls='live') resp = self._test_warc_write(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') @@ -162,13 +163,13 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): def test_record_skip_wrong_coll(self): recorder_app = RecorderApp(self.upstream_url, - writer=PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='not-live') + writer=PerRecordWARCWriter(to_path(self.root_dir + '/warcs/')), accept_colls='not-live') resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body - self._test_all_warcs('/warcs/', 4) + self._test_all_warcs('/warcs/', 2) @patch('redis.StrictRedis', FakeStrictRedis) def test_record_param_user_coll(self): @@ -180,16 +181,16 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): rel_path_template=self.root_dir + '/warcs/') recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) - self._test_all_warcs('/warcs/', 4) + self._test_all_warcs('/warcs/', 2) resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body - self._test_all_warcs('/warcs/USER/COLL/', 2) + self._test_all_warcs('/warcs/USER/COLL/', 1) r = FakeStrictRedis.from_url('redis://localhost/2') @@ -213,16 +214,16 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): rel_path_template=self.root_dir + '/warcs/') recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) - self._test_all_warcs('/warcs/', 4) + self._test_all_warcs('/warcs/', 2) resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body - self._test_all_warcs('/warcs/USER/COLL/', 4) + self._test_all_warcs('/warcs/USER/COLL/', 2) # Test Redis CDX r = FakeStrictRedis.from_url('redis://localhost/2') @@ -259,17 +260,17 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): dupe_policy=SkipDupePolicy()) recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) # No new entries written - self._test_all_warcs('/warcs/', 4) + self._test_all_warcs('/warcs/', 2) resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body - self._test_all_warcs('/warcs/USER/COLL/', 4) + self._test_all_warcs('/warcs/USER/COLL/', 2) # Test Redis CDX r = FakeStrictRedis.from_url('redis://localhost/2') @@ -288,14 +289,14 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): dupe_policy=WriteDupePolicy()) recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + PerRecordWARCWriter(warc_path, dedup_index=dedup_index)) resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body - self._test_all_warcs('/warcs/USER/COLL/', 6) + self._test_all_warcs('/warcs/USER/COLL/', 3) r = FakeStrictRedis.from_url('redis://localhost/2') @@ -310,7 +311,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): def test_record_single_file_warc_1(self): path = to_path(self.root_dir + '/warcs/A.warc.gz') recorder_app = RecorderApp(self.upstream_url, - SingleFileWARCRecorder(path)) + SingleFileWARCWriter(path)) resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') assert b'HTTP/1.1 200 OK' in resp.body @@ -321,14 +322,14 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): @patch('redis.StrictRedis', FakeStrictRedis) def test_record_single_file_multiple_writes(self): - warc_path = to_path(self.root_dir + '/warcs/FOO/rec-test.warc.gz') + warc_path = to_path(self.root_dir + '/warcs/FOO/rec-{hostname}-{timestamp}.warc.gz') rel_path = self.root_dir + '/warcs/' dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj', rel_path_template=rel_path) - writer = SingleFileWARCRecorder(warc_path, dedup_index=dedup_index) + writer = SingleFileWARCWriter(warc_path, dedup_index=dedup_index) recorder_app = RecorderApp(self.upstream_url, writer) # First Record @@ -352,11 +353,12 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): res = r.zrangebylex('FOO:cdxj', '[org,httpbin)/', '(org,httpbin,') assert len(res) == 2 - assert os.path.isfile(warc_path) + files, coll_dir = self._test_all_warcs('/warcs/FOO/', 1) + fullname = coll_dir + files[0] cdxout = BytesIO() - with open(warc_path, 'rb') as fh: - filename = os.path.relpath(warc_path, rel_path) + with open(fullname, 'rb') as fh: + filename = os.path.relpath(fullname, rel_path) write_cdx_index(cdxout, fh, filename, cdxj=True, append_post=True, sort=True) @@ -368,10 +370,10 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): assert cdxres == res + # close this file writer.close() - with raises(OSError): - resp = self._test_warc_write(recorder_app, 'httpbin.org', + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?boo=far', '¶m.recorder.coll=FOO') - + self._test_all_warcs('/warcs/FOO/', 2) diff --git a/recorder/warcrecorder.py b/recorder/warcwriter.py similarity index 79% rename from recorder/warcrecorder.py rename to recorder/warcwriter.py index edf17c79..790c9faf 100644 --- a/recorder/warcrecorder.py +++ b/recorder/warcwriter.py @@ -27,7 +27,7 @@ from recorder.filters import ExcludeNone # ============================================================================ -class BaseWARCRecorder(object): +class BaseWARCWriter(object): WARC_RECORDS = {'warcinfo': 'application/warc-fields', 'response': 'application/http; msgtype=response', 'revisit': 'application/http; msgtype=response', @@ -45,6 +45,7 @@ class BaseWARCRecorder(object): self.dedup_index = dedup_index self.rec_source_name = name self.header_filter = header_filter + self.hostname = gethostname() def ensure_digest(self, record): block_digest = record.rec_headers.get('WARC-Block-Digest') @@ -135,7 +136,7 @@ class BaseWARCRecorder(object): def _write_warc_record(self, out, record): if self.gzip: - out = GzippingWriter(out) + out = GzippingWrapper(out) self._line(out, b'WARC/1.0') @@ -196,7 +197,7 @@ class BaseWARCRecorder(object): # ============================================================================ -class GzippingWriter(object): +class GzippingWrapper(object): def __init__(self, out): self.compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16) self.out = out @@ -227,11 +228,63 @@ class Digester(object): # ============================================================================ -class SingleFileWARCRecorder(BaseWARCRecorder): - def __init__(self, filename, *args, **kwargs): - super(SingleFileWARCRecorder, self).__init__(*args, **kwargs) - self.filename = filename.format(timestamp=timestamp20_now(), - host=gethostname()) +class PerRecordWARCWriter(BaseWARCWriter): + DEF_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz' + + def __init__(self, warcdir, filename_template=None, *args, **kwargs): + super(PerRecordWARCWriter, self).__init__(*args, **kwargs) + if not filename_template: + filename_template = self.DEF_TEMPLATE + self.filename_template = warcdir + filename_template + + def _do_write_req_resp(self, req, resp, params, formatter): + #resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') + #req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') + timestamp = timestamp20_now() + + filename = formatter.format(self.filename_template, + hostname=self.hostname, + timestamp=timestamp) + + path, name = os.path.split(filename) + + try: + os.makedirs(path) + except: + pass + + url = resp.rec_headers.get('WARC-Target-Uri') + print('Writing resp/req for {0} to {1}'.format(url, filename)) + + with open(filename, 'a+b') as out: + start = out.tell() + + self._write_warc_record(out, resp) + self._write_warc_record(out, req) + + out.flush() + out.seek(start) + + if self.dedup_index: + self.dedup_index.index_records(out, params, filename=filename) + + +# ============================================================================ +class SingleFileWARCWriter(BaseWARCWriter): + def __init__(self, filename_template, dir_prefix='', max_size=0, *args, **kwargs): + super(SingleFileWARCWriter, self).__init__(*args, **kwargs) + self.dir_prefix = dir_prefix + self.filename_template = filename_template + self.max_size = max_size + self._open_file() + + def _open_file(self): + timestamp = timestamp20_now() + + filename = self.filename_template.format(hostname=self.hostname, + timestamp=timestamp) + + self.filename = self.dir_prefix + filename try: os.makedirs(os.path.dirname(self.filename)) @@ -246,9 +299,10 @@ class SingleFileWARCRecorder(BaseWARCRecorder): url = resp.rec_headers.get('WARC-Target-Uri') print('Writing {0} to {1} '.format(url, self.filename)) + if not self._fh: + self._open_file() + out = self._fh - if not out: - raise IOError('Already closed') start = out.tell() @@ -256,11 +310,18 @@ class SingleFileWARCRecorder(BaseWARCRecorder): self._write_warc_record(out, req) out.flush() + + new_size = out.tell() + out.seek(start) if self.dedup_index: self.dedup_index.index_records(out, params, filename=self.filename) + # check for rollover + if self.max_size and new_size > self.max_size: + self.close() + def close(self): if not self._fh: return None @@ -269,40 +330,3 @@ class SingleFileWARCRecorder(BaseWARCRecorder): self._fh.close() self._fh = None - - -# ============================================================================ -class PerRecordWARCRecorder(BaseWARCRecorder): - def __init__(self, warcdir, *args, **kwargs): - super(PerRecordWARCRecorder, self).__init__(*args, **kwargs) - self.warcdir = warcdir - - def _do_write_req_resp(self, req, resp, params, formatter): - resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') - req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') - - full_dir = formatter.format(self.warcdir) - - try: - os.makedirs(full_dir) - except: - pass - - resp_filename = os.path.join(full_dir, resp_uuid + '.warc.gz') - req_filename = os.path.join(full_dir, req_uuid + '.warc.gz') - - url = resp.rec_headers.get('WARC-Target-Uri') - print('Writing request for {0} to {1}'.format(url, req_filename)) - print('Writing response for {0} to {1}'.format(url, resp_filename)) - - self._write_and_index(resp_filename, resp, params, True) - self._write_and_index(req_filename, req, params, False) - - def _write_and_index(self, filename, rec, params, index=False): - with open(filename, 'w+b') as out: - self._write_warc_record(out, rec) - if index and self.dedup_index: - out.seek(0) - self.dedup_index.index_records(out, params, filename=filename) - -