diff --git a/pywb/recorder/recorderapp.py b/pywb/recorder/recorderapp.py index bea431c8..03c101cf 100644 --- a/pywb/recorder/recorderapp.py +++ b/pywb/recorder/recorderapp.py @@ -5,11 +5,12 @@ from pywb.webagg.inputrequest import DirectWSGIInputRequest from pywb.recorder.filters import SkipRangeRequestFilter, CollectionFilter from six.moves.urllib.parse import parse_qsl +import six import json import tempfile -from requests.structures import CaseInsensitiveDict +#from requests.structures import CaseInsensitiveDict import requests import traceback @@ -67,10 +68,15 @@ class RecorderApp(object): req_head, req_pay, resp_head, resp_pay, params = result - resp_type, resp = self.writer.read_resp_record(resp_head, resp_pay) + #resp_type, resp = self.writer.read_resp_record(resp_head, resp_pay) + resp = self.writer.copy_warc_record(resp_pay) - if resp_type == 'response': - req = self.writer.create_req_record(req_head, req_pay) + if resp.rec_type == 'response': + uri = resp.rec_headers.get_header('WARC-Target-Uri') + req = self.writer.create_warc_record(uri=uri, + record_type='request', + payload=req_pay, + warc_headers_dict=req_head) self.writer.write_req_resp(req, resp, params) @@ -127,16 +133,16 @@ class RecorderApp(object): content_type = headers.get('Content-Type') - record = self.writer.create_custom_record(params['url'], - req_stream.out, - record_type, - content_type, - req_stream.headers) + record = self.writer.create_warc_record(uri=params['url'], + record_type=record_type, + payload=req_stream.out, + warc_content_type=content_type, + warc_headers_dict=req_stream.headers) self.writer.write_record(record, params) msg = {'success': 'true', - 'WARC-Date': record.rec_headers.get('WARC-Date')} + 'WARC-Date': record.rec_headers.get_header('WARC-Date')} finally: if req_stream: @@ -311,11 +317,11 @@ class RespWrapper(Wrapper): class ReqWrapper(Wrapper): def __init__(self, stream, req_headers, params, create_func): super(ReqWrapper, self).__init__(stream, params, create_func) - self.headers = CaseInsensitiveDict(req_headers) + self.headers = {} - for n in req_headers.keys(): - if not n.upper().startswith('WARC-'): - del self.headers[n] + for n in six.iterkeys(req_headers): + if n.upper().startswith('WARC-'): + self.headers[n] = req_headers[n] def close(self): # no need to close wsgi.input diff --git a/pywb/recorder/test/test_recorder.py b/pywb/recorder/test/test_recorder.py index 7710aa52..122c57ba 100644 --- a/pywb/recorder/test/test_recorder.py +++ b/pywb/recorder/test/test_recorder.py @@ -94,6 +94,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) files = [x for x in os.listdir(coll_dir) if os.path.isfile(os.path.join(coll_dir, x))] assert len(files) == num assert all(x.endswith('.warc.gz') for x in files) + + self._verify_content_len(coll_dir, files) return files, coll_dir def _load_resp_req(self, base_path): @@ -105,7 +107,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) stored_req = None with open(os.path.join(base_path, warc), 'rb') as fh: - for rec in ArchiveIterator(fh)(): + for rec in ArchiveIterator(fh): if rec.rec_type == 'response': stored_resp = rec elif rec.rec_type == 'request': @@ -115,6 +117,15 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert stored_req is not None return stored_req, stored_resp + def _verify_content_len(self, base_dir, files): + for filename in files: + filename = os.path.join(base_dir, filename) + with open(filename, 'rb') as fh: + for record in ArchiveIterator(fh, no_record_parse=True): + assert record.status_headers == None + assert int(record.rec_headers.get_header('Content-Length')) == record.length + assert record.length == len(record.stream.read()) + def test_record_warc_1(self): recorder_app = RecorderApp(self.upstream_url, PerRecordWARCWriter(to_path(self.root_dir + '/warcs/'))) @@ -168,6 +179,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert ('X-Other', 'foo') in stored_req.status_headers.headers assert ('Cookie', 'boo=far') in stored_req.status_headers.headers + self._test_all_warcs('/warcs/cookiecheck/', 1) + def test_record_cookies_skip_header(self): warc_path = to_path(self.root_dir + '/warcs/cookieskip/') header_filter = ExcludeSpecificHeaders(['Set-Cookie', 'Cookie']) @@ -191,6 +204,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert ('X-Other', 'foo') in stored_req.status_headers.headers assert ('Cookie', 'boo=far') not in stored_req.status_headers.headers + self._test_all_warcs('/warcs/cookieskip/', 1) + def test_record_skip_wrong_coll(self): recorder_app = RecorderApp(self.upstream_url, writer=PerRecordWARCWriter(to_path(self.root_dir + '/warcs/')), accept_colls='not-live') @@ -470,34 +485,6 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) self._test_all_warcs('/warcs/GOO/', 2) - def test_warcinfo_record(self): - simplewriter = SimpleTempWARCWriter(gzip=False) - params = {'software': 'recorder test', - 'format': 'WARC File Format 1.0', - 'json-metadata': json.dumps({'foo': 'bar'})} - - record = simplewriter.create_warcinfo_record('testfile.warc.gz', params) - simplewriter.write_record(record) - buff = simplewriter.get_buffer() - assert isinstance(buff, bytes) - - buff = BytesIO(buff) - parsed_record = ArcWarcRecordLoader().parse_record_stream(buff) - - assert parsed_record.rec_headers.get_header('WARC-Type') == 'warcinfo' - assert parsed_record.rec_headers.get_header('Content-Type') == 'application/warc-fields' - assert parsed_record.rec_headers.get_header('WARC-Filename') == 'testfile.warc.gz' - - buff = parsed_record.stream.read().decode('utf-8') - - length = parsed_record.rec_headers.get_header('Content-Length') - - assert len(buff) == int(length) - - assert 'json-metadata: {"foo": "bar"}\r\n' in buff - assert 'format: WARC File Format 1.0\r\n' in buff - assert 'json-metadata: {"foo": "bar"}\r\n' in buff - def test_record_custom_record(self): dedup_index = self._get_dedup_index(user=False) @@ -584,4 +571,3 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert status_headers.get_header('Content-Type') == 'application/vnd.youtube-dl_formats+json' assert status_headers.get_header('WARC-Block-Digest') != '' assert status_headers.get_header('WARC-Block-Digest') == status_headers.get_header('WARC-Payload-Digest') - diff --git a/pywb/recorder/test/test_writer.py b/pywb/recorder/test/test_writer.py new file mode 100644 index 00000000..40bf1e97 --- /dev/null +++ b/pywb/recorder/test/test_writer.py @@ -0,0 +1,121 @@ +from pywb.utils.statusandheaders import StatusAndHeaders +from pywb.recorder.warcwriter import SimpleTempWARCWriter +from pywb.warc.recordloader import ArcWarcRecordLoader +from pywb.warc.archiveiterator import ArchiveIterator + +from io import BytesIO +from collections import OrderedDict +import json + + +# ============================================================================ +class FixedTestWARCWriter(SimpleTempWARCWriter): + @classmethod + def _make_warc_id(cls, id_=None): + return '' + + @classmethod + def _make_warc_date(cls): + return '2000-01-01T00:00:00Z' + + +# ============================================================================ +class TestWarcWriter(object): + def _validate_record_content_len(self, stream): + for record in ArchiveIterator(stream, no_record_parse=True): + assert record.status_headers == None + assert int(record.rec_headers.get_header('Content-Length')) == record.length + assert record.length == len(record.stream.read()) + + + def test_warcinfo_record(self): + simplewriter = FixedTestWARCWriter(gzip=False) + params = OrderedDict([('software', 'recorder test'), + ('format', 'WARC File Format 1.0'), + ('json-metadata', json.dumps({'foo': 'bar'}))]) + + record = simplewriter.create_warcinfo_record('testfile.warc.gz', params) + simplewriter.write_record(record) + buff = simplewriter.get_buffer() + assert isinstance(buff, bytes) + + buff = BytesIO(buff) + parsed_record = ArcWarcRecordLoader().parse_record_stream(buff) + + assert parsed_record.rec_headers.get_header('WARC-Type') == 'warcinfo' + assert parsed_record.rec_headers.get_header('Content-Type') == 'application/warc-fields' + assert parsed_record.rec_headers.get_header('WARC-Filename') == 'testfile.warc.gz' + + buff = parsed_record.stream.read().decode('utf-8') + + length = parsed_record.rec_headers.get_header('Content-Length') + + assert len(buff) == int(length) + + assert 'json-metadata: {"foo": "bar"}\r\n' in buff + assert 'format: WARC File Format 1.0\r\n' in buff + + warcinfo_record = '\ +WARC/1.0\r\n\ +WARC-Type: warcinfo\r\n\ +WARC-Record-ID: \r\n\ +WARC-Filename: testfile.warc.gz\r\n\ +WARC-Date: 2000-01-01T00:00:00Z\r\n\ +Content-Type: application/warc-fields\r\n\ +Content-Length: 86\r\n\ +\r\n\ +software: recorder test\r\n\ +format: WARC File Format 1.0\r\n\ +json-metadata: {"foo": "bar"}\r\n\ +\r\n\ +\r\n\ +' + + assert simplewriter.get_buffer().decode('utf-8') == warcinfo_record + + def test_generate_response(self): + headers_list = [('Content-Type', 'text/plain; charset="UTF-8"'), + ('Custom-Header', 'somevalue') + ] + + payload = b'some\ntext' + + status_headers = StatusAndHeaders('200 OK', headers_list, protocol='HTTP/1.0') + + + writer = FixedTestWARCWriter(gzip=False) + + record = writer.create_warc_record('http://example.com/', 'response', + payload=BytesIO(payload), + length=len(payload), + status_headers=status_headers) + + + writer.write_record(record) + + buff = writer.get_buffer() + + self._validate_record_content_len(BytesIO(buff)) + + warc_record = '\ +WARC/1.0\r\n\ +WARC-Type: response\r\n\ +WARC-Record-ID: \r\n\ +WARC-Target-URI: http://example.com/\r\n\ +WARC-Date: 2000-01-01T00:00:00Z\r\n\ +WARC-Block-Digest: sha1:B6QJ6BNJ3R4B23XXMRKZKHLPGJY2VE4O\r\n\ +WARC-Payload-Digest: sha1:B6QJ6BNJ3R4B23XXMRKZKHLPGJY2VE4O\r\n\ +Content-Type: application/http; msgtype=response\r\n\ +Content-Length: 97\r\n\ +\r\n\ +HTTP/1.0 200 OK\r\n\ +Content-Type: text/plain; charset="UTF-8"\r\n\ +Custom-Header: somevalue\r\n\ +\r\n\ +some\n\ +text\ +\r\n\ +\r\n\ +' + assert buff.decode('utf-8') == warc_record + diff --git a/pywb/recorder/warcwriter.py b/pywb/recorder/warcwriter.py index 5f26ee24..6c9eca8a 100644 --- a/pywb/recorder/warcwriter.py +++ b/pywb/recorder/warcwriter.py @@ -11,8 +11,6 @@ import shutil import traceback -from collections import OrderedDict - from socket import gethostname from io import BytesIO @@ -22,15 +20,12 @@ from pywb.utils.loaders import LimitReader, to_native_str from pywb.utils.bufferedreaders import BufferedReader from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date -from pywb.utils.statusandheaders import StatusAndHeadersParser +from pywb.utils.statusandheaders import StatusAndHeadersParser, StatusAndHeaders from pywb.warc.recordloader import ArcWarcRecord from pywb.warc.recordloader import ArcWarcRecordLoader -from requests.structures import CaseInsensitiveDict from pywb.webagg.utils import res_template, BUFF_SIZE -from pywb.recorder.filters import ExcludeNone - # ============================================================================ class BaseWARCWriter(object): @@ -45,14 +40,18 @@ class BaseWARCWriter(object): FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz' + WARC_VERSION = 'WARC/1.0' + def __init__(self, gzip=True, dedup_index=None, - header_filter=ExcludeNone(), *args, **kwargs): + header_filter=None, *args, **kwargs): + self.gzip = gzip self.dedup_index = dedup_index self.header_filter = header_filter self.hostname = gethostname() self.parser = StatusAndHeadersParser([], verify=False) + self.warc_version = kwargs.get('warc_version', self.WARC_VERSION) @staticmethod def _iter_stream(stream): @@ -64,8 +63,8 @@ class BaseWARCWriter(object): yield buf def ensure_digest(self, record): - block_digest = record.rec_headers.get('WARC-Block-Digest') - payload_digest = record.rec_headers.get('WARC-Payload-Digest') + block_digest = record.rec_headers.get_header('WARC-Block-Digest') + payload_digest = record.rec_headers.get_header('WARC-Payload-Digest') if block_digest and payload_digest: return @@ -82,28 +81,30 @@ class BaseWARCWriter(object): payload_digester.update(buf) record.stream.seek(pos) - record.rec_headers['WARC-Block-Digest'] = str(block_digester) - record.rec_headers['WARC-Payload-Digest'] = str(payload_digester) + record.rec_headers.add_header('WARC-Block-Digest', str(block_digester)) + record.rec_headers.add_header('WARC-Payload-Digest', str(payload_digester)) def _create_digester(self): return Digester('sha1') def _set_header_buff(self, record): - exclude_list = self.header_filter(record) + exclude_list = None + if self.header_filter: + exclude_list = self.header_filter(record) buff = record.status_headers.to_bytes(exclude_list) record.status_headers.headers_buff = buff def write_req_resp(self, req, resp, params): - url = resp.rec_headers.get('WARC-Target-URI') - dt = resp.rec_headers.get('WARC-Date') + url = resp.rec_headers.get_header('WARC-Target-URI') + dt = resp.rec_headers.get_header('WARC-Date') #req.rec_headers['Content-Type'] = req.content_type - req.rec_headers['WARC-Target-URI'] = url - req.rec_headers['WARC-Date'] = dt + req.rec_headers.replace_header('WARC-Target-URI', url) + req.rec_headers.replace_header('WARC-Date', dt) - resp_id = resp.rec_headers.get('WARC-Record-ID') + resp_id = resp.rec_headers.get_header('WARC-Record-ID') if resp_id: - req.rec_headers['WARC-Concurrent-To'] = resp_id + req.rec_headers.add_header('WARC-Concurrent-To', resp_id) resp = self._check_revisit(resp, params) if not resp: @@ -112,55 +113,13 @@ class BaseWARCWriter(object): self._do_write_req_resp(req, resp, params) - def create_req_record(self, req_headers, payload): - len_ = payload.tell() - payload.seek(0) - - warc_headers = req_headers - warc_headers['WARC-Type'] = 'request' - if not warc_headers.get('WARC-Record-ID'): - warc_headers['WARC-Record-ID'] = self._make_warc_id() - - status_headers = self.parser.parse(payload) - - record = ArcWarcRecord('warc', 'request', warc_headers, payload, - status_headers, '', len_) - - self._set_header_buff(record) - - return record - - def read_resp_record(self, resp_headers, payload): - len_ = payload.tell() - payload.seek(0) - - warc_headers = self.parser.parse(payload) - warc_headers = CaseInsensitiveDict(warc_headers.headers) - - record_type = warc_headers.get('WARC-Type', 'response') - - if record_type == 'response': - status_headers = self.parser.parse(payload) - else: - status_headers = None - - record = ArcWarcRecord('warc', record_type, warc_headers, payload, - status_headers, '', len_) - - if record_type == 'response': - self._set_header_buff(record) - - self.ensure_digest(record) - - return record_type, record - def create_warcinfo_record(self, filename, info): - warc_headers = {} - warc_headers['WARC-Record-ID'] = self._make_warc_id() - warc_headers['WARC-Type'] = 'warcinfo' + warc_headers = StatusAndHeaders(self.warc_version, []) + warc_headers.add_header('WARC-Type', 'warcinfo') + warc_headers.add_header('WARC-Record-ID', self._make_warc_id()) if filename: - warc_headers['WARC-Filename'] = filename - warc_headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow()) + warc_headers.add_header('WARC-Filename', filename) + warc_headers.add_header('WARC-Date', self._make_warc_date()) warcinfo = BytesIO() for n, v in six.iteritems(info): @@ -173,24 +132,54 @@ class BaseWARCWriter(object): return record - def create_custom_record(self, uri, payload, record_type, content_type, - warc_headers=None): + def copy_warc_record(self, payload): len_ = payload.tell() payload.seek(0) - warc_headers = warc_headers or {} - warc_headers['WARC-Record-ID'] = self._make_warc_id() - warc_headers['WARC-Type'] = record_type - warc_headers['WARC-Target-URI'] = uri + warc_headers = self.parser.parse(payload) - if 'WARC-Date' not in warc_headers: - warc_headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow()) + record_type = warc_headers.get_header('WARC-Type', 'response') + + return self._fill_record(record_type, warc_headers, None, payload, '', len_) + + def create_warc_record(self, uri, record_type, payload, + length=None, + warc_content_type='', + warc_headers_dict={}, + status_headers=None): + + if length is None: + length = payload.tell() + payload.seek(0) + + warc_headers = StatusAndHeaders(self.warc_version, list(warc_headers_dict.items())) + warc_headers.replace_header('WARC-Type', record_type) + if not warc_headers.get_header('WARC-Record-ID'): + warc_headers.add_header('WARC-Record-ID', self._make_warc_id()) + + if uri: + warc_headers.replace_header('WARC-Target-URI', uri) + + if not warc_headers.get_header('WARC-Date'): + warc_headers.add_header('WARC-Date', self._make_warc_date()) + + return self._fill_record(record_type, warc_headers, status_headers, + payload, warc_content_type, length) + + def _fill_record(self, record_type, warc_headers, status_headers, payload, warc_content_type, len_): + has_http_headers = (record_type in ('request', 'response', 'revisit')) + + if not status_headers and has_http_headers: + status_headers = self.parser.parse(payload) record = ArcWarcRecord('warc', record_type, warc_headers, payload, - None, content_type, len_) + status_headers, warc_content_type, len_) self.ensure_digest(record) + if has_http_headers: + self._set_header_buff(record) + return record def _check_revisit(self, record, params): @@ -198,9 +187,9 @@ class BaseWARCWriter(object): return record try: - url = record.rec_headers.get('WARC-Target-URI') - digest = record.rec_headers.get('WARC-Payload-Digest') - iso_dt = record.rec_headers.get('WARC-Date') + url = record.rec_headers.get_header('WARC-Target-URI') + digest = record.rec_headers.get_header('WARC-Payload-Digest') + iso_dt = record.rec_headers.get_header('WARC-Date') result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt) except Exception as e: traceback.print_exc() @@ -210,11 +199,11 @@ class BaseWARCWriter(object): return None if isinstance(result, tuple) and result[0] == 'revisit': - record.rec_headers['WARC-Type'] = 'revisit' - record.rec_headers['WARC-Profile'] = self.REVISIT_PROFILE + record.rec_headers.replace_header('WARC-Type', 'revisit') + record.rec_headers.add_header('WARC-Profile', self.REVISIT_PROFILE) - record.rec_headers['WARC-Refers-To-Target-URI'] = result[1] - record.rec_headers['WARC-Refers-To-Date'] = result[2] + record.rec_headers.add_header('WARC-Refers-To-Target-URI', result[1]) + record.rec_headers.add_header('WARC-Refers-To-Date', result[2]) return record @@ -222,31 +211,26 @@ class BaseWARCWriter(object): if self.gzip: out = GzippingWrapper(out) - self._line(out, b'WARC/1.0') - - for n, v in six.iteritems(record.rec_headers): - if n.lower() in ('content-length', 'content-type'): - continue - - self._header(out, n, v) - - content_type = record.rec_headers.get('Content-Type') + # compute Content-Type + content_type = record.rec_headers.get_header('Content-Type') if not content_type: content_type = record.content_type - if not content_type: - content_type = self.WARC_RECORDS.get(record.rec_headers['WARC-Type']) + if not content_type: + content_type = self.WARC_RECORDS.get(record.rec_headers.get_header('WARC-Type')) - if content_type: - self._header(out, 'Content-Type', content_type) + if content_type: + record.rec_headers.replace_header('Content-Type', content_type) + #self._header(out, 'Content-Type', content_type) - if record.rec_headers['WARC-Type'] == 'revisit': + if record.rec_headers.get_header('WARC-Type') == 'revisit': http_headers_only = True else: http_headers_only = False - if record.length: + # compute Content-Length + if record.length or record.status_headers: actual_len = 0 if record.status_headers: actual_len = len(record.status_headers.headers_buff) @@ -259,10 +243,14 @@ class BaseWARCWriter(object): actual_len = record.length - diff - self._header(out, 'Content-Length', str(actual_len)) + record.rec_headers.replace_header('Content-Length', str(actual_len)) + #self._header(out, 'Content-Length', str(actual_len)) # add empty line - self._line(out, b'') + #self._line(out, b'') + + # write record headers + out.write(record.rec_headers.to_bytes()) # write headers buffer, if any if record.status_headers: @@ -290,12 +278,16 @@ class BaseWARCWriter(object): def _line(self, out, line): out.write(line + b'\r\n') - @staticmethod - def _make_warc_id(id_=None): + @classmethod + def _make_warc_id(cls, id_=None): if not id_: id_ = uuid.uuid1() return ''.format(id_) + @classmethod + def _make_warc_date(cls): + return datetime_to_iso_date(datetime.datetime.utcnow()) + # ============================================================================ class GzippingWrapper(object): @@ -421,7 +413,7 @@ class MultiFileWARCWriter(BaseWARCWriter): def _do_write_req_resp(self, req, resp, params): def write_callback(out, filename): - url = resp.rec_headers.get('WARC-Target-URI') + #url = resp.rec_headers.get_header('WARC-Target-URI') #print('Writing req/resp {0} to {1} '.format(url, filename)) if resp and self._is_write_resp(resp, params): diff --git a/pywb/utils/statusandheaders.py b/pywb/utils/statusandheaders.py index 82f6caf1..a5edea20 100644 --- a/pywb/utils/statusandheaders.py +++ b/pywb/utils/statusandheaders.py @@ -26,7 +26,7 @@ class StatusAndHeaders(object): self.protocol = protocol self.total_len = total_len - def get_header(self, name): + def get_header(self, name, default_value=None): """ return header (name, value) if found @@ -36,6 +36,11 @@ class StatusAndHeaders(object): if value[0].lower() == name_lower: return value[1] + return default_value + + def add_header(self, name, value): + self.headers.append((name, value)) + def replace_header(self, name, value): """ replace header with new value or add new header