From b64be0dff1c266a0865d6cd307ad356bab949007 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Fri, 18 Mar 2016 15:28:24 -0700 Subject: [PATCH] recorder: add tests for single file writer, including file locking dedup policy: support customizable dedup/skip/write policy plugins and add tests --- recorder/filters.py | 23 +++++ recorder/recorderapp.py | 7 +- recorder/redisindexer.py | 22 ++--- recorder/test/test_recorder.py | 151 ++++++++++++++++++++++++++++++--- recorder/warcrecorder.py | 80 +++++++++++------ 5 files changed, 229 insertions(+), 54 deletions(-) diff --git a/recorder/filters.py b/recorder/filters.py index 809822d4..3635a5ab 100644 --- a/recorder/filters.py +++ b/recorder/filters.py @@ -1,5 +1,8 @@ +from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_iso_date +# ============================================================================ +# Header Exclusions # ============================================================================ class ExcludeNone(object): def __call__(self, record): @@ -15,3 +18,23 @@ class ExcludeSpecificHeaders(object): return self.exclude_headers +# ============================================================================ +# Revisit Policy +# ============================================================================ +class WriteRevisitDupePolicy(object): + def __call__(self, cdx): + dt = timestamp_to_datetime(cdx['timestamp']) + return ('revisit', cdx['url'], datetime_to_iso_date(dt)) + + +# ============================================================================ +class SkipDupePolicy(object): + def __call__(self, cdx): + return 'skip' + + +# ============================================================================ +class WriteDupePolicy(object): + def __call__(self, cdx): + return 'write' + diff --git a/recorder/recorderapp.py b/recorder/recorderapp.py index 694228f2..fc16ac9f 100644 --- a/recorder/recorderapp.py +++ b/recorder/recorderapp.py @@ -39,7 +39,10 @@ class RecorderApp(object): def _write_loop(self): while True: - self._write_one() + try: + self._write_one() + except: + traceback.print_exc() def _write_one(self): req = None @@ -56,8 +59,6 @@ class RecorderApp(object): resp = self._create_resp_record(resp_head, resp_pay, 'response') self.writer.write_req_resp(req, resp, params) - except: - traceback.print_exc() finally: try: diff --git a/recorder/redisindexer.py b/recorder/redisindexer.py index ce359672..f864b51a 100644 --- a/recorder/redisindexer.py +++ b/recorder/redisindexer.py @@ -1,8 +1,7 @@ from pywb.utils.canonicalize import calc_search_range from pywb.cdx.cdxobject import CDXObject from pywb.warc.cdxindexer import write_cdx_index -from pywb.utils.timeutils import timestamp_to_datetime -from pywb.utils.timeutils import datetime_to_iso_date, iso_date_to_timestamp +from pywb.utils.timeutils import iso_date_to_timestamp from io import BytesIO import os @@ -11,24 +10,25 @@ from webagg.indexsource import RedisIndexSource from webagg.aggregator import SimpleAggregator from webagg.utils import res_template +from recorder.filters import WriteRevisitDupePolicy + #============================================================================== class WritableRedisIndexer(RedisIndexSource): - def __init__(self, redis_url, rel_path_template='', name='recorder'): + def __init__(self, redis_url, rel_path_template='', name='recorder', + dupe_policy=WriteRevisitDupePolicy()): super(WritableRedisIndexer, self).__init__(redis_url) self.cdx_lookup = SimpleAggregator({name: self}) self.rel_path_template = rel_path_template + self.dupe_policy = dupe_policy - def add_record(self, stream, params, filename=None): - if not filename and hasattr(stream, 'name'): - filename = stream.name - + def index_records(self, stream, params, filename=None): rel_path = res_template(self.rel_path_template, params) filename = os.path.relpath(filename, rel_path) cdxout = BytesIO() write_cdx_index(cdxout, stream, filename, - cdxj=True, append_post=True, rel_root=rel_path) + cdxj=True, append_post=True) z_key = res_template(self.redis_key_template, params) @@ -55,8 +55,8 @@ class WritableRedisIndexer(RedisIndexSource): cdx_iter, errs = self.cdx_lookup(params) for cdx in cdx_iter: - dt = timestamp_to_datetime(cdx['timestamp']) - return ('revisit', cdx['url'], - datetime_to_iso_date(dt)) + res = self.dupe_policy(cdx) + if res: + return res return None diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index 7838830b..d83f8773 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -8,11 +8,12 @@ import webtest from fakeredis import FakeStrictRedis from mock import patch +from pytest import raises from recorder.recorderapp import RecorderApp from recorder.redisindexer import WritableRedisIndexer -from recorder.warcrecorder import PerRecordWARCRecorder -from recorder.filters import ExcludeSpecificHeaders +from recorder.warcrecorder import PerRecordWARCRecorder, SingleFileWARCRecorder +from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy from webagg.utils import MementoUtils @@ -20,9 +21,11 @@ from pywb.cdx.cdxobject import CDXObject from pywb.utils.statusandheaders import StatusAndHeadersParser from pywb.utils.bufferedreaders import DecompressingBufferedReader from pywb.warc.recordloader import ArcWarcRecordLoader +from pywb.warc.cdxindexer import write_cdx_index from six.moves.urllib.parse import quote, unquote from io import BytesIO +import time general_req_data = "\ GET {path} HTTP/1.1\r\n\ @@ -45,7 +48,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port) - def _test_per_warc(self, recorder_app, host, path, other_params=''): + def _test_warc_write(self, recorder_app, host, path, other_params=''): url = 'http://' + host + path req_url = '/live/resource/postreq?url=' + url + other_params testapp = webtest.TestApp(recorder_app) @@ -72,7 +75,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): recorder_app = RecorderApp(self.upstream_url, PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/'))) - resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar') + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body @@ -82,7 +85,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): recorder_app = RecorderApp(self.upstream_url, PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='live') - resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar') + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body @@ -105,7 +108,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): recorder_app = RecorderApp(self.upstream_url, PerRecordWARCRecorder(base_path), accept_colls='live') - resp = self._test_per_warc(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') assert b'HTTP/1.1 302' in resp.body buff = BytesIO(resp.body) @@ -134,7 +137,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): PerRecordWARCRecorder(base_path, header_filter=header_filter), accept_colls='live') - resp = self._test_per_warc(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') assert b'HTTP/1.1 302' in resp.body buff = BytesIO(resp.body) @@ -161,7 +164,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): recorder_app = RecorderApp(self.upstream_url, writer=PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='not-live') - resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar') + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body @@ -181,7 +184,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): self._test_all_warcs('/warcs/', 4) - resp = self._test_per_warc(recorder_app, 'httpbin.org', + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body @@ -190,7 +193,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): r = FakeStrictRedis.from_url('redis://localhost/2') - res = r.zrange('USER:COLL:cdxj', 0, -1) + res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,') assert len(res) == 1 cdx = CDXObject(res[0]) @@ -214,7 +217,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): self._test_all_warcs('/warcs/', 4) - resp = self._test_per_warc(recorder_app, 'httpbin.org', + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') assert b'HTTP/1.1 200 OK' in resp.body assert b'"foo": "bar"' in resp.body @@ -224,7 +227,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): # Test Redis CDX r = FakeStrictRedis.from_url('redis://localhost/2') - res = r.zrange('USER:COLL:cdxj', 0, -1) + res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,') assert len(res) == 2 cdx = CDXObject(res[1]) @@ -246,5 +249,129 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): assert status_headers.get_header('WARC-Refers-To-Target-URI') == 'http://httpbin.org/get?foo=bar' assert status_headers.get_header('WARC-Refers-To-Date') != '' + @patch('redis.StrictRedis', FakeStrictRedis) + def test_record_param_user_coll_skip(self): + warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') + + + dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', + rel_path_template=self.root_dir + '/warcs/', + dupe_policy=SkipDupePolicy()) + + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + + # No new entries written + self._test_all_warcs('/warcs/', 4) + + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/USER/COLL/', 4) + + # Test Redis CDX + r = FakeStrictRedis.from_url('redis://localhost/2') + + res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,') + assert len(res) == 2 + + @patch('redis.StrictRedis', FakeStrictRedis) + def test_record_param_user_coll_write_dupe_no_revisit(self): + + warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') + + + dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', + rel_path_template=self.root_dir + '/warcs/', + dupe_policy=WriteDupePolicy()) + + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/USER/COLL/', 6) + + r = FakeStrictRedis.from_url('redis://localhost/2') + + res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,') + assert len(res) == 3 + + mimes = [CDXObject(x)['mime'] for x in res] + + assert sorted(mimes) == ['application/json', 'application/json', 'warc/revisit'] + + # Single File + def test_record_single_file_warc_1(self): + path = to_path(self.root_dir + '/warcs/A.warc.gz') + recorder_app = RecorderApp(self.upstream_url, + SingleFileWARCRecorder(path)) + + resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + assert os.path.isfile(path) + + + @patch('redis.StrictRedis', FakeStrictRedis) + def test_record_single_file_multiple_writes(self): + warc_path = to_path(self.root_dir + '/warcs/FOO/rec-test.warc.gz') + + rel_path = self.root_dir + '/warcs/' + + dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj', + rel_path_template=rel_path) + + writer = SingleFileWARCRecorder(warc_path, dedup_index=dedup_index) + recorder_app = RecorderApp(self.upstream_url, writer) + + # First Record + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?foo=bar', '¶m.recorder.coll=FOO') + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + + # Second Record + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?boo=far', '¶m.recorder.coll=FOO') + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"boo": "far"' in resp.body + + self._test_all_warcs('/warcs/FOO/', 1) + + r = FakeStrictRedis.from_url('redis://localhost/2') + res = r.zrangebylex('FOO:cdxj', '[org,httpbin)/', '(org,httpbin,') + assert len(res) == 2 + + assert os.path.isfile(warc_path) + + cdxout = BytesIO() + with open(warc_path, 'rb') as fh: + filename = os.path.relpath(warc_path, rel_path) + write_cdx_index(cdxout, fh, filename, + cdxj=True, append_post=True, sort=True) + + res = [CDXObject(x) for x in res] + + cdxres = cdxout.getvalue().strip() + cdxres = cdxres.split(b'\n') + cdxres = [CDXObject(x) for x in cdxres] + + assert cdxres == res + + writer.close() + + with raises(OSError): + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?boo=far', '¶m.recorder.coll=FOO') diff --git a/recorder/warcrecorder.py b/recorder/warcrecorder.py index a17cd670..edf17c79 100644 --- a/recorder/warcrecorder.py +++ b/recorder/warcrecorder.py @@ -8,12 +8,18 @@ import sys import os import six + import traceback from collections import OrderedDict +from socket import gethostname + +import fcntl + from pywb.utils.loaders import LimitReader, to_native_str from pywb.utils.bufferedreaders import BufferedReader +from pywb.utils.timeutils import timestamp20_now from webagg.utils import ParamFormatter @@ -99,14 +105,15 @@ class BaseWARCRecorder(object): print('Skipping due to dedup') return - self._do_write_req_resp(req, resp, params) + formatter = ParamFormatter(params, name=self.rec_source_name) + self._do_write_req_resp(req, resp, params, formatter) def _check_revisit(self, record, params): if not self.dedup_index: return record try: - url = record.rec_headers.get('WARC-Target-URI') + url = record.rec_headers.get('WARC-Target-Uri') digest = record.rec_headers.get('WARC-Payload-Digest') iso_dt = record.rec_headers.get('WARC-Date') result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt) @@ -221,33 +228,47 @@ class Digester(object): # ============================================================================ class SingleFileWARCRecorder(BaseWARCRecorder): - def __init__(self, warcfilename, *args, **kwargs): + def __init__(self, filename, *args, **kwargs): super(SingleFileWARCRecorder, self).__init__(*args, **kwargs) - self.warcfilename = warcfilename + self.filename = filename.format(timestamp=timestamp20_now(), + host=gethostname()) - def _do_write_req_resp(self, req, resp, params): - print('Writing {0} to {1} '.format(url, self.warcfilename)) - with open(self.warcfilename, 'a+b') as out: - start = out.tell() + try: + os.makedirs(os.path.dirname(self.filename)) + except: + pass - self._write_warc_record(out, resp) - self._write_warc_record(out, req) + self._fh = open(self.filename, 'a+b') - out.flush() - out.seek(start) + fcntl.flock(self._fh, fcntl.LOCK_EX | fcntl.LOCK_NB) - if self.dedup_index: - self.dedup_index.add_record(out, params, filename=self.warcfilename) + def _do_write_req_resp(self, req, resp, params, formatter): + url = resp.rec_headers.get('WARC-Target-Uri') + print('Writing {0} to {1} '.format(url, self.filename)) - def add_user_record(self, url, content_type, data): - with open(self.warcfilename, 'a+b') as out: - start = out.tell() - self._write_warc_metadata(out, url, content_type, data) - out.flush() + out = self._fh + if not out: + raise IOError('Already closed') - #out.seek(start) - #if self.indexer: - # self.indexer.add_record(out, self.warcfilename) + start = out.tell() + + self._write_warc_record(out, resp) + self._write_warc_record(out, req) + + out.flush() + out.seek(start) + + if self.dedup_index: + self.dedup_index.index_records(out, params, filename=self.filename) + + def close(self): + if not self._fh: + return None + + fcntl.flock(self._fh, fcntl.LOCK_UN) + + self._fh.close() + self._fh = None # ============================================================================ @@ -256,11 +277,10 @@ class PerRecordWARCRecorder(BaseWARCRecorder): super(PerRecordWARCRecorder, self).__init__(*args, **kwargs) self.warcdir = warcdir - def _do_write_req_resp(self, req, resp, params): + def _do_write_req_resp(self, req, resp, params, formatter): resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') - formatter = ParamFormatter(params, name=self.rec_source_name) full_dir = formatter.format(self.warcdir) try: @@ -271,14 +291,18 @@ class PerRecordWARCRecorder(BaseWARCRecorder): resp_filename = os.path.join(full_dir, resp_uuid + '.warc.gz') req_filename = os.path.join(full_dir, req_uuid + '.warc.gz') - self._write_record(resp_filename, resp, params, True) - self._write_record(req_filename, req, params, False) + url = resp.rec_headers.get('WARC-Target-Uri') + print('Writing request for {0} to {1}'.format(url, req_filename)) + print('Writing response for {0} to {1}'.format(url, resp_filename)) - def _write_record(self, filename, rec, params, index=False): + self._write_and_index(resp_filename, resp, params, True) + self._write_and_index(req_filename, req, params, False) + + def _write_and_index(self, filename, rec, params, index=False): with open(filename, 'w+b') as out: self._write_warc_record(out, rec) if index and self.dedup_index: out.seek(0) - self.dedup_index.add_record(out, params, filename=filename) + self.dedup_index.index_records(out, params, filename=filename)