1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-19 18:29:37 +01:00
pywb/recorder/test/test_recorder.py

580 lines
22 KiB
Python
Raw Normal View History

#from gevent import monkey; monkey.patch_all()
import gevent
from webagg.test.testutils import TempDirTests, LiveServerTests, BaseTestClass, to_path
from webagg.test.testutils import FakeRedisTests
import os
import webtest
from pytest import raises
from fakeredis import FakeStrictRedis
from recorder.recorderapp import RecorderApp
from recorder.redisindexer import WritableRedisIndexer
from recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter, SimpleTempWARCWriter
from recorder.filters import ExcludeSpecificHeaders
from recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy
from webagg.utils import MementoUtils
from pywb.cdx.cdxobject import CDXObject
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.utils.bufferedreaders import DecompressingBufferedReader
from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.warc.cdxindexer import write_cdx_index
from pywb.warc.archiveiterator import ArchiveIterator
from six.moves.urllib.parse import quote, unquote, urlencode
from io import BytesIO
import time
import json
# Raw HTTP request template that the tests POST to the recorder.
# ``{path}`` and ``{host}`` are substituted with ``str.format``; every header
# line ends in CRLF and a bare CRLF terminates the header block.
general_req_data = (
    "GET {path} HTTP/1.1\r\n"
    "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\r\n"
    "User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36\r\n"
    "X-Other: foo\r\n"
    "Host: {host}\r\n"
    "Cookie: boo=far\r\n"
    "\r\n"
)
# Recorder tests: each test drives a RecorderApp through webtest and then
# inspects the WARC files under ``root_dir`` and the FakeStrictRedis index
# entries that the write produced.
class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass):
@classmethod
def setup_class(cls):
    """Run the parent class setup, then prepare the WARC directory and upstream URL."""
    super(TestRecorder, cls).setup_class()

    # Directory under the test root into which the tests write their WARCs.
    warcs_dir = to_path(cls.root_dir + '/warcs')
    cls.warcs_dir = warcs_dir
    os.makedirs(warcs_dir)

    # Upstream base URL, built from the port of ``cls.server``.
    cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port)
def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy(), user=True):
    """Create the WritableRedisIndexer used as the dedup index in these tests.

    :param dupe_policy: passed through to the indexer as ``dupe_policy``.
    :param user: when true the Redis key templates are ``{user}:{coll}:...``,
        otherwise they are ``{coll}:...``.
    :return: the configured WritableRedisIndexer.
    """
    if user:
        templates = ('{user}:{coll}:warc',
                     'redis://localhost/2/{user}:{coll}:cdxj')
    else:
        templates = ('{coll}:warc',
                     'redis://localhost/2/{coll}:cdxj')

    file_key_template, redis_url = templates

    return WritableRedisIndexer(redis_url=redis_url,
                                file_key_template=file_key_template,
                                rel_path_template=self.root_dir + '/warcs/',
                                dupe_policy=dupe_policy)
def _test_warc_write(self, recorder_app, host, path, other_params='', link_url=''):
    """POST the canned request for ``http://<host><path>`` through ``recorder_app``.

    After the request, one queued write is processed if the app's write
    queue is not empty, and the response headers are checked.

    :param other_params: extra query-string text appended verbatim to the request URL.
    :param link_url: URL expected in the ``Link`` header; defaults to the
        unquoted target URL when empty.
    :return: the webtest response.
    """
    target_url = 'http://' + host + path
    request_body = general_req_data.format(host=host, path=path).encode('utf-8')

    client = webtest.TestApp(recorder_app)
    resp = client.post('/live/resource/postreq?url=' + target_url + other_params,
                       request_body)

    # Process a pending write, if there is one, before anything is inspected.
    if not recorder_app.write_queue.empty():
        recorder_app._write_one()

    headers = resp.headers
    assert headers['WebAgg-Source-Coll'] == 'live'

    expected_link = link_url or unquote(target_url)
    assert headers['Link'] == MementoUtils.make_link(expected_link, 'original')
    assert headers['Memento-Datetime'] != ''

    return resp
def _test_all_warcs(self, dirname, num):
    """Assert that ``root_dir + dirname`` holds exactly ``num`` files, all ``.warc.gz``.

    Only regular files are counted; sub-directories are ignored.

    :return: ``(files, coll_dir)`` -- the file names and the resolved directory path.
    """
    coll_dir = to_path(self.root_dir + dirname)
    assert os.path.isdir(coll_dir)

    files = []
    for entry in os.listdir(coll_dir):
        if os.path.isfile(os.path.join(coll_dir, entry)):
            files.append(entry)

    assert len(files) == num
    assert all(name.endswith('.warc.gz') for name in files)

    return files, coll_dir
def _load_resp_req(self, base_path):
    """Load the request and response records from the single WARC in ``base_path``.

    Asserts that the directory has exactly one entry and that both record
    types were found. If a type occurs more than once, the last one wins.

    :return: ``(stored_req, stored_resp)``.
    """
    warcs = os.listdir(base_path)
    assert len(warcs) == 1
    warc_file = os.path.join(base_path, warcs[0])

    # Keyed by record type; any other record type is ignored.
    found = {'response': None, 'request': None}

    with open(warc_file, 'rb') as fh:
        for rec in ArchiveIterator(fh)():
            if rec.rec_type in found:
                found[rec.rec_type] = rec

    stored_resp = found['response']
    stored_req = found['request']
    assert stored_resp is not None
    assert stored_req is not None

    return stored_req, stored_resp
def test_record_warc_1(self):
    """Record one request with a PerRecordWARCWriter and expect one WARC in /warcs/."""
    writer = PerRecordWARCWriter(to_path(self.root_dir + '/warcs/'))
    app = RecorderApp(self.upstream_url, writer)

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar')

    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    self._test_all_warcs('/warcs/', 1)
def test_record_warc_2(self):
    """Record with ``accept_colls='live'`` and expect two WARCs in /warcs/."""
    writer = PerRecordWARCWriter(to_path(self.root_dir + '/warcs/'))
    app = RecorderApp(self.upstream_url, writer, accept_colls='live')

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar')

    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    # NOTE(review): the count of 2 presumably includes the file left behind
    # by test_record_warc_1, since both tests write to the same directory.
    self._test_all_warcs('/warcs/', 2)
def test_error_url(self):
    """A request via an altered upstream URL yields a 400 JSON error and no new WARC."""
    # Appending '01' changes the port part of the upstream URL.
    bad_upstream = self.upstream_url + '01'
    writer = PerRecordWARCWriter(to_path(self.root_dir + '/warcs/'))
    app = RecorderApp(bad_upstream, writer, accept_colls='live')

    client = webtest.TestApp(app)
    # ``status=400`` makes webtest itself check the response status.
    resp = client.get('/live/resource?url=http://example.com/', status=400)
    assert resp.json['error'] != ''

    # Same file count as asserted at the end of test_record_warc_2.
    self._test_all_warcs('/warcs/', 2)
def test_record_cookies_header(self):
    """Cookie headers are present in the returned record and in the stored WARC."""
    base_path = to_path(self.root_dir + '/warcs/cookiecheck/')
    app = RecorderApp(self.upstream_url,
                      PerRecordWARCWriter(base_path), accept_colls='live')

    resp = self._test_warc_write(app, 'httpbin.org',
                                 '/cookies/set%3Fname%3Dvalue%26foo%3Dbar')
    assert b'HTTP/1.1 302' in resp.body

    set_cookies = [('Set-Cookie', 'name=value; Path=/'),
                   ('Set-Cookie', 'foo=bar; Path=/')]

    # The response body is itself parsed as a record.
    live_record = ArcWarcRecordLoader().parse_record_stream(BytesIO(resp.body))
    for header in set_cookies:
        assert header in live_record.status_headers.headers

    # The records written to disk keep the same cookie headers.
    stored_req, stored_resp = self._load_resp_req(base_path)
    for header in set_cookies:
        assert header in stored_resp.status_headers.headers

    request_headers = stored_req.status_headers.headers
    assert ('X-Other', 'foo') in request_headers
    assert ('Cookie', 'boo=far') in request_headers
def test_record_cookies_skip_header(self):
    """With ExcludeSpecificHeaders, cookie headers are absent from the stored WARC only."""
    warc_path = to_path(self.root_dir + '/warcs/cookieskip/')
    cookie_filter = ExcludeSpecificHeaders(['Set-Cookie', 'Cookie'])
    writer = PerRecordWARCWriter(warc_path, header_filter=cookie_filter)
    app = RecorderApp(self.upstream_url, writer, accept_colls='live')

    resp = self._test_warc_write(app, 'httpbin.org',
                                 '/cookies/set%3Fname%3Dvalue%26foo%3Dbar')
    assert b'HTTP/1.1 302' in resp.body

    set_cookies = [('Set-Cookie', 'name=value; Path=/'),
                   ('Set-Cookie', 'foo=bar; Path=/')]

    # The record returned to the client still carries the cookies.
    live_record = ArcWarcRecordLoader().parse_record_stream(BytesIO(resp.body))
    for header in set_cookies:
        assert header in live_record.status_headers.headers

    # The stored records have the excluded headers removed; others remain.
    stored_req, stored_resp = self._load_resp_req(warc_path)
    for header in set_cookies:
        assert header not in stored_resp.status_headers.headers

    request_headers = stored_req.status_headers.headers
    assert ('X-Other', 'foo') in request_headers
    assert ('Cookie', 'boo=far') not in request_headers
def test_record_skip_wrong_coll(self):
    """With ``accept_colls='not-live'`` the response is served but no WARC is added."""
    warc_writer = PerRecordWARCWriter(to_path(self.root_dir + '/warcs/'))
    app = RecorderApp(self.upstream_url,
                      writer=warc_writer, accept_colls='not-live')

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar')

    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    # The file count in /warcs/ is still 2.
    self._test_all_warcs('/warcs/', 2)
def test_record_param_user_coll(self):
    """Record into a ``{user}/{coll}`` path and check the Redis CDX and WARC-path entries."""
    warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
    writer = PerRecordWARCWriter(warc_path, dedup_index=self._get_dedup_index())
    app = RecorderApp(self.upstream_url, writer)

    # Top-level file count before this test writes anything.
    self._test_all_warcs('/warcs/', 2)

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar',
                                 '&param.recorder.user=USER&param.recorder.coll=COLL')
    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    self._test_all_warcs('/warcs/USER/COLL/', 1)

    redis = FakeStrictRedis.from_url('redis://localhost/2')

    # Exactly one CDX entry for httpbin.org in the user/coll index.
    entries = redis.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
    assert len(entries) == 1

    cdx = CDXObject(entries[0])
    assert cdx['urlkey'] == 'org,httpbin)/get?foo=bar'
    assert cdx['mime'] == 'application/json'
    assert cdx['offset'] == '0'

    filename = cdx['filename']
    assert filename.startswith('USER/COLL/')
    assert filename.endswith('.warc.gz')

    # The warc hash maps the relative file name to its full path.
    warcs = redis.hgetall('USER:COLL:warc')
    full_path = self.root_dir + '/warcs/' + filename
    assert warcs == {filename.encode('utf-8'): full_path.encode('utf-8')}
def test_record_param_user_coll_same_dir(self):
    """Two collections written via one fixed directory produce two WARCs in it."""
    warc_path = to_path(self.root_dir + '/warcs2/')
    writer = PerRecordWARCWriter(warc_path,
                                 dedup_index=self._get_dedup_index(),
                                 key_template='{user}:{coll}')
    app = RecorderApp(self.upstream_url, writer)

    # Same user, two different collections.
    for coll_params in ('&param.recorder.user=USER2&param.recorder.coll=COLL2',
                        '&param.recorder.user=USER2&param.recorder.coll=COLL3'):
        resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar', coll_params)
        assert b'HTTP/1.1 200 OK' in resp.body
        assert b'"foo": "bar"' in resp.body

    self._test_all_warcs('/warcs2', 2)
def test_record_param_user_coll_revisit(self):
    """Record the USER/COLL URL again and expect a second, revisit, record."""
    warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
    writer = PerRecordWARCWriter(warc_path, dedup_index=self._get_dedup_index())
    app = RecorderApp(self.upstream_url, writer)

    self._test_all_warcs('/warcs/', 2)

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar',
                                 '&param.recorder.user=USER&param.recorder.coll=COLL')
    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    self._test_all_warcs('/warcs/USER/COLL/', 2)

    # Test Redis CDX
    redis = FakeStrictRedis.from_url('redis://localhost/2')
    entries = redis.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
    assert len(entries) == 2

    # The second entry in lexical order is expected to be the revisit.
    cdx = CDXObject(entries[1])
    assert cdx['urlkey'] == 'org,httpbin)/get?foo=bar'
    assert cdx['mime'] == 'warc/revisit'
    assert cdx['offset'] == '0'

    filename = cdx['filename']
    assert filename.startswith('USER/COLL/')
    assert filename.endswith('.warc.gz')

    fullwarc = os.path.join(self.root_dir, 'warcs', filename)

    warcs = redis.hgetall('USER:COLL:warc')
    assert len(warcs) == 2
    assert warcs[filename.encode('utf-8')] == fullwarc.encode('utf-8')

    with open(fullwarc, 'rb') as fh:
        decomp = DecompressingBufferedReader(fh)

        # Test refers-to headers
        warc_headers = StatusAndHeadersParser(['WARC/1.0']).parse(decomp)
        target_uri = 'http://httpbin.org/get?foo=bar'

        assert warc_headers.get_header('WARC-Type') == 'revisit'
        assert warc_headers.get_header('WARC-Target-URI') == target_uri
        assert warc_headers.get_header('WARC-Date') != ''
        assert warc_headers.get_header('WARC-Refers-To-Target-URI') == target_uri
        assert warc_headers.get_header('WARC-Refers-To-Date') != ''
def test_record_param_user_coll_skip(self):
    """With SkipDupePolicy, the USER/COLL file and CDX counts both remain at 2."""
    warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
    skip_index = self._get_dedup_index(dupe_policy=SkipDupePolicy())
    app = RecorderApp(self.upstream_url,
                      PerRecordWARCWriter(warc_path, dedup_index=skip_index))

    # No new entries written
    self._test_all_warcs('/warcs/', 2)

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar',
                                 '&param.recorder.user=USER&param.recorder.coll=COLL')
    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    self._test_all_warcs('/warcs/USER/COLL/', 2)

    # Test Redis CDX
    redis = FakeStrictRedis.from_url('redis://localhost/2')
    entries = redis.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
    assert len(entries) == 2
def test_record_param_user_coll_write_dupe_no_revisit(self):
    """With WriteDupePolicy the duplicate is stored as a full record, not a revisit."""
    warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
    write_index = self._get_dedup_index(dupe_policy=WriteDupePolicy())
    writer = PerRecordWARCWriter(warc_path, dedup_index=write_index)
    app = RecorderApp(self.upstream_url, writer)

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar',
                                 '&param.recorder.user=USER&param.recorder.coll=COLL')
    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    self._test_all_warcs('/warcs/USER/COLL/', 3)

    redis = FakeStrictRedis.from_url('redis://localhost/2')
    entries = redis.zrangebylex('USER:COLL:cdxj', '[org,httpbin)/', '(org,httpbin,')
    assert len(entries) == 3

    # Two full JSON responses plus one revisit entry.
    mimes = sorted(CDXObject(entry)['mime'] for entry in entries)
    assert mimes == ['application/json', 'application/json', 'warc/revisit']

    # The per-record writer has no file handles cached after the write.
    assert len(writer.fh_cache) == 0
# Keep Open
def test_record_file_warc_keep_open(self):
    """MultiFileWARCWriter writes to the given file and keeps one handle cached."""
    warc_file = to_path(self.root_dir + '/warcs/A.warc.gz')
    writer = MultiFileWARCWriter(warc_file)
    app = RecorderApp(self.upstream_url, writer)

    resp = self._test_warc_write(app, 'httpbin.org', '/get?foo=bar')
    body = resp.body
    assert b'HTTP/1.1 200 OK' in body
    assert b'"foo": "bar"' in body

    assert os.path.isfile(warc_file)
    assert len(writer.fh_cache) == 1
def test_record_multiple_writes_keep_open(self):
    """Two writes share one open WARC; after ``close_key`` a new file is started.

    Also checks that the CDX entries stored in Redis equal the index
    generated from the WARC file by ``write_cdx_index``.
    """
    warc_path = to_path(self.root_dir + '/warcs/FOO/ABC-{hostname}-{timestamp}.warc.gz')
    rel_path = self.root_dir + '/warcs/'

    dedup_index = self._get_dedup_index(user=False)
    writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index)
    recorder_app = RecorderApp(self.upstream_url, writer)

    # First Record
    resp = self._test_warc_write(recorder_app, 'httpbin.org',
                                 '/get?foo=bar', '&param.recorder.coll=FOO')
    assert b'HTTP/1.1 200 OK' in resp.body
    assert b'"foo": "bar"' in resp.body

    # Second Record
    resp = self._test_warc_write(recorder_app, 'httpbin.org',
                                 '/get?boo=far', '&param.recorder.coll=FOO')
    assert b'HTTP/1.1 200 OK' in resp.body
    assert b'"boo": "far"' in resp.body

    self._test_all_warcs('/warcs/FOO/', 1)

    # Check two records in WARC
    r = FakeStrictRedis.from_url('redis://localhost/2')
    res = r.zrangebylex('FOO:cdxj', '[org,httpbin)/', '(org,httpbin,')
    assert len(res) == 2

    files, coll_dir = self._test_all_warcs('/warcs/FOO/', 1)

    # Join the components instead of concatenating them, so the path does
    # not depend on ``coll_dir`` ending with a separator.
    fullname = os.path.join(coll_dir, files[0])

    # Re-index the WARC from disk and compare with what Redis holds.
    cdxout = BytesIO()
    with open(fullname, 'rb') as fh:
        filename = os.path.relpath(fullname, rel_path)
        write_cdx_index(cdxout, fh, filename,
                        cdxj=True, append_post=True, sort=True)

    res = [CDXObject(x) for x in res]

    cdxres = cdxout.getvalue().strip().split(b'\n')
    cdxres = [CDXObject(x) for x in cdxres]
    assert cdxres == res

    # One handle is cached until the key is closed explicitly.
    assert len(writer.fh_cache) == 1
    writer.close_key(self.root_dir + '/warcs/FOO/')
    assert len(writer.fh_cache) == 0
    writer.close()

    # A write after closing goes to a second file.
    self._test_warc_write(recorder_app, 'httpbin.org',
                          '/get?boo=far', '&param.recorder.coll=FOO')

    self._test_all_warcs('/warcs/FOO/', 2)

    warcs = r.hgetall('FOO:warc')
    assert len(warcs) == 2
def test_record_multiple_writes_rollover_idle(self):
    """After ``close_idle_files`` on an idle writer, the next write starts a second WARC.

    The writer is created with ``max_idle_secs=0.9`` and the test sleeps
    for 1.0 second before calling ``close_idle_files()``.
    """
    warc_path = to_path(self.root_dir + '/warcs/GOO/ABC-{hostname}-{timestamp}.warc.gz')

    dedup_index = self._get_dedup_index(user=False)
    writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index, max_idle_secs=0.9)
    recorder_app = RecorderApp(self.upstream_url, writer)

    # First Record
    resp = self._test_warc_write(recorder_app, 'httpbin.org',
                                 '/get?foo=bar', '&param.recorder.coll=GOO')
    assert b'HTTP/1.1 200 OK' in resp.body
    assert b'"foo": "bar"' in resp.body

    # Second Record
    resp = self._test_warc_write(recorder_app, 'httpbin.org',
                                 '/get?boo=far', '&param.recorder.coll=GOO')
    assert b'HTTP/1.1 200 OK' in resp.body
    assert b'"boo": "far"' in resp.body

    # Both records so far went into a single file.
    self._test_all_warcs('/warcs/GOO/', 1)

    # Wait longer than max_idle_secs, then ask the writer to close idle files.
    time.sleep(1.0)
    writer.close_idle_files()

    # Third Record
    resp = self._test_warc_write(recorder_app, 'httpbin.org',
                                 '/get?goo=bar', '&param.recorder.coll=GOO')
    assert b'HTTP/1.1 200 OK' in resp.body
    assert b'"goo": "bar"' in resp.body

    self._test_all_warcs('/warcs/GOO/', 2)
def test_warcinfo_record(self):
    """Write a warcinfo record to an in-memory writer and parse it back.

    Checks the WARC headers, that Content-Length equals the size of the
    record body in bytes, and that every param passed to
    ``create_warcinfo_record`` appears as a ``name: value`` line.
    """
    simplewriter = SimpleTempWARCWriter(gzip=False)

    params = {'software': 'recorder test',
              'format': 'WARC File Format 1.0',
              'json-metadata': json.dumps({'foo': 'bar'})}

    record = simplewriter.create_warcinfo_record('testfile.warc.gz', **params)
    simplewriter.write_record(record)

    buff = simplewriter.get_buffer()
    assert isinstance(buff, bytes)

    parsed_record = ArcWarcRecordLoader().parse_record_stream(BytesIO(buff))
    rec_headers = parsed_record.rec_headers

    assert rec_headers.get_header('WARC-Type') == 'warcinfo'
    assert rec_headers.get_header('Content-Type') == 'application/warc-fields'
    assert rec_headers.get_header('WARC-Filename') == 'testfile.warc.gz'

    # Content-Length counts bytes, so compare it before decoding to text.
    raw_body = parsed_record.stream.read()
    assert len(raw_body) == int(rec_headers.get_header('Content-Length'))

    body = raw_body.decode('utf-8')

    # Every param is checked, including 'software', which the former
    # duplicated 'json-metadata' assertion left untested.
    # NOTE(review): assumes 'software' is written in the same
    # 'name: value\r\n' form as the other two fields -- confirm against the writer.
    for name, value in params.items():
        assert '{0}: {1}\r\n'.format(name, value) in body
def test_record_custom_record(self):
    """PUT a custom ``resource`` record and verify the headers and payload stored for it."""
    dedup_index = self._get_dedup_index(user=False)
    warc_path = to_path(self.root_dir + '/warcs/meta/meta.warc.gz')
    app = RecorderApp(self.upstream_url,
                      MultiFileWARCWriter(warc_path, dedup_index=dedup_index))

    payload = b'Some Data'
    put_headers = {'content-type': 'text/plain',
                   'WARC-Custom': 'foo'}

    client = webtest.TestApp(app)
    resp = client.put('/live/resource/postreq?url=custom://httpbin.org&param.recorder.coll=META&put_record=resource',
                      headers=put_headers, params=payload)

    self._test_all_warcs('/warcs/meta', 1)

    redis = FakeStrictRedis.from_url('redis://localhost/2')
    warcs = redis.hgetall('META:warc')
    assert len(warcs) == 1

    expected_length = str(len(payload))

    with open(warcs[b'meta/meta.warc.gz'], 'rb') as fh:
        decomp = DecompressingBufferedReader(fh)
        record = ArcWarcRecordLoader().parse_record_stream(decomp)

        # WARC-level headers of the stored record.
        warc_headers = record.rec_headers
        assert len(warc_headers.headers) == 9
        assert warc_headers.get_header('WARC-Type') == 'resource'
        assert warc_headers.get_header('WARC-Target-URI') == 'custom://httpbin.org'
        assert warc_headers.get_header('WARC-Record-ID') != ''
        assert warc_headers.get_header('WARC-Date') != ''
        assert warc_headers.get_header('WARC-Block-Digest') != ''
        assert warc_headers.get_header('WARC-Block-Digest') == warc_headers.get_header('WARC-Payload-Digest')
        assert warc_headers.get_header('Content-Type') == 'text/plain'
        assert warc_headers.get_header('Content-Length') == expected_length
        assert warc_headers.get_header('WARC-Custom') == 'foo'

        # The record body is exactly what was PUT.
        assert record.stream.read() == payload

        # ``status_headers`` holds only the content type and length.
        content_headers = record.status_headers
        assert len(content_headers.headers) == 2
        assert content_headers.get_header('Content-Type') == 'text/plain'
        assert content_headers.get_header('Content-Length') == expected_length
def test_record_video_metadata(self):
    """Request a youtube-dl formats content type and expect a ``metadata`` record."""
    warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
    writer = PerRecordWARCWriter(warc_path, dedup_index=self._get_dedup_index())
    app = RecorderApp(self.upstream_url, writer)

    video_mime = 'application/vnd.youtube-dl_formats+json'
    params = {'param.recorder.user': 'USER',
              'param.recorder.coll': 'VIDEO',
              'content_type': video_mime}

    # The Link header is expected to use the metadata:// form of the URL.
    resp = self._test_warc_write(app,
                                 'www.youtube.com', '/v/BfBgWtAIbRc',
                                 '&' + urlencode(params),
                                 link_url='metadata://www.youtube.com/v/BfBgWtAIbRc')

    redis = FakeStrictRedis.from_url('redis://localhost/2')
    warcs = redis.hgetall('USER:VIDEO:warc')
    assert len(warcs) == 1

    # The hash values are the WARC file paths; there is only one.
    warc_file = list(warcs.values())[0]

    with open(warc_file, 'rb') as fh:
        decomp = DecompressingBufferedReader(fh)
        record = ArcWarcRecordLoader().parse_record_stream(decomp)

        warc_headers = record.rec_headers
        assert warc_headers.get_header('WARC-Type') == 'metadata'
        assert warc_headers.get_header('Content-Type') == video_mime
        assert warc_headers.get_header('WARC-Block-Digest') != ''
        assert warc_headers.get_header('WARC-Block-Digest') == warc_headers.get_header('WARC-Payload-Digest')