diff --git a/.coveragerc b/.coveragerc index 4b25f6fc..bc40de9d 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,4 +1,5 @@ [run] +concurrency = gevent omit = */test/* */tests/* diff --git a/recorder/recorderapp.py b/recorder/recorderapp.py index 5572bc45..73d8500f 100644 --- a/recorder/recorderapp.py +++ b/recorder/recorderapp.py @@ -1,10 +1,6 @@ from webagg.utils import StreamIter, chunk_encode_iter, BUFF_SIZE from webagg.inputrequest import DirectWSGIInputRequest -from pywb.utils.statusandheaders import StatusAndHeadersParser -from pywb.warc.recordloader import ArcWarcRecord -from pywb.warc.recordloader import ArcWarcRecordLoader - from recorder.filters import SkipRangeRequestFilter, CollectionFilter from six.moves.urllib.parse import parse_qsl @@ -27,7 +23,6 @@ class RecorderApp(object): self.upstream_host = upstream_host self.writer = writer - self.parser = StatusAndHeadersParser([], verify=False) self.write_queue = gevent.queue.Queue() gevent.spawn(self._write_loop) @@ -62,8 +57,8 @@ class RecorderApp(object): req_head, req_pay, resp_head, resp_pay, params = result - req = self._create_req_record(req_head, req_pay, 'request') - resp = self._create_resp_record(resp_head, resp_pay, 'response') + req = self.writer.create_req_record(req_head, req_pay, 'request') + resp = self.writer.create_resp_record(resp_head, resp_pay, 'response') self.writer.write_req_resp(req, resp, params) @@ -77,47 +72,66 @@ class RecorderApp(object): except Exception as e: traceback.print_exc() - def _create_req_record(self, req_headers, payload, type_, ct=''): - len_ = payload.tell() - payload.seek(0) - - warc_headers = req_headers - status_headers = self.parser.parse(payload) - - record = ArcWarcRecord('warc', type_, warc_headers, payload, - status_headers, ct, len_) - return record - - def _create_resp_record(self, resp_headers, payload, type_, ct=''): - len_ = payload.tell() - payload.seek(0) - - warc_headers = self.parser.parse(payload) - warc_headers = 
CaseInsensitiveDict(warc_headers.headers) - - status_headers = self.parser.parse(payload) - - record = ArcWarcRecord('warc', type_, warc_headers, payload, - status_headers, ct, len_) - return record - def send_error(self, exc, start_response): - message = json.dumps({'error': repr(exc)}) + return self.send_message({'error': repr(exc)}, + '400 Bad Request', + start_response) + + def send_message(self, msg, status, start_response): + message = json.dumps(msg) headers = [('Content-Type', 'application/json; charset=utf-8'), ('Content-Length', str(len(message)))] - start_response('400 Bad Request', headers) + start_response(status, headers) return [message.encode('utf-8')] + def _put_record(self, request_uri, input_buff, record_type, + headers, params, start_response): + + req_stream = ReqWrapper(input_buff, headers) + + while True: + buff = req_stream.read() + if not buff: + break + + content_type = headers.get('Content-Type') + + record = self.writer.create_custom_record(params['url'], + req_stream.out, + record_type, + content_type, + req_stream.headers) + + self.writer.write_record(record, params) + + return self.send_message({'success': 'true'}, + '200 OK', + start_response) + + def __call__(self, environ, start_response): input_req = DirectWSGIInputRequest(environ) - headers = input_req.get_req_headers() - method = input_req.get_req_method() + + params = dict(parse_qsl(environ.get('QUERY_STRING'))) + request_uri = input_req.get_full_request_uri() input_buff = input_req.get_req_body() - params = dict(parse_qsl(environ.get('QUERY_STRING'))) + headers = input_req.get_req_headers() + + method = input_req.get_req_method() + + # write request body as metadata/resource + put_record = params.get('put_record') + if put_record and method in ('PUT', 'POST'): + return self._put_record(request_uri, + input_buff, + put_record, + headers, + params, + start_response) skipping = any(x.skip_request(headers) for x in self.skip_filters) diff --git a/recorder/test/test_recorder.py 
b/recorder/test/test_recorder.py index 1283e022..9c4acef6 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -52,9 +52,16 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port) - def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy()): - dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{user}:{coll}:cdxj', - file_key_template='{user}:{coll}:warc', + def _get_dedup_index(self, dupe_policy=WriteRevisitDupePolicy(), user=True): + if user: + file_key_template = '{user}:{coll}:warc' + redis_url = 'redis://localhost/2/{user}:{coll}:cdxj' + else: + file_key_template = '{coll}:warc' + redis_url = 'redis://localhost/2/{coll}:cdxj' + + dedup_index = WritableRedisIndexer(redis_url=redis_url, + file_key_template=file_key_template, rel_path_template=self.root_dir + '/warcs/', dupe_policy=dupe_policy) @@ -340,10 +347,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) rel_path = self.root_dir + '/warcs/' - dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj', - file_key_template='{coll}:warc', - rel_path_template=rel_path) - + dedup_index = self._get_dedup_index(user=False) writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index) recorder_app = RecorderApp(self.upstream_url, writer) @@ -409,9 +413,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) rel_path = self.root_dir + '/warcs/' - dedup_index = WritableRedisIndexer(redis_url='redis://localhost/2/{coll}:cdxj', - file_key_template='{coll}:warc', - rel_path_template=rel_path) + dedup_index = self._get_dedup_index(user=False) writer = MultiFileWARCWriter(warc_path, dedup_index=dedup_index, max_idle_secs=0.9) recorder_app = RecorderApp(self.upstream_url, writer) @@ -472,3 +474,54 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert 
'format: WARC File Format 1.0\r\n' in buff assert 'json-metadata: {"foo": "bar"}\r\n' in buff + def test_record_custom_record(self): + dedup_index = self._get_dedup_index(user=False) + + warc_path = to_path(self.root_dir + '/warcs/meta/meta.warc.gz') + + recorder_app = RecorderApp(self.upstream_url, + MultiFileWARCWriter(warc_path, dedup_index=dedup_index)) + + req_url = '/live/resource/postreq?url=custom://httpbin.org&param.recorder.coll=META&put_record=resource' + + buff = b'Some Data' + + testapp = webtest.TestApp(recorder_app) + headers = {'content-type': 'text/plain', + 'WARC-Custom': 'foo' + } + + resp = testapp.put(req_url, headers=headers, params=buff) + + self._test_all_warcs('/warcs/meta', 1) + + r = FakeStrictRedis.from_url('redis://localhost/2') + + warcs = r.hgetall('META:warc') + assert len(warcs) == 1 + + with open(warcs[b'meta/meta.warc.gz'], 'rb') as fh: + decomp = DecompressingBufferedReader(fh) + record = ArcWarcRecordLoader().parse_record_stream(decomp) + + status_headers = record.rec_headers + assert len(record.rec_headers.headers) == 9 + assert status_headers.get_header('WARC-Type') == 'resource' + assert status_headers.get_header('WARC-Target-URI') == 'custom://httpbin.org' + assert status_headers.get_header('WARC-Record-ID') != '' + assert status_headers.get_header('WARC-Date') != '' + assert status_headers.get_header('WARC-Block-Digest') != '' + assert status_headers.get_header('WARC-Block-Digest') == status_headers.get_header('WARC-Payload-Digest') + assert status_headers.get_header('Content-Type') == 'text/plain' + assert status_headers.get_header('Content-Length') == str(len(buff)) + assert status_headers.get_header('WARC-Custom') == 'foo' + + assert record.stream.read() == buff + + status_headers = record.status_headers + assert len(record.status_headers.headers) == 2 + + assert status_headers.get_header('Content-Type') == 'text/plain' + assert status_headers.get_header('Content-Length') == str(len(buff)) + + diff --git
a/recorder/warcwriter.py b/recorder/warcwriter.py index 3008d9e9..39d791f1 100644 --- a/recorder/warcwriter.py +++ b/recorder/warcwriter.py @@ -21,8 +21,11 @@ from pywb.utils.loaders import LimitReader, to_native_str from pywb.utils.bufferedreaders import BufferedReader from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date +from pywb.utils.statusandheaders import StatusAndHeadersParser from pywb.warc.recordloader import ArcWarcRecord +from pywb.warc.recordloader import ArcWarcRecordLoader +from requests.structures import CaseInsensitiveDict from webagg.utils import ParamFormatter, res_template from recorder.filters import ExcludeNone @@ -51,6 +54,8 @@ class BaseWARCWriter(object): self.header_filter = header_filter self.hostname = gethostname() + self.parser = StatusAndHeadersParser([], verify=False) + def ensure_digest(self, record): block_digest = record.rec_headers.get('WARC-Block-Digest') payload_digest = record.rec_headers.get('WARC-Payload-Digest') @@ -62,7 +67,8 @@ class BaseWARCWriter(object): pos = record.stream.tell() - block_digester.update(record.status_headers.headers_buff) + if record.status_headers and hasattr(record.status_headers, 'headers_buff'): + block_digester.update(record.status_headers.headers_buff) while True: buf = record.stream.read(self.BUFF_SIZE) @@ -100,11 +106,6 @@ class BaseWARCWriter(object): if resp_id: req.rec_headers['WARC-Concurrent-To'] = resp_id - self._set_header_buff(req) - self._set_header_buff(resp) - - self.ensure_digest(resp) - resp = self._check_revisit(resp, params) if not resp: print('Skipping due to dedup') @@ -113,13 +114,45 @@ class BaseWARCWriter(object): params['_formatter'] = ParamFormatter(params, name=self.rec_source_name) self._do_write_req_resp(req, resp, params) + def create_req_record(self, req_headers, payload, type_, content_type=''): + len_ = payload.tell() + payload.seek(0) + + warc_headers = req_headers + status_headers = self.parser.parse(payload) + + record = ArcWarcRecord('warc', 
type_, warc_headers, payload, + status_headers, content_type, len_) + + self._set_header_buff(record) + + return record + + def create_resp_record(self, resp_headers, payload, type_, content_type=''): + len_ = payload.tell() + payload.seek(0) + + warc_headers = self.parser.parse(payload) + warc_headers = CaseInsensitiveDict(warc_headers.headers) + + status_headers = self.parser.parse(payload) + + record = ArcWarcRecord('warc', type_, warc_headers, payload, + status_headers, content_type, len_) + + self._set_header_buff(record) + + self.ensure_digest(record) + + return record + def create_warcinfo_record(self, filename, **kwargs): - headers = {} - headers['WARC-Record_ID'] = self._make_warc_id() - headers['WARC-Type'] = 'warcinfo' + warc_headers = {} + warc_headers['WARC-Record-ID'] = self._make_warc_id() + warc_headers['WARC-Type'] = 'warcinfo' if filename: - headers['WARC-Filename'] = filename - headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow()) + warc_headers['WARC-Filename'] = filename + warc_headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow()) warcinfo = BytesIO() for n, v in six.iteritems(kwargs): @@ -127,11 +160,29 @@ class BaseWARCWriter(object): warcinfo.seek(0) - record = ArcWarcRecord('warc', 'warcinfo', headers, warcinfo, + record = ArcWarcRecord('warc', 'warcinfo', warc_headers, warcinfo, None, '', len(warcinfo.getvalue())) return record + def create_custom_record(self, uri, payload, record_type, content_type, + warc_headers=None): + len_ = payload.tell() + payload.seek(0) + + warc_headers = warc_headers or {} + warc_headers['WARC-Record-ID'] = self._make_warc_id() + warc_headers['WARC-Type'] = record_type + warc_headers['WARC-Target-URI'] = uri + warc_headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow()) + + record = ArcWarcRecord('warc', record_type, warc_headers, payload, + None, content_type, len_) + + self.ensure_digest(record) + + return record + def _check_revisit(self, record, params): if 
not self.dedup_index: return record @@ -171,9 +222,10 @@ class BaseWARCWriter(object): content_type = record.content_type if not content_type: - content_type = self.WARC_RECORDS[record.rec_headers['WARC-Type']] + content_type = self.WARC_RECORDS.get(record.rec_headers['WARC-Type']) - self._header(out, 'Content-Type', content_type) + if content_type: + self._header(out, 'Content-Type', content_type) if record.rec_headers['WARC-Type'] == 'revisit': http_headers_only = True @@ -320,6 +372,11 @@ class MultiFileWARCWriter(BaseWARCWriter): def _is_write_req(self, req, params): return True + def write_record(self, record, params=None): + params = params or {} + params['_formatter'] = ParamFormatter(params, name=self.rec_source_name) + self._do_write_req_resp(None, record, params) + def _do_write_req_resp(self, req, resp, params): full_dir = res_template(self.dir_template, params) @@ -340,10 +397,10 @@ class MultiFileWARCWriter(BaseWARCWriter): start = out.tell() - if self._is_write_resp(resp, params): + if resp and self._is_write_resp(resp, params): self._write_warc_record(out, resp) - if self._is_write_req(req, params): + if req and self._is_write_req(req, params): self._write_warc_record(out, req) out.flush() @@ -420,7 +477,7 @@ class SimpleTempWARCWriter(BaseWARCWriter): self._write_warc_record(self.out, resp) self._write_warc_record(self.out, req) - def write_record(self, record): + def write_record(self, record, params=None): self._write_warc_record(self.out, record) def get_buffer(self): diff --git a/webagg/test/test_handlers.py b/webagg/test/test_handlers.py index 7c5a1aff..84bf4e5a 100644 --- a/webagg/test/test_handlers.py +++ b/webagg/test/test_handlers.py @@ -353,8 +353,8 @@ foo=bar&test=abc""" def test_error_fallback_live_not_found(self): resp = self.testapp.get('/fallback/resource?url=http://invalid.url-not-found', status=400) - assert resp.json == {'message': 'http://invalid.url-not-found', - 'errors': {'LiveWebLoader': 'http://invalid.url-not-found'}} + 
assert resp.json == {'message': 'http://invalid.url-not-found/', + 'errors': {'LiveWebLoader': 'http://invalid.url-not-found/'}} assert resp.text == resp.headers['ResErrors']