mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-24 06:59:52 +01:00
recorder: ensure filename is also tracked by the indexer, add tests
for redis file mapping
This commit is contained in:
parent
3452cf39e0
commit
c96e419341
@ -15,13 +15,23 @@ from recorder.filters import WriteRevisitDupePolicy
|
|||||||
|
|
||||||
#==============================================================================
|
#==============================================================================
|
||||||
class WritableRedisIndexer(RedisIndexSource):
|
class WritableRedisIndexer(RedisIndexSource):
|
||||||
def __init__(self, redis_url, rel_path_template='', name='recorder',
|
def __init__(self, redis_url, rel_path_template='',
|
||||||
|
file_key_template='', name='recorder',
|
||||||
dupe_policy=WriteRevisitDupePolicy()):
|
dupe_policy=WriteRevisitDupePolicy()):
|
||||||
super(WritableRedisIndexer, self).__init__(redis_url)
|
super(WritableRedisIndexer, self).__init__(redis_url)
|
||||||
self.cdx_lookup = SimpleAggregator({name: self})
|
self.cdx_lookup = SimpleAggregator({name: self})
|
||||||
self.rel_path_template = rel_path_template
|
self.rel_path_template = rel_path_template
|
||||||
|
self.file_key_template = file_key_template
|
||||||
self.dupe_policy = dupe_policy
|
self.dupe_policy = dupe_policy
|
||||||
|
|
||||||
|
def add_warc_file(self, full_filename, params):
|
||||||
|
rel_path = res_template(self.rel_path_template, params)
|
||||||
|
filename = os.path.relpath(full_filename, rel_path)
|
||||||
|
|
||||||
|
file_key = res_template(self.file_key_template, params)
|
||||||
|
|
||||||
|
self.redis.hset(file_key, filename, full_filename)
|
||||||
|
|
||||||
def index_records(self, stream, params, filename=None):
|
def index_records(self, stream, params, filename=None):
|
||||||
rel_path = res_template(self.rel_path_template, params)
|
rel_path = res_template(self.rel_path_template, params)
|
||||||
filename = os.path.relpath(filename, rel_path)
|
filename = os.path.relpath(filename, rel_path)
|
||||||
|
@ -13,7 +13,8 @@ from pytest import raises
|
|||||||
from recorder.recorderapp import RecorderApp
|
from recorder.recorderapp import RecorderApp
|
||||||
from recorder.redisindexer import WritableRedisIndexer
|
from recorder.redisindexer import WritableRedisIndexer
|
||||||
from recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter
|
from recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter
|
||||||
from recorder.filters import ExcludeSpecificHeaders, SkipDupePolicy, WriteDupePolicy
|
from recorder.filters import ExcludeSpecificHeaders
|
||||||
|
from recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy
|
||||||
|
|
||||||
from webagg.utils import MementoUtils
|
from webagg.utils import MementoUtils
|
||||||
|
|
||||||
@ -47,6 +48,13 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
|
|
||||||
cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port)
|
cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port)
|
||||||
|
|
||||||
|
def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy()):
|
||||||
|
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
|
||||||
|
file_key_template='{user}:{coll}:warc',
|
||||||
|
rel_path_template=self.root_dir + '/warcs/',
|
||||||
|
dupe_policy=dupe_policy)
|
||||||
|
|
||||||
|
return dedup_index
|
||||||
|
|
||||||
def _test_warc_write(self, recorder_app, host, path, other_params=''):
|
def _test_warc_write(self, recorder_app, host, path, other_params=''):
|
||||||
url = 'http://' + host + path
|
url = 'http://' + host + path
|
||||||
@ -176,9 +184,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
|
|
||||||
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
||||||
|
|
||||||
|
dedup_index = self._get_dedup_index()
|
||||||
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
|
|
||||||
rel_path_template=self.root_dir + '/warcs/')
|
|
||||||
|
|
||||||
recorder_app = RecorderApp(self.upstream_url,
|
recorder_app = RecorderApp(self.upstream_url,
|
||||||
PerRecordWARCWriter(warc_path, dedup_index=dedup_index))
|
PerRecordWARCWriter(warc_path, dedup_index=dedup_index))
|
||||||
@ -204,14 +210,16 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
assert cdx['filename'].startswith('USER/COLL/')
|
assert cdx['filename'].startswith('USER/COLL/')
|
||||||
assert cdx['filename'].endswith('.warc.gz')
|
assert cdx['filename'].endswith('.warc.gz')
|
||||||
|
|
||||||
|
warcs = r.hgetall('USER:COLL:warc')
|
||||||
|
full_path = self.root_dir + '/warcs/' + cdx['filename']
|
||||||
|
assert warcs == {cdx['filename'].encode('utf-8'): full_path.encode('utf-8')}
|
||||||
|
|
||||||
|
|
||||||
@patch('redis.StrictRedis', FakeStrictRedis)
|
@patch('redis.StrictRedis', FakeStrictRedis)
|
||||||
def test_record_param_user_coll_revisit(self):
|
def test_record_param_user_coll_revisit(self):
|
||||||
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
||||||
|
|
||||||
|
dedup_index = self._get_dedup_index()
|
||||||
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
|
|
||||||
rel_path_template=self.root_dir + '/warcs/')
|
|
||||||
|
|
||||||
recorder_app = RecorderApp(self.upstream_url,
|
recorder_app = RecorderApp(self.upstream_url,
|
||||||
PerRecordWARCWriter(warc_path, dedup_index=dedup_index))
|
PerRecordWARCWriter(warc_path, dedup_index=dedup_index))
|
||||||
@ -240,6 +248,10 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
|
|
||||||
fullwarc = os.path.join(self.root_dir, 'warcs', cdx['filename'])
|
fullwarc = os.path.join(self.root_dir, 'warcs', cdx['filename'])
|
||||||
|
|
||||||
|
warcs = r.hgetall('USER:COLL:warc')
|
||||||
|
assert len(warcs) == 2
|
||||||
|
assert warcs[cdx['filename'].encode('utf-8')] == fullwarc.encode('utf-8')
|
||||||
|
|
||||||
with open(fullwarc, 'rb') as fh:
|
with open(fullwarc, 'rb') as fh:
|
||||||
decomp = DecompressingBufferedReader(fh)
|
decomp = DecompressingBufferedReader(fh)
|
||||||
# Test refers-to headers
|
# Test refers-to headers
|
||||||
@ -254,10 +266,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
def test_record_param_user_coll_skip(self):
|
def test_record_param_user_coll_skip(self):
|
||||||
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
||||||
|
|
||||||
|
dedup_index = self._get_dedup_index(dupe_policy=SkipDupePolicy())
|
||||||
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
|
|
||||||
rel_path_template=self.root_dir + '/warcs/',
|
|
||||||
dupe_policy=SkipDupePolicy())
|
|
||||||
|
|
||||||
recorder_app = RecorderApp(self.upstream_url,
|
recorder_app = RecorderApp(self.upstream_url,
|
||||||
PerRecordWARCWriter(warc_path, dedup_index=dedup_index))
|
PerRecordWARCWriter(warc_path, dedup_index=dedup_index))
|
||||||
@ -283,10 +292,7 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
|
|
||||||
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
||||||
|
|
||||||
|
dedup_index = self._get_dedup_index(dupe_policy=WriteDupePolicy())
|
||||||
dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
|
|
||||||
rel_path_template=self.root_dir + '/warcs/',
|
|
||||||
dupe_policy=WriteDupePolicy())
|
|
||||||
|
|
||||||
writer = PerRecordWARCWriter(warc_path, dedup_index=dedup_index)
|
writer = PerRecordWARCWriter(warc_path, dedup_index=dedup_index)
|
||||||
recorder_app = RecorderApp(self.upstream_url, writer)
|
recorder_app = RecorderApp(self.upstream_url, writer)
|
||||||
@ -329,8 +335,10 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
rel_path = self.root_dir + '/warcs/'
|
rel_path = self.root_dir + '/warcs/'
|
||||||
|
|
||||||
dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj',
|
dedup_index = WritableRedisIndexer('redis://localhost/2/{coll}:cdxj',
|
||||||
|
file_key_template='{coll}:warc',
|
||||||
rel_path_template=rel_path)
|
rel_path_template=rel_path)
|
||||||
|
|
||||||
|
|
||||||
writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index)
|
writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index)
|
||||||
recorder_app = RecorderApp(self.upstream_url, writer)
|
recorder_app = RecorderApp(self.upstream_url, writer)
|
||||||
|
|
||||||
@ -382,3 +390,8 @@ class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass):
|
|||||||
|
|
||||||
resp = self._test_warc_write(recorder_app, 'httpbin.org',
|
resp = self._test_warc_write(recorder_app, 'httpbin.org',
|
||||||
'/get?boo=far', '¶m.recorder.coll=FOO')
|
'/get?boo=far', '¶m.recorder.coll=FOO')
|
||||||
|
|
||||||
|
self._test_all_warcs('/warcs/FOO/', 2)
|
||||||
|
|
||||||
|
warcs = r.hgetall('FOO:warc')
|
||||||
|
assert len(warcs) == 2
|
||||||
|
@ -20,7 +20,7 @@ from pywb.utils.loaders import LimitReader, to_native_str
|
|||||||
from pywb.utils.bufferedreaders import BufferedReader
|
from pywb.utils.bufferedreaders import BufferedReader
|
||||||
from pywb.utils.timeutils import timestamp20_now
|
from pywb.utils.timeutils import timestamp20_now
|
||||||
|
|
||||||
from webagg.utils import ParamFormatter
|
from webagg.utils import ParamFormatter, res_template
|
||||||
|
|
||||||
from recorder.filters import ExcludeNone
|
from recorder.filters import ExcludeNone
|
||||||
|
|
||||||
@ -107,8 +107,8 @@ class BaseWARCWriter(object):
|
|||||||
print('Skipping due to dedup')
|
print('Skipping due to dedup')
|
||||||
return
|
return
|
||||||
|
|
||||||
formatter = ParamFormatter(params, name=self.rec_source_name)
|
params['_formatter'] = ParamFormatter(params, name=self.rec_source_name)
|
||||||
self._do_write_req_resp(req, resp, params, formatter)
|
self._do_write_req_resp(req, resp, params)
|
||||||
|
|
||||||
def _check_revisit(self, record, params):
|
def _check_revisit(self, record, params):
|
||||||
if not self.dedup_index:
|
if not self.dedup_index:
|
||||||
@ -248,18 +248,24 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
|
|
||||||
self.fh_cache = {}
|
self.fh_cache = {}
|
||||||
|
|
||||||
def _open_file(self, dir_):
|
def _open_file(self, dir_, params):
|
||||||
timestamp = timestamp20_now()
|
timestamp = timestamp20_now()
|
||||||
|
|
||||||
filename = dir_ + self.filename_template.format(hostname=self.hostname,
|
filename = dir_ + self.filename_template.format(hostname=self.hostname,
|
||||||
timestamp=timestamp)
|
timestamp=timestamp)
|
||||||
|
|
||||||
|
path, name = os.path.split(filename)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
os.makedirs(os.path.dirname(filename))
|
os.makedirs(path)
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
fh = open(filename, 'a+b')
|
fh = open(filename, 'a+b')
|
||||||
|
|
||||||
|
if self.dedup_index:
|
||||||
|
self.dedup_index.add_warc_file(filename, params)
|
||||||
|
|
||||||
return fh, filename
|
return fh, filename
|
||||||
|
|
||||||
def _close_file(self, fh):
|
def _close_file(self, fh):
|
||||||
@ -272,8 +278,8 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
out, filename = result
|
out, filename = result
|
||||||
self._close_file(out)
|
self._close_file(out)
|
||||||
|
|
||||||
def _do_write_req_resp(self, req, resp, params, formatter):
|
def _do_write_req_resp(self, req, resp, params):
|
||||||
full_dir = formatter.format(self.dir_template)
|
full_dir = res_template(self.dir_template, params)
|
||||||
|
|
||||||
result = self.fh_cache.get(full_dir)
|
result = self.fh_cache.get(full_dir)
|
||||||
|
|
||||||
@ -283,7 +289,7 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
out, filename = result
|
out, filename = result
|
||||||
is_new = False
|
is_new = False
|
||||||
else:
|
else:
|
||||||
out, filename = self._open_file(full_dir)
|
out, filename = self._open_file(full_dir, params)
|
||||||
is_new = True
|
is_new = True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user