1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 08:04:49 +01:00

warcwriter: attempt to separate warc writing semantics from the recorder

use StatusAndHeaders instead of requests CaseInsensitiveDict for consistency
refactor writer api: create_warc_record() for creating new record
copy_warc_record() for copying a full record from a stream
add writer tests, separate from recorder
This commit is contained in:
Ilya Kreymer 2017-03-01 12:50:32 -08:00
parent c66d251a90
commit 3faa55906a
5 changed files with 256 additions and 146 deletions

View File

@ -5,11 +5,12 @@ from pywb.webagg.inputrequest import DirectWSGIInputRequest
from pywb.recorder.filters import SkipRangeRequestFilter, CollectionFilter
from six.moves.urllib.parse import parse_qsl
import six
import json
import tempfile
from requests.structures import CaseInsensitiveDict
#from requests.structures import CaseInsensitiveDict
import requests
import traceback
@ -67,10 +68,15 @@ class RecorderApp(object):
req_head, req_pay, resp_head, resp_pay, params = result
resp_type, resp = self.writer.read_resp_record(resp_head, resp_pay)
#resp_type, resp = self.writer.read_resp_record(resp_head, resp_pay)
resp = self.writer.copy_warc_record(resp_pay)
if resp_type == 'response':
req = self.writer.create_req_record(req_head, req_pay)
if resp.rec_type == 'response':
uri = resp.rec_headers.get_header('WARC-Target-Uri')
req = self.writer.create_warc_record(uri=uri,
record_type='request',
payload=req_pay,
warc_headers_dict=req_head)
self.writer.write_req_resp(req, resp, params)
@ -127,16 +133,16 @@ class RecorderApp(object):
content_type = headers.get('Content-Type')
record = self.writer.create_custom_record(params['url'],
req_stream.out,
record_type,
content_type,
req_stream.headers)
record = self.writer.create_warc_record(uri=params['url'],
record_type=record_type,
payload=req_stream.out,
warc_content_type=content_type,
warc_headers_dict=req_stream.headers)
self.writer.write_record(record, params)
msg = {'success': 'true',
'WARC-Date': record.rec_headers.get('WARC-Date')}
'WARC-Date': record.rec_headers.get_header('WARC-Date')}
finally:
if req_stream:
@ -311,11 +317,11 @@ class RespWrapper(Wrapper):
class ReqWrapper(Wrapper):
def __init__(self, stream, req_headers, params, create_func):
super(ReqWrapper, self).__init__(stream, params, create_func)
self.headers = CaseInsensitiveDict(req_headers)
self.headers = {}
for n in req_headers.keys():
if not n.upper().startswith('WARC-'):
del self.headers[n]
for n in six.iterkeys(req_headers):
if n.upper().startswith('WARC-'):
self.headers[n] = req_headers[n]
def close(self):
# no need to close wsgi.input

View File

@ -94,6 +94,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
files = [x for x in os.listdir(coll_dir) if os.path.isfile(os.path.join(coll_dir, x))]
assert len(files) == num
assert all(x.endswith('.warc.gz') for x in files)
self._verify_content_len(coll_dir, files)
return files, coll_dir
def _load_resp_req(self, base_path):
@ -105,7 +107,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
stored_req = None
with open(os.path.join(base_path, warc), 'rb') as fh:
for rec in ArchiveIterator(fh)():
for rec in ArchiveIterator(fh):
if rec.rec_type == 'response':
stored_resp = rec
elif rec.rec_type == 'request':
@ -115,6 +117,15 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
assert stored_req is not None
return stored_req, stored_resp
def _verify_content_len(self, base_dir, files):
for filename in files:
filename = os.path.join(base_dir, filename)
with open(filename, 'rb') as fh:
for record in ArchiveIterator(fh, no_record_parse=True):
assert record.status_headers == None
assert int(record.rec_headers.get_header('Content-Length')) == record.length
assert record.length == len(record.stream.read())
def test_record_warc_1(self):
recorder_app = RecorderApp(self.upstream_url,
PerRecordWARCWriter(to_path(self.root_dir + '/warcs/')))
@ -168,6 +179,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
assert ('X-Other', 'foo') in stored_req.status_headers.headers
assert ('Cookie', 'boo=far') in stored_req.status_headers.headers
self._test_all_warcs('/warcs/cookiecheck/', 1)
def test_record_cookies_skip_header(self):
warc_path = to_path(self.root_dir + '/warcs/cookieskip/')
header_filter = ExcludeSpecificHeaders(['Set-Cookie', 'Cookie'])
@ -191,6 +204,8 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
assert ('X-Other', 'foo') in stored_req.status_headers.headers
assert ('Cookie', 'boo=far') not in stored_req.status_headers.headers
self._test_all_warcs('/warcs/cookieskip/', 1)
def test_record_skip_wrong_coll(self):
recorder_app = RecorderApp(self.upstream_url,
writer=PerRecordWARCWriter(to_path(self.root_dir + '/warcs/')), accept_colls='not-live')
@ -470,34 +485,6 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
self._test_all_warcs('/warcs/GOO/', 2)
def test_warcinfo_record(self):
simplewriter = SimpleTempWARCWriter(gzip=False)
params = {'software': 'recorder test',
'format': 'WARC File Format 1.0',
'json-metadata': json.dumps({'foo': 'bar'})}
record = simplewriter.create_warcinfo_record('testfile.warc.gz', params)
simplewriter.write_record(record)
buff = simplewriter.get_buffer()
assert isinstance(buff, bytes)
buff = BytesIO(buff)
parsed_record = ArcWarcRecordLoader().parse_record_stream(buff)
assert parsed_record.rec_headers.get_header('WARC-Type') == 'warcinfo'
assert parsed_record.rec_headers.get_header('Content-Type') == 'application/warc-fields'
assert parsed_record.rec_headers.get_header('WARC-Filename') == 'testfile.warc.gz'
buff = parsed_record.stream.read().decode('utf-8')
length = parsed_record.rec_headers.get_header('Content-Length')
assert len(buff) == int(length)
assert 'json-metadata: {"foo": "bar"}\r\n' in buff
assert 'format: WARC File Format 1.0\r\n' in buff
assert 'json-metadata: {"foo": "bar"}\r\n' in buff
def test_record_custom_record(self):
dedup_index = self._get_dedup_index(user=False)
@ -584,4 +571,3 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass)
assert status_headers.get_header('Content-Type') == 'application/vnd.youtube-dl_formats+json'
assert status_headers.get_header('WARC-Block-Digest') != ''
assert status_headers.get_header('WARC-Block-Digest') == status_headers.get_header('WARC-Payload-Digest')

View File

@ -0,0 +1,121 @@
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.recorder.warcwriter import SimpleTempWARCWriter
from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.warc.archiveiterator import ArchiveIterator
from io import BytesIO
from collections import OrderedDict
import json
# ============================================================================
class FixedTestWARCWriter(SimpleTempWARCWriter):
@classmethod
def _make_warc_id(cls, id_=None):
return '<urn:uuid:12345678-feb0-11e6-8f83-68a86d1772ce>'
@classmethod
def _make_warc_date(cls):
return '2000-01-01T00:00:00Z'
# ============================================================================
class TestWarcWriter(object):
def _validate_record_content_len(self, stream):
for record in ArchiveIterator(stream, no_record_parse=True):
assert record.status_headers == None
assert int(record.rec_headers.get_header('Content-Length')) == record.length
assert record.length == len(record.stream.read())
def test_warcinfo_record(self):
simplewriter = FixedTestWARCWriter(gzip=False)
params = OrderedDict([('software', 'recorder test'),
('format', 'WARC File Format 1.0'),
('json-metadata', json.dumps({'foo': 'bar'}))])
record = simplewriter.create_warcinfo_record('testfile.warc.gz', params)
simplewriter.write_record(record)
buff = simplewriter.get_buffer()
assert isinstance(buff, bytes)
buff = BytesIO(buff)
parsed_record = ArcWarcRecordLoader().parse_record_stream(buff)
assert parsed_record.rec_headers.get_header('WARC-Type') == 'warcinfo'
assert parsed_record.rec_headers.get_header('Content-Type') == 'application/warc-fields'
assert parsed_record.rec_headers.get_header('WARC-Filename') == 'testfile.warc.gz'
buff = parsed_record.stream.read().decode('utf-8')
length = parsed_record.rec_headers.get_header('Content-Length')
assert len(buff) == int(length)
assert 'json-metadata: {"foo": "bar"}\r\n' in buff
assert 'format: WARC File Format 1.0\r\n' in buff
warcinfo_record = '\
WARC/1.0\r\n\
WARC-Type: warcinfo\r\n\
WARC-Record-ID: <urn:uuid:12345678-feb0-11e6-8f83-68a86d1772ce>\r\n\
WARC-Filename: testfile.warc.gz\r\n\
WARC-Date: 2000-01-01T00:00:00Z\r\n\
Content-Type: application/warc-fields\r\n\
Content-Length: 86\r\n\
\r\n\
software: recorder test\r\n\
format: WARC File Format 1.0\r\n\
json-metadata: {"foo": "bar"}\r\n\
\r\n\
\r\n\
'
assert simplewriter.get_buffer().decode('utf-8') == warcinfo_record
def test_generate_response(self):
headers_list = [('Content-Type', 'text/plain; charset="UTF-8"'),
('Custom-Header', 'somevalue')
]
payload = b'some\ntext'
status_headers = StatusAndHeaders('200 OK', headers_list, protocol='HTTP/1.0')
writer = FixedTestWARCWriter(gzip=False)
record = writer.create_warc_record('http://example.com/', 'response',
payload=BytesIO(payload),
length=len(payload),
status_headers=status_headers)
writer.write_record(record)
buff = writer.get_buffer()
self._validate_record_content_len(BytesIO(buff))
warc_record = '\
WARC/1.0\r\n\
WARC-Type: response\r\n\
WARC-Record-ID: <urn:uuid:12345678-feb0-11e6-8f83-68a86d1772ce>\r\n\
WARC-Target-URI: http://example.com/\r\n\
WARC-Date: 2000-01-01T00:00:00Z\r\n\
WARC-Block-Digest: sha1:B6QJ6BNJ3R4B23XXMRKZKHLPGJY2VE4O\r\n\
WARC-Payload-Digest: sha1:B6QJ6BNJ3R4B23XXMRKZKHLPGJY2VE4O\r\n\
Content-Type: application/http; msgtype=response\r\n\
Content-Length: 97\r\n\
\r\n\
HTTP/1.0 200 OK\r\n\
Content-Type: text/plain; charset="UTF-8"\r\n\
Custom-Header: somevalue\r\n\
\r\n\
some\n\
text\
\r\n\
\r\n\
'
assert buff.decode('utf-8') == warc_record

View File

@ -11,8 +11,6 @@ import shutil
import traceback
from collections import OrderedDict
from socket import gethostname
from io import BytesIO
@ -22,15 +20,12 @@ from pywb.utils.loaders import LimitReader, to_native_str
from pywb.utils.bufferedreaders import BufferedReader
from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.utils.statusandheaders import StatusAndHeadersParser, StatusAndHeaders
from pywb.warc.recordloader import ArcWarcRecord
from pywb.warc.recordloader import ArcWarcRecordLoader
from requests.structures import CaseInsensitiveDict
from pywb.webagg.utils import res_template, BUFF_SIZE
from pywb.recorder.filters import ExcludeNone
# ============================================================================
class BaseWARCWriter(object):
@ -45,14 +40,18 @@ class BaseWARCWriter(object):
FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'
WARC_VERSION = 'WARC/1.0'
def __init__(self, gzip=True, dedup_index=None,
header_filter=ExcludeNone(), *args, **kwargs):
header_filter=None, *args, **kwargs):
self.gzip = gzip
self.dedup_index = dedup_index
self.header_filter = header_filter
self.hostname = gethostname()
self.parser = StatusAndHeadersParser([], verify=False)
self.warc_version = kwargs.get('warc_version', self.WARC_VERSION)
@staticmethod
def _iter_stream(stream):
@ -64,8 +63,8 @@ class BaseWARCWriter(object):
yield buf
def ensure_digest(self, record):
block_digest = record.rec_headers.get('WARC-Block-Digest')
payload_digest = record.rec_headers.get('WARC-Payload-Digest')
block_digest = record.rec_headers.get_header('WARC-Block-Digest')
payload_digest = record.rec_headers.get_header('WARC-Payload-Digest')
if block_digest and payload_digest:
return
@ -82,28 +81,30 @@ class BaseWARCWriter(object):
payload_digester.update(buf)
record.stream.seek(pos)
record.rec_headers['WARC-Block-Digest'] = str(block_digester)
record.rec_headers['WARC-Payload-Digest'] = str(payload_digester)
record.rec_headers.add_header('WARC-Block-Digest', str(block_digester))
record.rec_headers.add_header('WARC-Payload-Digest', str(payload_digester))
def _create_digester(self):
return Digester('sha1')
def _set_header_buff(self, record):
exclude_list = self.header_filter(record)
exclude_list = None
if self.header_filter:
exclude_list = self.header_filter(record)
buff = record.status_headers.to_bytes(exclude_list)
record.status_headers.headers_buff = buff
def write_req_resp(self, req, resp, params):
url = resp.rec_headers.get('WARC-Target-URI')
dt = resp.rec_headers.get('WARC-Date')
url = resp.rec_headers.get_header('WARC-Target-URI')
dt = resp.rec_headers.get_header('WARC-Date')
#req.rec_headers['Content-Type'] = req.content_type
req.rec_headers['WARC-Target-URI'] = url
req.rec_headers['WARC-Date'] = dt
req.rec_headers.replace_header('WARC-Target-URI', url)
req.rec_headers.replace_header('WARC-Date', dt)
resp_id = resp.rec_headers.get('WARC-Record-ID')
resp_id = resp.rec_headers.get_header('WARC-Record-ID')
if resp_id:
req.rec_headers['WARC-Concurrent-To'] = resp_id
req.rec_headers.add_header('WARC-Concurrent-To', resp_id)
resp = self._check_revisit(resp, params)
if not resp:
@ -112,55 +113,13 @@ class BaseWARCWriter(object):
self._do_write_req_resp(req, resp, params)
def create_req_record(self, req_headers, payload):
len_ = payload.tell()
payload.seek(0)
warc_headers = req_headers
warc_headers['WARC-Type'] = 'request'
if not warc_headers.get('WARC-Record-ID'):
warc_headers['WARC-Record-ID'] = self._make_warc_id()
status_headers = self.parser.parse(payload)
record = ArcWarcRecord('warc', 'request', warc_headers, payload,
status_headers, '', len_)
self._set_header_buff(record)
return record
def read_resp_record(self, resp_headers, payload):
len_ = payload.tell()
payload.seek(0)
warc_headers = self.parser.parse(payload)
warc_headers = CaseInsensitiveDict(warc_headers.headers)
record_type = warc_headers.get('WARC-Type', 'response')
if record_type == 'response':
status_headers = self.parser.parse(payload)
else:
status_headers = None
record = ArcWarcRecord('warc', record_type, warc_headers, payload,
status_headers, '', len_)
if record_type == 'response':
self._set_header_buff(record)
self.ensure_digest(record)
return record_type, record
def create_warcinfo_record(self, filename, info):
warc_headers = {}
warc_headers['WARC-Record-ID'] = self._make_warc_id()
warc_headers['WARC-Type'] = 'warcinfo'
warc_headers = StatusAndHeaders(self.warc_version, [])
warc_headers.add_header('WARC-Type', 'warcinfo')
warc_headers.add_header('WARC-Record-ID', self._make_warc_id())
if filename:
warc_headers['WARC-Filename'] = filename
warc_headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow())
warc_headers.add_header('WARC-Filename', filename)
warc_headers.add_header('WARC-Date', self._make_warc_date())
warcinfo = BytesIO()
for n, v in six.iteritems(info):
@ -173,24 +132,54 @@ class BaseWARCWriter(object):
return record
def create_custom_record(self, uri, payload, record_type, content_type,
warc_headers=None):
def copy_warc_record(self, payload):
len_ = payload.tell()
payload.seek(0)
warc_headers = warc_headers or {}
warc_headers['WARC-Record-ID'] = self._make_warc_id()
warc_headers['WARC-Type'] = record_type
warc_headers['WARC-Target-URI'] = uri
warc_headers = self.parser.parse(payload)
if 'WARC-Date' not in warc_headers:
warc_headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow())
record_type = warc_headers.get_header('WARC-Type', 'response')
return self._fill_record(record_type, warc_headers, None, payload, '', len_)
def create_warc_record(self, uri, record_type, payload,
length=None,
warc_content_type='',
warc_headers_dict={},
status_headers=None):
if length is None:
length = payload.tell()
payload.seek(0)
warc_headers = StatusAndHeaders(self.warc_version, list(warc_headers_dict.items()))
warc_headers.replace_header('WARC-Type', record_type)
if not warc_headers.get_header('WARC-Record-ID'):
warc_headers.add_header('WARC-Record-ID', self._make_warc_id())
if uri:
warc_headers.replace_header('WARC-Target-URI', uri)
if not warc_headers.get_header('WARC-Date'):
warc_headers.add_header('WARC-Date', self._make_warc_date())
return self._fill_record(record_type, warc_headers, status_headers,
payload, warc_content_type, length)
def _fill_record(self, record_type, warc_headers, status_headers, payload, warc_content_type, len_):
has_http_headers = (record_type in ('request', 'response', 'revisit'))
if not status_headers and has_http_headers:
status_headers = self.parser.parse(payload)
record = ArcWarcRecord('warc', record_type, warc_headers, payload,
None, content_type, len_)
status_headers, warc_content_type, len_)
self.ensure_digest(record)
if has_http_headers:
self._set_header_buff(record)
return record
def _check_revisit(self, record, params):
@ -198,9 +187,9 @@ class BaseWARCWriter(object):
return record
try:
url = record.rec_headers.get('WARC-Target-URI')
digest = record.rec_headers.get('WARC-Payload-Digest')
iso_dt = record.rec_headers.get('WARC-Date')
url = record.rec_headers.get_header('WARC-Target-URI')
digest = record.rec_headers.get_header('WARC-Payload-Digest')
iso_dt = record.rec_headers.get_header('WARC-Date')
result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
except Exception as e:
traceback.print_exc()
@ -210,11 +199,11 @@ class BaseWARCWriter(object):
return None
if isinstance(result, tuple) and result[0] == 'revisit':
record.rec_headers['WARC-Type'] = 'revisit'
record.rec_headers['WARC-Profile'] = self.REVISIT_PROFILE
record.rec_headers.replace_header('WARC-Type', 'revisit')
record.rec_headers.add_header('WARC-Profile', self.REVISIT_PROFILE)
record.rec_headers['WARC-Refers-To-Target-URI'] = result[1]
record.rec_headers['WARC-Refers-To-Date'] = result[2]
record.rec_headers.add_header('WARC-Refers-To-Target-URI', result[1])
record.rec_headers.add_header('WARC-Refers-To-Date', result[2])
return record
@ -222,31 +211,26 @@ class BaseWARCWriter(object):
if self.gzip:
out = GzippingWrapper(out)
self._line(out, b'WARC/1.0')
for n, v in six.iteritems(record.rec_headers):
if n.lower() in ('content-length', 'content-type'):
continue
self._header(out, n, v)
content_type = record.rec_headers.get('Content-Type')
# compute Content-Type
content_type = record.rec_headers.get_header('Content-Type')
if not content_type:
content_type = record.content_type
if not content_type:
content_type = self.WARC_RECORDS.get(record.rec_headers['WARC-Type'])
if not content_type:
content_type = self.WARC_RECORDS.get(record.rec_headers.get_header('WARC-Type'))
if content_type:
self._header(out, 'Content-Type', content_type)
if content_type:
record.rec_headers.replace_header('Content-Type', content_type)
#self._header(out, 'Content-Type', content_type)
if record.rec_headers['WARC-Type'] == 'revisit':
if record.rec_headers.get_header('WARC-Type') == 'revisit':
http_headers_only = True
else:
http_headers_only = False
if record.length:
# compute Content-Length
if record.length or record.status_headers:
actual_len = 0
if record.status_headers:
actual_len = len(record.status_headers.headers_buff)
@ -259,10 +243,14 @@ class BaseWARCWriter(object):
actual_len = record.length - diff
self._header(out, 'Content-Length', str(actual_len))
record.rec_headers.replace_header('Content-Length', str(actual_len))
#self._header(out, 'Content-Length', str(actual_len))
# add empty line
self._line(out, b'')
#self._line(out, b'')
# write record headers
out.write(record.rec_headers.to_bytes())
# write headers buffer, if any
if record.status_headers:
@ -290,12 +278,16 @@ class BaseWARCWriter(object):
def _line(self, out, line):
out.write(line + b'\r\n')
@staticmethod
def _make_warc_id(id_=None):
@classmethod
def _make_warc_id(cls, id_=None):
if not id_:
id_ = uuid.uuid1()
return '<urn:uuid:{0}>'.format(id_)
@classmethod
def _make_warc_date(cls):
return datetime_to_iso_date(datetime.datetime.utcnow())
# ============================================================================
class GzippingWrapper(object):
@ -421,7 +413,7 @@ class MultiFileWARCWriter(BaseWARCWriter):
def _do_write_req_resp(self, req, resp, params):
def write_callback(out, filename):
url = resp.rec_headers.get('WARC-Target-URI')
#url = resp.rec_headers.get_header('WARC-Target-URI')
#print('Writing req/resp {0} to {1} '.format(url, filename))
if resp and self._is_write_resp(resp, params):

View File

@ -26,7 +26,7 @@ class StatusAndHeaders(object):
self.protocol = protocol
self.total_len = total_len
def get_header(self, name):
def get_header(self, name, default_value=None):
"""
return header (name, value)
if found
@ -36,6 +36,11 @@ class StatusAndHeaders(object):
if value[0].lower() == name_lower:
return value[1]
return default_value
def add_header(self, name, value):
self.headers.append((name, value))
def replace_header(self, name, value):
"""
replace header with new value or add new header