1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 08:04:49 +01:00

recorder: add tests for single file writer, including file locking

dedup policy: support customizable dedup/skip/write policy plugins and add tests
This commit is contained in:
Ilya Kreymer 2016-03-18 15:28:24 -07:00
parent cba8e4ee3a
commit b64be0dff1
5 changed files with 229 additions and 54 deletions

View File

@ -1,5 +1,8 @@
from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_iso_date
# ============================================================================
# Header Exclusions
# ============================================================================
class ExcludeNone(object):
def __call__(self, record):
@ -15,3 +18,23 @@ class ExcludeSpecificHeaders(object):
return self.exclude_headers
# ============================================================================
# Revisit Policy
# ============================================================================
class WriteRevisitDupePolicy(object):
def __call__(self, cdx):
dt = timestamp_to_datetime(cdx['timestamp'])
return ('revisit', cdx['url'], datetime_to_iso_date(dt))
# ============================================================================
class SkipDupePolicy(object):
def __call__(self, cdx):
return 'skip'
# ============================================================================
class WriteDupePolicy(object):
def __call__(self, cdx):
return 'write'

View File

@ -39,7 +39,10 @@ class RecorderApp(object):
def _write_loop(self):
while True:
self._write_one()
try:
self._write_one()
except:
traceback.print_exc()
def _write_one(self):
req = None
@ -56,8 +59,6 @@ class RecorderApp(object):
resp = self._create_resp_record(resp_head, resp_pay, 'response')
self.writer.write_req_resp(req, resp, params)
except:
traceback.print_exc()
finally:
try:

View File

@ -1,8 +1,7 @@
from pywb.utils.canonicalize import calc_search_range
from pywb.cdx.cdxobject import CDXObject
from pywb.warc.cdxindexer import write_cdx_index
from pywb.utils.timeutils import timestamp_to_datetime
from pywb.utils.timeutils import datetime_to_iso_date, iso_date_to_timestamp
from pywb.utils.timeutils import iso_date_to_timestamp
from io import BytesIO
import os
@ -11,24 +10,25 @@ from webagg.indexsource import RedisIndexSource
from webagg.aggregator import SimpleAggregator
from webagg.utils import res_template
from recorder.filters import WriteRevisitDupePolicy
#==============================================================================
class WritableRedisIndexer(RedisIndexSource):
def __init__(self, redis_url, rel_path_template='', name='recorder'):
def __init__(self, redis_url, rel_path_template='', name='recorder',
dupe_policy=WriteRevisitDupePolicy()):
super(WritableRedisIndexer, self).__init__(redis_url)
self.cdx_lookup = SimpleAggregator({name: self})
self.rel_path_template = rel_path_template
self.dupe_policy = dupe_policy
def add_record(self, stream, params, filename=None):
if not filename and hasattr(stream, 'name'):
filename = stream.name
def index_records(self, stream, params, filename=None):
rel_path = res_template(self.rel_path_template, params)
filename = os.path.relpath(filename, rel_path)
cdxout = BytesIO()
write_cdx_index(cdxout, stream, filename,
cdxj=True, append_post=True, rel_root=rel_path)
cdxj=True, append_post=True)
z_key = res_template(self.redis_key_template, params)
@ -55,8 +55,8 @@ class WritableRedisIndexer(RedisIndexSource):
cdx_iter, errs = self.cdx_lookup(params)
for cdx in cdx_iter:
dt = timestamp_to_datetime(cdx['timestamp'])
return ('revisit', cdx['url'],
datetime_to_iso_date(dt))
res = self.dupe_policy(cdx)
if res:
return res
return None

View File

@ -8,11 +8,12 @@ import webtest
from fakeredis import FakeStrictRedis
from mock import patch
from pytest import raises
from recorder.recorderapp import RecorderApp
from recorder.redisindexer import WritableRedisIndexer
from recorder.warcrecorder import PerRecordWARCRecorder
from recorder.filters import ExcludeSpecificHeaders
from recorder.warcrecorder import PerRecordWARCRecorder, SingleFileWARCRecorder
from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy
from webagg.utils import MementoUtils
@ -20,9 +21,11 @@ from pywb.cdx.cdxobject import CDXObject
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.utils.bufferedreaders import DecompressingBufferedReader
from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.warc.cdxindexer import write_cdx_index
from six.moves.urllib.parse import quote, unquote
from io import BytesIO
import time
general_req_data = "\
GET {path} HTTP/1.1\r\n\
@ -45,7 +48,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port)
def _test_per_warc(self, recorder_app, host, path, other_params=''):
def _test_warc_write(self, recorder_app, host, path, other_params=''):
url = 'http://' + host + path
req_url = '/live/resource/postreq?url=' + url + other_params
testapp = webtest.TestApp(recorder_app)
@ -72,7 +75,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
recorder_app = RecorderApp(self.upstream_url,
PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')))
resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar')
resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
@ -82,7 +85,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
recorder_app = RecorderApp(self.upstream_url,
PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='live')
resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar')
resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
@ -105,7 +108,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
recorder_app = RecorderApp(self.upstream_url,
PerRecordWARCRecorder(base_path), accept_colls='live')
resp = self._test_per_warc(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar')
resp = self._test_warc_write(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar')
assert b'HTTP/1.1 302' in resp.body
buff = BytesIO(resp.body)
@ -134,7 +137,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
PerRecordWARCRecorder(base_path, header_filter=header_filter),
accept_colls='live')
resp = self._test_per_warc(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar')
resp = self._test_warc_write(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar')
assert b'HTTP/1.1 302' in resp.body
buff = BytesIO(resp.body)
@ -161,7 +164,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
recorder_app = RecorderApp(self.upstream_url,
writer=PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='not-live')
resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar')
resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
@ -181,7 +184,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
self._test_all_warcs('/warcs/', 4)
resp = self._test_per_warc(recorder_app, 'httpbin.org',
resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?foo=bar', '&param.recorder.user=USER&param.recorder.coll=COLL')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
@ -190,7 +193,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
r = FakeStrictRedis.from_url('redis://localhost/2')
res = r.zrange('USER:COLL:cdxj', 0, -1)
res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
assert len(res) == 1
cdx = CDXObject(res[0])
@ -214,7 +217,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
self._test_all_warcs('/warcs/', 4)
resp = self._test_per_warc(recorder_app, 'httpbin.org',
resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?foo=bar', '&param.recorder.user=USER&param.recorder.coll=COLL')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
@ -224,7 +227,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
# Test Redis CDX
r = FakeStrictRedis.from_url('redis://localhost/2')
res = r.zrange('USER:COLL:cdxj', 0, -1)
res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
assert len(res) == 2
cdx = CDXObject(res[1])
@ -246,5 +249,129 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
assert status_headers.get_header('WARC-Refers-To-Target-URI') == 'http://httpbin.org/get?foo=bar'
assert status_headers.get_header('WARC-Refers-To-Date') != ''
@patch('redis.StrictRedis', FakeStrictRedis)
def test_record_param_user_coll_skip(self):
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
rel_path_template=self.root_dir + '/warcs/',
dupe_policy=SkipDupePolicy())
recorder_app = RecorderApp(self.upstream_url,
PerRecordWARCRecorder(warc_path, dedup_index=dedup_index))
# No new entries written
self._test_all_warcs('/warcs/', 4)
resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?foo=bar', '&param.recorder.user=USER&param.recorder.coll=COLL')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
self._test_all_warcs('/warcs/USER/COLL/', 4)
# Test Redis CDX
r = FakeStrictRedis.from_url('redis://localhost/2')
res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
assert len(res) == 2
@patch('redis.StrictRedis', FakeStrictRedis)
def test_record_param_user_coll_write_dupe_no_revisit(self):
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
rel_path_template=self.root_dir + '/warcs/',
dupe_policy=WriteDupePolicy())
recorder_app = RecorderApp(self.upstream_url,
PerRecordWARCRecorder(warc_path, dedup_index=dedup_index))
resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?foo=bar', '&param.recorder.user=USER&param.recorder.coll=COLL')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
self._test_all_warcs('/warcs/USER/COLL/', 6)
r = FakeStrictRedis.from_url('redis://localhost/2')
res = r.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
assert len(res) == 3
mimes = [CDXObject(x)['mime'] for x in res]
assert sorted(mimes) == ['application/json', 'application/json', 'warc/revisit']
# Single File
def test_record_single_file_warc_1(self):
path = to_path(self.root_dir + '/warcs/A.warc.gz')
recorder_app = RecorderApp(self.upstream_url,
SingleFileWARCRecorder(path))
resp = self._test_warc_write(recorder_app, 'httpbin.org', '/get?foo=bar')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
assert os.path.isfile(path)
@patch('redis.StrictRedis', FakeStrictRedis)
def test_record_single_file_multiple_writes(self):
warc_path = to_path(self.root_dir + '/warcs/FOO/rec-test.warc.gz')
rel_path = self.root_dir + '/warcs/'
dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj',
rel_path_template=rel_path)
writer = SingleFileWARCRecorder(warc_path, dedup_index=dedup_index)
recorder_app = RecorderApp(self.upstream_url, writer)
# First Record
resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?foo=bar', '&param.recorder.coll=FOO')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"foo": "bar"' in resp.body
# Second Record
resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?boo=far', '&param.recorder.coll=FOO')
assert b'HTTP/1.1 200 OK' in resp.body
assert b'"boo": "far"' in resp.body
self._test_all_warcs('/warcs/FOO/', 1)
r = FakeStrictRedis.from_url('redis://localhost/2')
res = r.zrangebylex('FOO:cdxj', '[org,httpbin)/', '(org,httpbin,')
assert len(res) == 2
assert os.path.isfile(warc_path)
cdxout = BytesIO()
with open(warc_path, 'rb') as fh:
filename = os.path.relpath(warc_path, rel_path)
write_cdx_index(cdxout, fh, filename,
cdxj=True, append_post=True, sort=True)
res = [CDXObject(x) for x in res]
cdxres = cdxout.getvalue().strip()
cdxres = cdxres.split(b'\n')
cdxres = [CDXObject(x) for x in cdxres]
assert cdxres == res
writer.close()
with raises(OSError):
resp = self._test_warc_write(recorder_app, 'httpbin.org',
'/get?boo=far', '&param.recorder.coll=FOO')

View File

@ -8,12 +8,18 @@ import sys
import os
import six
import traceback
from collections import OrderedDict
from socket import gethostname
import fcntl
from pywb.utils.loaders import LimitReader, to_native_str
from pywb.utils.bufferedreaders import BufferedReader
from pywb.utils.timeutils import timestamp20_now
from webagg.utils import ParamFormatter
@ -99,14 +105,15 @@ class BaseWARCRecorder(object):
print('Skipping due to dedup')
return
self._do_write_req_resp(req, resp, params)
formatter = ParamFormatter(params, name=self.rec_source_name)
self._do_write_req_resp(req, resp, params, formatter)
def _check_revisit(self, record, params):
if not self.dedup_index:
return record
try:
url = record.rec_headers.get('WARC-Target-URI')
url = record.rec_headers.get('WARC-Target-Uri')
digest = record.rec_headers.get('WARC-Payload-Digest')
iso_dt = record.rec_headers.get('WARC-Date')
result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
@ -221,33 +228,47 @@ class Digester(object):
# ============================================================================
class SingleFileWARCRecorder(BaseWARCRecorder):
def __init__(self, warcfilename, *args, **kwargs):
def __init__(self, filename, *args, **kwargs):
super(SingleFileWARCRecorder, self).__init__(*args, **kwargs)
self.warcfilename = warcfilename
self.filename = filename.format(timestamp=timestamp20_now(),
host=gethostname())
def _do_write_req_resp(self, req, resp, params):
print('Writing {0} to {1} '.format(url, self.warcfilename))
with open(self.warcfilename, 'a+b') as out:
start = out.tell()
try:
os.makedirs(os.path.dirname(self.filename))
except:
pass
self._write_warc_record(out, resp)
self._write_warc_record(out, req)
self._fh = open(self.filename, 'a+b')
out.flush()
out.seek(start)
fcntl.flock(self._fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
if self.dedup_index:
self.dedup_index.add_record(out, params, filename=self.warcfilename)
def _do_write_req_resp(self, req, resp, params, formatter):
url = resp.rec_headers.get('WARC-Target-Uri')
print('Writing {0} to {1} '.format(url, self.filename))
def add_user_record(self, url, content_type, data):
with open(self.warcfilename, 'a+b') as out:
start = out.tell()
self._write_warc_metadata(out, url, content_type, data)
out.flush()
out = self._fh
if not out:
raise IOError('Already closed')
#out.seek(start)
#if self.indexer:
# self.indexer.add_record(out, self.warcfilename)
start = out.tell()
self._write_warc_record(out, resp)
self._write_warc_record(out, req)
out.flush()
out.seek(start)
if self.dedup_index:
self.dedup_index.index_records(out, params, filename=self.filename)
def close(self):
if not self._fh:
return None
fcntl.flock(self._fh, fcntl.LOCK_UN)
self._fh.close()
self._fh = None
# ============================================================================
@ -256,11 +277,10 @@ class PerRecordWARCRecorder(BaseWARCRecorder):
super(PerRecordWARCRecorder, self).__init__(*args, **kwargs)
self.warcdir = warcdir
def _do_write_req_resp(self, req, resp, params):
def _do_write_req_resp(self, req, resp, params, formatter):
resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ')
req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ')
formatter = ParamFormatter(params, name=self.rec_source_name)
full_dir = formatter.format(self.warcdir)
try:
@ -271,14 +291,18 @@ class PerRecordWARCRecorder(BaseWARCRecorder):
resp_filename = os.path.join(full_dir, resp_uuid + '.warc.gz')
req_filename = os.path.join(full_dir, req_uuid + '.warc.gz')
self._write_record(resp_filename, resp, params, True)
self._write_record(req_filename, req, params, False)
url = resp.rec_headers.get('WARC-Target-Uri')
print('Writing request for {0} to {1}'.format(url, req_filename))
print('Writing response for {0} to {1}'.format(url, resp_filename))
def _write_record(self, filename, rec, params, index=False):
self._write_and_index(resp_filename, resp, params, True)
self._write_and_index(req_filename, req, params, False)
def _write_and_index(self, filename, rec, params, index=False):
with open(filename, 'w+b') as out:
self._write_warc_record(out, rec)
if index and self.dedup_index:
out.seek(0)
self.dedup_index.add_record(out, params, filename=filename)
self.dedup_index.index_records(out, params, filename=filename)