1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-24 06:59:52 +01:00

recorder put custom record: add support for PUT/POST of a custom record.

If the put_record= param is included, the request body is written as a record of the specified type.
move record creation functions to the warcwriter
add tests for custom record
This commit is contained in:
Ilya Kreymer 2016-05-26 20:49:40 -07:00
parent ea3efdf84d
commit 30f9d0aca7
5 changed files with 190 additions and 65 deletions

View File

@ -1,4 +1,5 @@
[run] [run]
concurrency = gevent
omit = omit =
*/test/* */test/*
*/tests/* */tests/*

View File

@ -1,10 +1,6 @@
from webagg.utils import StreamIter, chunk_encode_iter, BUFF_SIZE from webagg.utils import StreamIter, chunk_encode_iter, BUFF_SIZE
from webagg.inputrequest import DirectWSGIInputRequest from webagg.inputrequest import DirectWSGIInputRequest
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.warc.recordloader import ArcWarcRecord
from pywb.warc.recordloader import ArcWarcRecordLoader
from recorder.filters import SkipRangeRequestFilter, CollectionFilter from recorder.filters import SkipRangeRequestFilter, CollectionFilter
from six.moves.urllib.parse import parse_qsl from six.moves.urllib.parse import parse_qsl
@ -27,7 +23,6 @@ class RecorderApp(object):
self.upstream_host = upstream_host self.upstream_host = upstream_host
self.writer = writer self.writer = writer
self.parser = StatusAndHeadersParser([], verify=False)
self.write_queue = gevent.queue.Queue() self.write_queue = gevent.queue.Queue()
gevent.spawn(self._write_loop) gevent.spawn(self._write_loop)
@ -62,8 +57,8 @@ class RecorderApp(object):
req_head, req_pay, resp_head, resp_pay, params = result req_head, req_pay, resp_head, resp_pay, params = result
req = self._create_req_record(req_head, req_pay, 'request') req = self.writer.create_req_record(req_head, req_pay, 'request')
resp = self._create_resp_record(resp_head, resp_pay, 'response') resp = self.writer.create_resp_record(resp_head, resp_pay, 'response')
self.writer.write_req_resp(req, resp, params) self.writer.write_req_resp(req, resp, params)
@ -77,47 +72,66 @@ class RecorderApp(object):
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
def _create_req_record(self, req_headers, payload, type_, ct=''):
len_ = payload.tell()
payload.seek(0)
warc_headers = req_headers
status_headers = self.parser.parse(payload)
record = ArcWarcRecord('warc', type_, warc_headers, payload,
status_headers, ct, len_)
return record
def _create_resp_record(self, resp_headers, payload, type_, ct=''):
len_ = payload.tell()
payload.seek(0)
warc_headers = self.parser.parse(payload)
warc_headers = CaseInsensitiveDict(warc_headers.headers)
status_headers = self.parser.parse(payload)
record = ArcWarcRecord('warc', type_, warc_headers, payload,
status_headers, ct, len_)
return record
def send_error(self, exc, start_response): def send_error(self, exc, start_response):
message = json.dumps({'error': repr(exc)}) return self.send_message({'error': repr(exc)},
'400 Bad Request',
start_response)
def send_message(self, msg, status, start_response):
message = json.dumps(msg)
headers = [('Content-Type', 'application/json; charset=utf-8'), headers = [('Content-Type', 'application/json; charset=utf-8'),
('Content-Length', str(len(message)))] ('Content-Length', str(len(message)))]
start_response('400 Bad Request', headers) start_response(status, headers)
return [message.encode('utf-8')] return [message.encode('utf-8')]
def _put_record(self, request_uri, input_buff, record_type,
                headers, params, start_response):
    """Write the request body as a single custom record.

    The body is read to exhaustion through a ``ReqWrapper``; a record of
    type ``record_type`` is then created from ``req_stream.out`` for the
    target ``params['url']`` and handed to the writer.

    :param request_uri: full request URI (not used by this method)
    :param input_buff: request body stream
    :param record_type: value of the ``put_record`` query param
    :param headers: request headers; ``Content-Type`` is read from here
    :param params: query params; ``url`` must be present
    :param start_response: WSGI ``start_response`` callable
    :return: JSON success response body (list with one bytes item)
    :raises KeyError: if ``params`` has no ``url`` entry
    """
    req_stream = ReqWrapper(input_buff, headers)

    # Consume the whole body; presumably ReqWrapper buffers what it reads
    # into ``req_stream.out`` -- verify against ReqWrapper.
    while req_stream.read():
        pass

    custom_record = self.writer.create_custom_record(
        params['url'],
        req_stream.out,
        record_type,
        headers.get('Content-Type'),
        req_stream.headers)

    self.writer.write_record(custom_record, params)

    success_msg = {'success': 'true'}
    return self.send_message(success_msg, '200 OK', start_response)
def __call__(self, environ, start_response): def __call__(self, environ, start_response):
input_req = DirectWSGIInputRequest(environ) input_req = DirectWSGIInputRequest(environ)
headers = input_req.get_req_headers()
method = input_req.get_req_method() params = dict(parse_qsl(environ.get('QUERY_STRING')))
request_uri = input_req.get_full_request_uri() request_uri = input_req.get_full_request_uri()
input_buff = input_req.get_req_body() input_buff = input_req.get_req_body()
params = dict(parse_qsl(environ.get('QUERY_STRING'))) headers = input_req.get_req_headers()
method = input_req.get_req_method()
# write request body as metadata/resource
put_record = params.get('put_record')
if put_record and method in ('PUT', 'POST'):
return self._put_record(request_uri,
input_buff,
put_record,
headers,
params,
start_response)
skipping = any(x.skip_request(headers) for x in self.skip_filters) skipping = any(x.skip_request(headers) for x in self.skip_filters)

View File

@ -52,9 +52,16 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port) cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port)
def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy()): def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy(), user=True):
dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{user}:{coll}:cdxj', if user:
file_key_template='{user}:{coll}:warc', file_key_template = '{user}:{coll}:warc'
redis_url = 'redis://localhost/2/{user}:{coll}:cdxj'
else:
file_key_template = '{coll}:warc'
redis_url = 'redis://localhost/2/{coll}:cdxj'
dedup_index = WritableRedisIndexer(redis_url=redis_url,
file_key_template=file_key_template,
rel_path_template=self.root_dir + '/warcs/', rel_path_template=self.root_dir + '/warcs/',
dupe_policy=dupe_policy) dupe_policy=dupe_policy)
@ -340,10 +347,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
rel_path = self.root_dir + '/warcs/' rel_path = self.root_dir + '/warcs/'
dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj', dedup_index = self._get_dedup_index(user=False)
file_key_template='{coll}:warc',
rel_path_template=rel_path)
writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index) writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index)
recorder_app = RecorderApp(self.upstream_url, writer) recorder_app = RecorderApp(self.upstream_url, writer)
@ -409,9 +413,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
rel_path = self.root_dir + '/warcs/' rel_path = self.root_dir + '/warcs/'
dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj', dedup_index = self._get_dedup_index(user=False)
file_key_template='{coll}:warc',
rel_path_template=rel_path)
writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index, max_idle_secs=0.9) writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index, max_idle_secs=0.9)
recorder_app = RecorderApp(self.upstream_url, writer) recorder_app = RecorderApp(self.upstream_url, writer)
@ -472,3 +474,54 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
assert 'format: WARC File Format 1.0\r\n' in buff assert 'format: WARC File Format 1.0\r\n' in buff
assert 'json-metadata: {"foo": "bar"}\r\n' in buff assert 'json-metadata: {"foo": "bar"}\r\n' in buff
# Test: a PUT with put_record=resource stores the request body as a single
# 'resource' record, and the written record carries the expected headers.
def test_record_custom_record(self):
# Dedup index keyed by collection only (no {user} component).
dedup_index = self._get_dedup_index(user=False)
warc_path = to_path(self.root_dir + '/warcs/meta/meta.warc.gz')
recorder_app = RecorderApp(self.upstream_url,
MultiFileWARCWriter(warc_path, dedup_index=dedup_index))
# put_record=resource selects the record type; url= becomes WARC-Target-URI.
req_url = '/live/resource/postreq?url=custom://httpbin.org&param.recorder.coll=META&put_record=resource'
buff = b'Some Data'
testapp = webtest.TestApp(recorder_app)
# The WARC-Custom request header is expected to appear in the written record.
headers = {'content-type': 'text/plain',
'WARC-Custom': 'foo'
}
resp = testapp.put(req_url, headers=headers, params=buff)
# Exactly one WARC file is expected under /warcs/meta.
self._test_all_warcs('/warcs/meta', 1)
r = FakeStrictRedis.from_url('redis://localhost/2')
# The file index for collection META should list exactly one WARC.
warcs = r.hgetall('META:warc')
assert len(warcs) == 1
# Parse the first record of the written WARC.
with open(warcs[b'meta/meta.warc.gz'], 'rb') as fh:
decomp = DecompressingBufferedReader(fh)
record = ArcWarcRecordLoader().parse_record_stream(decomp)
# Checks on the WARC (record) headers.
status_headers = record.rec_headers
assert len(record.rec_headers.headers) == 9
assert status_headers.get_header('WARC-Type') == 'resource'
assert status_headers.get_header('WARC-Target-URI') == 'custom://httpbin.org'
assert status_headers.get_header('WARC-Record-ID') != ''
assert status_headers.get_header('WARC-Date') != ''
assert status_headers.get_header('WARC-Block-Digest') != ''
# Block and payload digests are expected to be equal for this record.
assert status_headers.get_header('WARC-Block-Digest') == status_headers.get_header('WARC-Payload-Digest')
assert status_headers.get_header('Content-Type') == 'text/plain'
assert status_headers.get_header('Content-Length') == str(len(buff))
assert status_headers.get_header('WARC-Custom') == 'foo'
# The record body must be the bytes that were PUT.
assert record.stream.read() == buff
# Checks on the loader-provided status headers.
status_headers = record.status_headers
assert len(record.status_headers.headers) == 2
assert status_headers.get_header('Content-Type') == 'text/plain'
assert status_headers.get_header('Content-Length') == str(len(buff))

View File

@ -21,8 +21,11 @@ from pywb.utils.loaders import LimitReader, to_native_str
from pywb.utils.bufferedreaders import BufferedReader from pywb.utils.bufferedreaders import BufferedReader
from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.warc.recordloader import ArcWarcRecord from pywb.warc.recordloader import ArcWarcRecord
from pywb.warc.recordloader import ArcWarcRecordLoader
from requests.structures import CaseInsensitiveDict
from webagg.utils import ParamFormatter, res_template from webagg.utils import ParamFormatter, res_template
from recorder.filters import ExcludeNone from recorder.filters import ExcludeNone
@ -51,6 +54,8 @@ class BaseWARCWriter(object):
self.header_filter = header_filter self.header_filter = header_filter
self.hostname = gethostname() self.hostname = gethostname()
self.parser = StatusAndHeadersParser([], verify=False)
def ensure_digest(self, record): def ensure_digest(self, record):
block_digest = record.rec_headers.get('WARC-Block-Digest') block_digest = record.rec_headers.get('WARC-Block-Digest')
payload_digest = record.rec_headers.get('WARC-Payload-Digest') payload_digest = record.rec_headers.get('WARC-Payload-Digest')
@ -62,7 +67,8 @@ class BaseWARCWriter(object):
pos = record.stream.tell() pos = record.stream.tell()
block_digester.update(record.status_headers.headers_buff) if record.status_headers and hasattr(record.status_headers, 'headers_buff'):
block_digester.update(record.status_headers.headers_buff)
while True: while True:
buf = record.stream.read(self.BUFF_SIZE) buf = record.stream.read(self.BUFF_SIZE)
@ -100,11 +106,6 @@ class BaseWARCWriter(object):
if resp_id: if resp_id:
req.rec_headers['WARC-Concurrent-To'] = resp_id req.rec_headers['WARC-Concurrent-To'] = resp_id
self._set_header_buff(req)
self._set_header_buff(resp)
self.ensure_digest(resp)
resp = self._check_revisit(resp, params) resp = self._check_revisit(resp, params)
if not resp: if not resp:
print('Skipping due to dedup') print('Skipping due to dedup')
@ -113,13 +114,45 @@ class BaseWARCWriter(object):
params['_formatter'] = ParamFormatter(params, name=self.rec_source_name) params['_formatter'] = ParamFormatter(params, name=self.rec_source_name)
self._do_write_req_resp(req, resp, params) self._do_write_req_resp(req, resp, params)
def create_req_record(self, req_headers, payload, type_, content_type=''):
    """Build a request record from a buffered payload.

    :param req_headers: used as-is as the record's WARC headers
    :param payload: seekable stream positioned at its end; it is rewound
        and its leading header block is parsed as the status headers
    :param type_: record type passed through to ``ArcWarcRecord``
    :param content_type: content type passed through to ``ArcWarcRecord``
    :return: the new ``ArcWarcRecord`` with its header buffer set
    """
    # The offset before rewinding is taken as the record length.
    length = payload.tell()
    payload.seek(0)

    http_headers = self.parser.parse(payload)

    req_record = ArcWarcRecord('warc', type_, req_headers, payload,
                               http_headers, content_type, length)
    self._set_header_buff(req_record)
    return req_record
def create_resp_record(self, resp_headers, payload, type_, content_type=''):
    """Build a response record from a buffered payload.

    The payload is parsed twice in sequence: the first parse result
    supplies the WARC headers, the second the status headers.

    :param resp_headers: not used by this method
    :param payload: seekable stream positioned at its end; it is rewound
        before parsing
    :param type_: record type passed through to ``ArcWarcRecord``
    :param content_type: content type passed through to ``ArcWarcRecord``
    :return: the new ``ArcWarcRecord`` with header buffer and digests set
    """
    # The offset before rewinding is taken as the record length.
    length = payload.tell()
    payload.seek(0)

    parsed_warc = self.parser.parse(payload)
    rec_headers = CaseInsensitiveDict(parsed_warc.headers)

    # Continues from where the first parse stopped.
    http_headers = self.parser.parse(payload)

    resp_record = ArcWarcRecord('warc', type_, rec_headers, payload,
                                http_headers, content_type, length)
    self._set_header_buff(resp_record)
    self.ensure_digest(resp_record)
    return resp_record
def create_warcinfo_record(self, filename, **kwargs): def create_warcinfo_record(self, filename, **kwargs):
headers = {} warc_headers = {}
headers['WARC-Record_ID'] = self._make_warc_id() warc_headers['WARC-Record-ID'] = self._make_warc_id()
headers['WARC-Type'] = 'warcinfo' warc_headers['WARC-Type'] = 'warcinfo'
if filename: if filename:
headers['WARC-Filename'] = filename warc_headers['WARC-Filename'] = filename
headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow()) warc_headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow())
warcinfo = BytesIO() warcinfo = BytesIO()
for n, v in six.iteritems(kwargs): for n, v in six.iteritems(kwargs):
@ -127,11 +160,29 @@ class BaseWARCWriter(object):
warcinfo.seek(0) warcinfo.seek(0)
record = ArcWarcRecord('warc', 'warcinfo', headers, warcinfo, record = ArcWarcRecord('warc', 'warcinfo', warc_headers, warcinfo,
None, '', len(warcinfo.getvalue())) None, '', len(warcinfo.getvalue()))
return record return record
def create_custom_record(self, uri, payload, record_type, content_type,
                         warc_headers=None):
    """Build a record of an arbitrary type around a buffered payload.

    :param uri: value for ``WARC-Target-URI``
    :param payload: seekable stream positioned at its end; it is rewound
    :param record_type: value for ``WARC-Type`` and the record's type
    :param content_type: content type passed through to ``ArcWarcRecord``
    :param warc_headers: optional extra WARC headers; a non-empty mapping
        is updated in place with the generated fields, an empty or
        missing one is replaced by a new dict
    :return: the new ``ArcWarcRecord`` (no status headers) with digests set
    """
    # The offset before rewinding is taken as the record length.
    length = payload.tell()
    payload.seek(0)

    if not warc_headers:
        warc_headers = {}

    generated = (
        ('WARC-Record-ID', self._make_warc_id()),
        ('WARC-Type', record_type),
        ('WARC-Target-URI', uri),
        ('WARC-Date', datetime_to_iso_date(datetime.datetime.utcnow())),
    )
    for name, value in generated:
        warc_headers[name] = value

    custom_record = ArcWarcRecord('warc', record_type, warc_headers, payload,
                                  None, content_type, length)
    self.ensure_digest(custom_record)
    return custom_record
def _check_revisit(self, record, params): def _check_revisit(self, record, params):
if not self.dedup_index: if not self.dedup_index:
return record return record
@ -171,9 +222,10 @@ class BaseWARCWriter(object):
content_type = record.content_type content_type = record.content_type
if not content_type: if not content_type:
content_type = self.WARC_RECORDS[record.rec_headers['WARC-Type']] content_type = self.WARC_RECORDS.get(record.rec_headers['WARC-Type'])
self._header(out, 'Content-Type', content_type) if content_type:
self._header(out, 'Content-Type', content_type)
if record.rec_headers['WARC-Type'] == 'revisit': if record.rec_headers['WARC-Type'] == 'revisit':
http_headers_only = True http_headers_only = True
@ -320,6 +372,11 @@ class MultiFileWARCWriter(BaseWARCWriter):
def _is_write_req(self, req, params): def _is_write_req(self, req, params):
return True return True
def write_record(self, record, params=None):
    """Write a single standalone record.

    :param record: the record to write
    :param params: optional params dict; a ``_formatter`` entry is added
        to it (a new dict is used when it is empty or missing)
    """
    if not params:
        params = {}

    formatter = ParamFormatter(params, name=self.rec_source_name)
    params['_formatter'] = formatter

    # No request record accompanies a standalone record.
    self._do_write_req_resp(None, record, params)
def _do_write_req_resp(self, req, resp, params): def _do_write_req_resp(self, req, resp, params):
full_dir = res_template(self.dir_template, params) full_dir = res_template(self.dir_template, params)
@ -340,10 +397,10 @@ class MultiFileWARCWriter(BaseWARCWriter):
start = out.tell() start = out.tell()
if self._is_write_resp(resp, params): if resp and self._is_write_resp(resp, params):
self._write_warc_record(out, resp) self._write_warc_record(out, resp)
if self._is_write_req(req, params): if req and self._is_write_req(req, params):
self._write_warc_record(out, req) self._write_warc_record(out, req)
out.flush() out.flush()
@ -420,7 +477,7 @@ class SimpleTempWARCWriter(BaseWARCWriter):
self._write_warc_record(self.out, resp) self._write_warc_record(self.out, resp)
self._write_warc_record(self.out, req) self._write_warc_record(self.out, req)
def write_record(self, record): def write_record(self, record, params=None):
self._write_warc_record(self.out, record) self._write_warc_record(self.out, record)
def get_buffer(self): def get_buffer(self):

View File

@ -353,8 +353,8 @@ foo=bar&test=abc"""
def test_error_fallback_live_not_found(self): def test_error_fallback_live_not_found(self):
resp = self.testapp.get('/fallback/resource?url=http://invalid.url-not-found', status=400) resp = self.testapp.get('/fallback/resource?url=http://invalid.url-not-found', status=400)
assert resp.json == {'message': 'http://invalid.url-not-found', assert resp.json == {'message': 'http://invalid.url-not-found/',
'errors': {'LiveWebLoader': 'http://invalid.url-not-found'}} 'errors': {'LiveWebLoader': 'http://invalid.url-not-found/'}}
assert resp.text == resp.headers['ResErrors'] assert resp.text == resp.headers['ResErrors']