1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-23 06:32:24 +01:00
pywb/recorder/warcwriter.py
Ilya Kreymer 94d6098238 app: separate json_encode() func
compat: py2 fixes
2016-05-11 11:38:59 -07:00

411 lines
12 KiB
Python

import tempfile
import uuid
import base64
import hashlib
import datetime
import zlib
import sys
import os
import six
import traceback
from collections import OrderedDict
from socket import gethostname
from io import BytesIO
import fcntl
from pywb.utils.loaders import LimitReader, to_native_str
from pywb.utils.bufferedreaders import BufferedReader
from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date
from pywb.warc.recordloader import ArcWarcRecord
from webagg.utils import ParamFormatter, res_template
from recorder.filters import ExcludeNone
# ============================================================================
class BaseWARCWriter(object):
    """Base class for serializing request/response pairs as WARC records.

    Prepares record headers, computes digests, optionally consults a dedup
    index to turn responses into revisit records, and serializes records to
    an output stream (optionally gzip-compressed per record).

    ``write_req_resp()`` delegates the actual output to
    ``self._do_write_req_resp(req, resp, params)``, which is not defined
    here and must be supplied by a subclass.
    """

    # Default WARC Content-Type per WARC-Type, used by _write_warc_record()
    # when a record has no content type of its own.
    WARC_RECORDS = {'warcinfo': 'application/warc-fields',
                    'response': 'application/http; msgtype=response',
                    'revisit': 'application/http; msgtype=response',
                    'request': 'application/http; msgtype=request',
                    'metadata': 'application/warc-fields',
                   }

    # WARC-Profile value written on revisit records
    REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/uri-agnostic-identical-payload-digest'

    # Chunk size used when reading a record stream to compute digests
    BUFF_SIZE = 8192

    # Default output filename template
    FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'

    def __init__(self, gzip=True, dedup_index=None, name='recorder',
                 header_filter=ExcludeNone(), *args, **kwargs):
        """Store writer configuration.

        :param gzip: if true, each record is written through a
            GzippingWrapper
        :param dedup_index: optional object providing ``lookup_revisit()``;
            when falsy, no revisit/dedup check is performed
        :param name: passed as ``name`` to ParamFormatter in
            write_req_resp()
        :param header_filter: callable taking a record and returning the
            exclude list passed to ``status_headers.to_bytes()``
        """
        self.gzip = gzip
        self.dedup_index = dedup_index
        self.rec_source_name = name
        self.header_filter = header_filter
        self.hostname = gethostname()

    def ensure_digest(self, record):
        """Set WARC-Block-Digest and WARC-Payload-Digest on the record.

        Does nothing if both headers are already present. Otherwise both
        are (re)computed: the block digest covers the http headers buffer
        plus the remaining stream, the payload digest only the remaining
        stream. The stream position is restored afterwards.
        """
        block_digest = record.rec_headers.get('WARC-Block-Digest')
        payload_digest = record.rec_headers.get('WARC-Payload-Digest')
        if block_digest and payload_digest:
            return

        block_digester = self._create_digester()
        payload_digester = self._create_digester()

        # remember where we are so the stream can be rewound for writing
        pos = record.stream.tell()

        block_digester.update(record.status_headers.headers_buff)

        while True:
            buf = record.stream.read(self.BUFF_SIZE)
            if not buf:
                break

            block_digester.update(buf)
            payload_digester.update(buf)

        record.stream.seek(pos)
        record.rec_headers['WARC-Block-Digest'] = str(block_digester)
        record.rec_headers['WARC-Payload-Digest'] = str(payload_digester)

    def _create_digester(self):
        """Return a new sha1 Digester."""
        return Digester('sha1')

    def _set_header_buff(self, record):
        """Serialize the record's http headers (minus the filtered ones)
        and cache the bytes on ``record.status_headers.headers_buff``.
        """
        exclude_list = self.header_filter(record)
        buff = record.status_headers.to_bytes(exclude_list)
        record.status_headers.headers_buff = buff

    def write_req_resp(self, req, resp, params):
        """Prepare a request/response record pair and hand it off for writing.

        Copies target uri and date from the response onto the request,
        links the request to the response via WARC-Concurrent-To, computes
        digests for the response and runs the dedup check. If the dedup
        check says to skip, nothing is written.

        Note: ``params`` is modified in place (``'_formatter'`` is added).
        """
        url = resp.rec_headers.get('WARC-Target-URI')
        dt = resp.rec_headers.get('WARC-Date')

        if not req.rec_headers.get('WARC-Record-ID'):
            req.rec_headers['WARC-Record-ID'] = self._make_warc_id()

        req.rec_headers['WARC-Target-URI'] = url
        req.rec_headers['WARC-Date'] = dt
        req.rec_headers['WARC-Type'] = 'request'

        resp_id = resp.rec_headers.get('WARC-Record-ID')
        if resp_id:
            req.rec_headers['WARC-Concurrent-To'] = resp_id

        self._set_header_buff(req)
        self._set_header_buff(resp)

        self.ensure_digest(resp)

        resp = self._check_revisit(resp, params)
        if not resp:
            print('Skipping due to dedup')
            return

        params['_formatter'] = ParamFormatter(params, name=self.rec_source_name)
        self._do_write_req_resp(req, resp, params)

    def create_warcinfo_record(self, filename, **kwargs):
        """Build and return a 'warcinfo' ArcWarcRecord.

        :param filename: if truthy, stored as WARC-Filename
        :param kwargs: written as ``name: value`` lines into the record
            body (entries with falsy values are skipped by _header())
        """
        headers = {}
        # Use the same 'WARC-Record-ID' spelling as write_req_resp();
        # an underscore variant would not be recognized as the record id.
        headers['WARC-Record-ID'] = self._make_warc_id()
        headers['WARC-Type'] = 'warcinfo'
        if filename:
            headers['WARC-Filename'] = filename
        headers['WARC-Date'] = datetime_to_iso_date(datetime.datetime.utcnow())

        warcinfo = BytesIO()
        for n, v in six.iteritems(kwargs):
            self._header(warcinfo, n, v)

        warcinfo.seek(0)

        record = ArcWarcRecord('warc', 'warcinfo', headers, warcinfo,
                               None, '', len(warcinfo.getvalue()))

        return record

    def _check_revisit(self, record, params):
        """Consult the dedup index and adjust the record accordingly.

        :returns: ``None`` if the index answers ``'skip'``; otherwise the
            record, converted in place to a 'revisit' record when the index
            answers with a ``('revisit', uri, date)`` tuple. With no dedup
            index, or when the lookup raises, the record is returned
            unchanged (the exception traceback is printed).
        """
        if not self.dedup_index:
            return record

        try:
            url = record.rec_headers.get('WARC-Target-URI')
            digest = record.rec_headers.get('WARC-Payload-Digest')
            iso_dt = record.rec_headers.get('WARC-Date')
            result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
        except Exception:
            # best-effort: a failing index must not prevent recording
            traceback.print_exc()
            result = None

        if result == 'skip':
            return None

        if isinstance(result, tuple) and result[0] == 'revisit':
            record.rec_headers['WARC-Type'] = 'revisit'
            record.rec_headers['WARC-Profile'] = self.REVISIT_PROFILE

            record.rec_headers['WARC-Refers-To-Target-URI'] = result[1]
            record.rec_headers['WARC-Refers-To-Date'] = result[2]

        return record

    def _write_warc_record(self, out, record):
        """Serialize a single record to ``out`` and flush it.

        When ``self.gzip`` is set, the record is written through its own
        GzippingWrapper. Content-Type and Content-Length are always written
        by this method rather than copied from ``record.rec_headers``. For
        'revisit' records only the http headers buffer is written, not the
        payload stream.
        """
        if self.gzip:
            out = GzippingWrapper(out)

        self._line(out, b'WARC/1.0')

        for n, v in six.iteritems(record.rec_headers):
            # these two are computed and written below
            if n.lower() in ('content-length', 'content-type'):
                continue

            self._header(out, n, v)

        content_type = record.content_type
        if not content_type:
            content_type = self.WARC_RECORDS[record.rec_headers['WARC-Type']]

        self._header(out, 'Content-Type', content_type)

        http_headers_only = (record.rec_headers['WARC-Type'] == 'revisit')

        if record.length:
            actual_len = 0
            if record.status_headers:
                actual_len = len(record.status_headers.headers_buff)

            if not http_headers_only:
                # NOTE(review): appears to correct record.length for the
                # difference between the current stream offset and the
                # size of the re-serialized headers -- confirm with caller
                diff = record.stream.tell() - actual_len
                actual_len = record.length - diff

            self._header(out, 'Content-Length', str(actual_len))

            # add empty line
            self._line(out, b'')

            # write headers buffer, if any
            if record.status_headers:
                out.write(record.status_headers.headers_buff)

            if not http_headers_only:
                out.write(record.stream.read())

            # add two lines
            self._line(out, b'\r\n')
        else:
            # add three lines (1 for end of header, 2 for end of record)
            self._line(out, b'Content-Length: 0\r\n\r\n')

        out.flush()

    def _header(self, out, name, value):
        """Write a ``name: value`` header line; falsy values are skipped."""
        if not value:
            return

        self._line(out, (name + ': ' + str(value)).encode('latin-1'))

    def _line(self, out, line):
        """Write the bytes ``line`` followed by CRLF."""
        out.write(line + b'\r\n')

    @staticmethod
    def _make_warc_id(id_=None):
        """Return ``id_`` (default: a new uuid1) formatted as
        ``<urn:uuid:...>``.
        """
        if not id_:
            id_ = uuid.uuid1()
        return '<urn:uuid:{0}>'.format(id_)
# ============================================================================
class GzippingWrapper(object):
    """Write-only wrapper that gzip-compresses data on its way to ``out``.

    ``write()`` expects bytes. ``flush()`` finishes the compressed stream,
    writes the remaining data and flushes the underlying ``out``.
    """

    def __init__(self, out):
        self.out = out
        # adding 16 to the window bits selects the gzip container format
        gzip_wbits = zlib.MAX_WBITS + 16
        self.compressor = zlib.compressobj(9, zlib.DEFLATED, gzip_wbits)

    def write(self, buff):
        """Compress ``buff`` and pass whatever output is ready to ``out``."""
        self.out.write(self.compressor.compress(buff))

    def flush(self):
        """Emit all pending compressed data, then flush ``out``."""
        self.out.write(self.compressor.flush())
        self.out.flush()
# ============================================================================
class Digester(object):
    """Incremental hash whose string form is ``<type>:<base32 digest>``."""

    def __init__(self, type_='sha1'):
        self.digester = hashlib.new(type_)
        self.type_ = type_

    def update(self, buff):
        """Feed the bytes ``buff`` into the underlying hash."""
        self.digester.update(buff)

    def __str__(self):
        raw_digest = self.digester.digest()
        encoded = to_native_str(base64.b32encode(raw_digest))
        return self.type_ + ':' + encoded
# ============================================================================
class MultiFileWARCWriter(BaseWARCWriter):
    """WARC writer that appends records to files chosen by templates.

    The output directory and filename are produced from templates expanded
    with the per-request ``params``. Open file handles are cached per
    expanded directory in ``self.fh_cache`` and held under an exclusive
    ``flock`` until closed or rolled over.
    """

    def __init__(self, dir_template, filename_template=None, max_size=0,
                 *args, **kwargs):
        """
        :param dir_template: directory template; if ``filename_template``
            is not given, this is split into directory and filename parts
        :param filename_template: filename template; falls back to
            ``FILE_TEMPLATE`` when neither argument supplies one
        :param max_size: if non-zero, a file is closed once its size after
            a write exceeds this many bytes
        """
        super(MultiFileWARCWriter, self).__init__(*args, **kwargs)

        if not filename_template:
            dir_template, filename_template = os.path.split(dir_template)
            dir_template += os.path.sep

        if not filename_template:
            filename_template = self.FILE_TEMPLATE

        self.dir_template = dir_template
        self.filename_template = filename_template
        self.max_size = max_size

        # expanded directory -> (file handle, filename)
        self.fh_cache = {}

    def _open_file(self, dir_, params):
        """Open (creating directories as needed) a new output file.

        :returns: ``(file handle, filename)``; the file is opened in
            binary append+read mode
        """
        timestamp = timestamp20_now()

        filename = dir_ + res_template(self.filename_template, params,
                                       hostname=self.hostname,
                                       timestamp=timestamp)

        path = os.path.dirname(filename)

        try:
            os.makedirs(path)
        except OSError:
            # best-effort: typically the directory already exists; if it
            # really could not be created, open() below will report it
            pass

        fh = open(filename, 'a+b')

        if self.dedup_index:
            self.dedup_index.add_warc_file(filename, params)

        return fh, filename

    def _close_file(self, fh):
        """Release the flock on ``fh`` and close it."""
        fcntl.flock(fh, fcntl.LOCK_UN)
        fh.close()

    def close_file(self, full_dir):
        """Close and forget the cached file for ``full_dir``.

        :returns: the filename that was closed, or ``None`` if no file was
            cached for that directory
        """
        result = self.fh_cache.pop(full_dir, None)
        if not result:
            return

        out, filename = result
        self._close_file(out)
        return filename

    def _is_write_resp(self, resp, params):
        """Whether the response record should be written (always true here)."""
        return True

    def _is_write_req(self, req, params):
        """Whether the request record should be written (always true here)."""
        return True

    def _do_write_req_resp(self, req, resp, params):
        """Append the response and request records to the current file.

        Reuses the cached handle for the expanded directory or opens a new
        file. After writing, the handle is positioned at the start of the
        newly written data and passed to the dedup index (if any) together
        with the number of bytes written. Errors are printed and cause the
        file to be closed; they are not propagated.
        """
        full_dir = res_template(self.dir_template, params)

        result = self.fh_cache.get(full_dir)

        close_file = False

        if result:
            out, filename = result
            is_new = False
        else:
            out, filename = self._open_file(full_dir, params)
            is_new = True

        # Must be bound before the try: the finally block reads it even
        # when the write fails before the real size is known.
        new_size = 0

        try:
            url = resp.rec_headers.get('WARC-Target-URI')
            print('Writing req/resp {0} to {1} '.format(url, filename))

            start = out.tell()

            if self._is_write_resp(resp, params):
                self._write_warc_record(out, resp)

            if self._is_write_req(req, params):
                self._write_warc_record(out, req)

            out.flush()

            new_size = out.tell()

            # rewind to the start of what was just written for indexing
            out.seek(start)

            if self.dedup_index:
                self.dedup_index.add_urls_to_index(out, params,
                                                   filename,
                                                   new_size - start)

        except Exception:
            traceback.print_exc()
            close_file = True

        finally:
            # check for rollover
            if self.max_size and new_size > self.max_size:
                close_file = True

            if close_file:
                self._close_file(out)
                if not is_new:
                    self.fh_cache.pop(full_dir, None)

            elif is_new:
                # first successful write: take the lock and start caching
                fcntl.flock(out, fcntl.LOCK_EX | fcntl.LOCK_NB)
                self.fh_cache[full_dir] = (out, filename)

    def close(self):
        """Close every cached file and empty the cache."""
        for out, _filename in self.fh_cache.values():
            self._close_file(out)

        self.fh_cache = {}
# ============================================================================
class PerRecordWARCWriter(MultiFileWARCWriter):
    """MultiFileWARCWriter variant that always runs with ``max_size=1``.

    Any ``max_size`` keyword supplied by the caller is overridden.
    """

    def __init__(self, *args, **kwargs):
        kwargs.update(max_size=1)
        super(PerRecordWARCWriter, self).__init__(*args, **kwargs)
# ============================================================================
class SimpleTempWARCWriter(BaseWARCWriter):
    """WARC writer that collects all records in a single spooled temp file.

    The written bytes can be retrieved with ``get_buffer()``.
    """

    def __init__(self, *args, **kwargs):
        super(SimpleTempWARCWriter, self).__init__(*args, **kwargs)
        self.out = self._create_buffer()

    def _create_buffer(self):
        """Return the temp file used as output (spools up to 512 KiB)."""
        spool_limit = 512 * 1024
        return tempfile.SpooledTemporaryFile(max_size=spool_limit)

    def _do_write_req_resp(self, req, resp, params):
        """Write the response record, then the request record."""
        for record in (resp, req):
            self._write_warc_record(self.out, record)

    def write_record(self, record):
        """Write a single record to the buffer."""
        self._write_warc_record(self.out, record)

    def get_buffer(self):
        """Return everything written so far, leaving the write position
        where it was.
        """
        stream = self.out
        saved_pos = stream.tell()

        stream.seek(0)
        contents = stream.read()

        stream.seek(saved_pos)
        return contents