1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 08:04:49 +01:00

add recorder app, initial pass!

This commit is contained in:
Ilya Kreymer 2016-03-09 14:33:36 -08:00
parent 1499f0e611
commit 31fb2f926f
4 changed files with 513 additions and 0 deletions

0
recorder/__init__.py Normal file
View File

173
recorder/recorderapp.py Normal file
View File

@ -0,0 +1,173 @@
from requests import request as remote_request
from requests.structures import CaseInsensitiveDict
from webagg.liverec import ReadFullyStream
from webagg.responseloader import StreamIter
from webagg.inputrequest import DirectWSGIInputRequest
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.warc.recordloader import ArcWarcRecord
from pywb.warc.recordloader import ArcWarcRecordLoader
from recorder.warcrecorder import SingleFileWARCRecorder, PerRecordWARCRecorder
from recorder.redisindexer import WritableRedisIndexer
from six.moves.urllib.parse import parse_qsl
import json
import tempfile
import traceback
import gevent.queue
import gevent
#==============================================================================
write_queue = gevent.queue.Queue()
#==============================================================================
class RecorderApp(object):
    """WSGI app that proxies each request to an upstream host while teeing
    the request and response bodies into buffers, then hands completed
    (request, response) pairs to a WARC writer via a background worker.
    """

    def __init__(self, upstream_host, writer):
        self.upstream_host = upstream_host
        self.writer = writer
        # no status line expected when parsing the captured header blocks
        self.parser = StatusAndHeadersParser([], verify=False)

        # background greenlet drains write_queue for the app's lifetime
        gevent.spawn(self._do_write)

    def _do_write(self):
        """Consume capture tuples from write_queue forever, build WARC
        records from them and pass the pair to the writer.

        Any failure is logged and the loop continues: one bad record must
        not kill the writer greenlet.
        """
        while True:
            # bind before the try so the finally block can never see
            # unbound names if write_queue.get() itself raises
            req = None
            resp = None
            try:
                result = write_queue.get()

                req_head, req_pay, resp_head, resp_pay, params = result
                req = self._create_req_record(req_head, req_pay, 'request')
                resp = self._create_resp_record(resp_head, resp_pay, 'response')

                self.writer.write_req_resp(req, resp, params)
            except Exception:
                traceback.print_exc()
            finally:
                try:
                    if req:
                        req.stream.close()
                    if resp:
                        resp.stream.close()
                except Exception:
                    traceback.print_exc()

    def _create_req_record(self, req_headers, payload, type_, ct=''):
        """Build an ArcWarcRecord for the captured request.

        *payload* is a seekable buffer positioned at its end; its current
        offset is taken as the record length.
        """
        len_ = payload.tell()
        payload.seek(0)

        #warc_headers = StatusAndHeaders('WARC/1.0', req_headers.items())
        warc_headers = req_headers

        # payload begins with the http request line + headers
        status_headers = self.parser.parse(payload)

        record = ArcWarcRecord('warc', type_, warc_headers, payload,
                               status_headers, ct, len_)
        return record

    def _create_resp_record(self, req_headers, payload, type_, ct=''):
        """Build an ArcWarcRecord for the captured response.

        NOTE(review): the payload appears to begin with a WARC-style header
        block (first parse) followed by the http status + headers (second
        parse) -- confirm against the upstream webagg response format.
        """
        len_ = payload.tell()
        payload.seek(0)

        warc_headers = self.parser.parse(payload)
        warc_headers = CaseInsensitiveDict(warc_headers.headers)

        status_headers = self.parser.parse(payload)

        record = ArcWarcRecord('warc', type_, warc_headers, payload,
                               status_headers, ct, len_)
        return record

    def send_error(self, exc, start_response):
        """Start a 400 response and return the WSGI body iterable.

        The body is encoded to bytes (PEP 3333 requires byte strings)
        and Content-Length reflects the encoded length.
        """
        message = json.dumps({'error': repr(exc)}).encode('utf-8')
        headers = [('Content-Type', 'application/json; charset=utf-8'),
                   ('Content-Length', str(len(message)))]
        start_response('400 Bad Request', headers)
        return [message]

    def __call__(self, environ, start_response):
        """WSGI entry point: forward the request upstream, stream the
        response back, recording both sides via Wrapper buffers."""
        request_uri = environ.get('REQUEST_URI')
        input_req = DirectWSGIInputRequest(environ)

        headers = input_req.get_req_headers()
        method = input_req.get_req_method()
        params = dict(parse_qsl(environ.get('QUERY_STRING')))

        req_stream = Wrapper(input_req.get_req_body(), headers, None)

        try:
            res = remote_request(url=self.upstream_host + request_uri,
                                 method=method,
                                 data=req_stream,
                                 headers=headers,
                                 allow_redirects=False,
                                 stream=True)
        except Exception as e:
            traceback.print_exc()
            return self.send_error(e, start_response)

        start_response('200 OK', list(res.headers.items()))

        # closing resp_stream (at end of iteration) enqueues the pair
        resp_stream = Wrapper(res.raw, res.headers, req_stream, params)
        return StreamIter(ReadFullyStream(resp_stream))
#==============================================================================
class Wrapper(object):
    """File-like wrapper that tees everything read from *stream* into a
    spooled temp buffer, keeping only the WARC-* headers for the eventual
    record.

    When this wraps a response and is closed, the buffered request
    (req_obj) and response are enqueued on write_queue as one entry.
    """

    def __init__(self, stream, rec_headers, req_obj=None,
                 params=None):
        self.stream = stream
        self.out = self._create_buffer()

        # copy headers, then drop everything that is not a WARC-* header
        self.headers = CaseInsensitiveDict(rec_headers)
        for n in rec_headers.keys():
            if not n.upper().startswith('WARC-'):
                del self.headers[n]

        self.req_obj = req_obj
        self.params = params

    def _create_buffer(self):
        # spill to disk once the captured payload exceeds 512KB
        return tempfile.SpooledTemporaryFile(max_size=512 * 1024)

    def read(self, limit=-1):
        """Read up to *limit* bytes (all bytes if negative/None), copying
        whatever was read into the capture buffer.

        Bugfix: *limit* was previously ignored, forcing the entire stream
        into memory on the first read and defeating chunked streaming.
        """
        if limit is None or limit < 0:
            buff = self.stream.read()
        else:
            buff = self.stream.read(limit)
        self.out.write(buff)
        return buff

    def close(self):
        try:
            self.stream.close()
        except Exception:
            traceback.print_exc()

        # only the response-side wrapper (which holds req_obj) enqueues
        if not self.req_obj:
            return

        try:
            entry = (self.req_obj.headers, self.req_obj.out,
                     self.headers, self.out, self.params)

            write_queue.put(entry)
            self.req_obj = None
        except Exception:
            traceback.print_exc()
#==============================================================================
# Default WSGI application: record via per-record WARC files, deduplicating
# against a redis-backed cdx index, proxying to a local upstream.
_dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj',
                                    'recorder')

_recorder_writer = PerRecordWARCRecorder('./warcs/{user}/{coll}/',
                                         dedup_index=_dedup_index)

application = RecorderApp('http://localhost:8080', _recorder_writer)

57
recorder/redisindexer.py Normal file
View File

@ -0,0 +1,57 @@
from pywb.utils.canonicalize import calc_search_range
from pywb.cdx.cdxobject import CDXObject
from pywb.warc.cdxindexer import write_cdx_index
from pywb.utils.timeutils import timestamp_to_datetime
from pywb.utils.timeutils import datetime_to_iso_date, iso_date_to_timestamp
from io import BytesIO
from webagg.indexsource import RedisIndexSource
from webagg.aggregator import SimpleAggregator
from webagg.utils import res_template
#==============================================================================
class WritableRedisIndexer(RedisIndexSource):
    """Redis-backed cdx index that supports writing new records and
    revisit (dedup) lookups for the recorder.
    """

    def __init__(self, redis_url, name):
        super(WritableRedisIndexer, self).__init__(redis_url)
        # aggregator over just this one source, used by lookup_revisit()
        self.cdx_lookup = SimpleAggregator({name: self})

    def add_record(self, stream, params, filename=None):
        """Index the WARC record(s) readable from *stream* into a redis
        sorted set and return the last cdxj line produced.

        :param stream: seekable stream positioned at the record start
        :param params: template params used to resolve the redis key
        :param filename: warc filename for the cdx entries; defaults to
            stream.name when available
        """
        if not filename and hasattr(stream, 'name'):
            filename = stream.name

        cdxout = BytesIO()
        write_cdx_index(cdxout, stream, filename,
                        cdxj=True, append_post=True)

        # resolve e.g. '{user}:{coll}:cdxj' against params
        z_key = res_template(self.redis_key_template, params)

        cdxes = cdxout.getvalue()
        for cdx in cdxes.split(b'\n'):
            if cdx:
                self.redis.zadd(z_key, 0, cdx)

        # NOTE(review): this returns the trailing element of the split,
        # which is b'' when the index output ends with a newline -- confirm
        # callers only use truthiness of the result
        return cdx

    def lookup_revisit(self, params, digest, url, iso_dt):
        """Return ('revisit', original_url, original_iso_date) for the
        closest non-revisit capture of *url* matching *digest*, or None
        when no match exists.

        Mutates *params* in place (url/closest/filter keys).
        """
        params['url'] = url
        params['closest'] = iso_date_to_timestamp(iso_dt)

        filters = []

        # never match existing revisit records themselves
        filters.append('!mime:warc/revisit')

        if digest and digest != '-':
            # compare the raw digest, stripping any 'sha1:' style prefix
            filters.append('digest:' + digest.split(':')[-1])

        params['filter'] = filters

        cdx_iter, errs = self.cdx_lookup(params)

        # first (closest) match wins
        for cdx in cdx_iter:
            dt = timestamp_to_datetime(cdx['timestamp'])
            return ('revisit', cdx['url'],
                    datetime_to_iso_date(dt))

        return None

283
recorder/warcrecorder.py Normal file
View File

@ -0,0 +1,283 @@
import tempfile
import uuid
import base64
import hashlib
import datetime
import zlib
import sys
import os
import six
import traceback
from collections import OrderedDict
from pywb.utils.loaders import LimitReader, to_native_str
from pywb.utils.bufferedreaders import BufferedReader
from webagg.utils import ParamFormatter
# ============================================================================
class BaseWARCRecorder(object):
    """Base class for serializing request/response record pairs as WARC/1.0
    records, with optional gzip-per-record compression and dedup (revisit)
    support.

    Subclasses implement _do_write_req_resp() to control file layout.
    """

    # default Content-Type for each WARC record type, used when the record
    # itself does not carry one
    WARC_RECORDS = {'warcinfo': 'application/warc-fields',
                    'response': 'application/http; msgtype=response',
                    'revisit': 'application/http; msgtype=response',
                    'request': 'application/http; msgtype=request',
                    'metadata': 'application/warc-fields',
                   }

    REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/uri-agnostic-identical-payload-digest'

    def __init__(self, gzip=True, dedup_index=None):
        self.gzip = gzip
        self.dedup_index = dedup_index

    def ensure_digest(self, record):
        """Compute and set WARC-Block-Digest / WARC-Payload-Digest on the
        record if either is missing; restores the stream position.
        """
        block_digest = record.rec_headers.get('WARC-Block-Digest')
        payload_digest = record.rec_headers.get('WARC-Payload-Digest')
        if block_digest and payload_digest:
            return

        block_digester = self._create_digester()
        payload_digester = self._create_digester()

        pos = record.stream.tell()

        # block digest covers http headers + payload; payload digest
        # covers the payload only
        block_digester.update(record.status_headers.headers_buff)

        while True:
            buf = record.stream.read(8192)
            if not buf:
                break
            block_digester.update(buf)
            payload_digester.update(buf)

        record.stream.seek(pos)
        record.rec_headers['WARC-Block-Digest'] = str(block_digester)
        record.rec_headers['WARC-Payload-Digest'] = str(payload_digester)

    def _create_digester(self):
        return Digester('sha1')

    def _set_header_buff(self, record):
        # serialized http headers, cached for digesting and record writing
        record.status_headers.headers_buff = str(record.status_headers).encode('latin-1') + b'\r\n'

    def write_req_resp(self, req, resp, params):
        """Finalize WARC headers on a request/response pair, apply dedup,
        and delegate the actual writing to the subclass.
        """
        url = resp.rec_headers.get('WARC-Target-Uri')
        dt = resp.rec_headers.get('WARC-Date')

        if not req.rec_headers.get('WARC-Record-ID'):
            req.rec_headers['WARC-Record-ID'] = self._make_warc_id()

        # mirror uri/date from the response onto the request record
        req.rec_headers['WARC-Target-Uri'] = url
        req.rec_headers['WARC-Date'] = dt
        req.rec_headers['WARC-Type'] = 'request'
        req.rec_headers['Content-Type'] = req.content_type

        resp_id = resp.rec_headers.get('WARC-Record-ID')
        if resp_id:
            req.rec_headers['WARC-Concurrent-To'] = resp_id

        #resp.status_headers.remove_header('Etag')

        self._set_header_buff(req)
        self._set_header_buff(resp)

        self.ensure_digest(resp)

        # may convert resp into a revisit record, or drop the pair entirely
        resp = self._check_revisit(resp, params)
        if not resp:
            print('Skipping due to dedup')
            return

        self._do_write_req_resp(req, resp, params)

    def _check_revisit(self, record, params):
        """Consult the dedup index: return the record unchanged, converted
        into a 'revisit' record, or None to skip writing entirely.
        """
        if not self.dedup_index:
            return record

        try:
            url = record.rec_headers.get('WARC-Target-URI')
            digest = record.rec_headers.get('WARC-Payload-Digest')
            iso_dt = record.rec_headers.get('WARC-Date')
            result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
        except Exception:
            # best-effort: a broken index lookup must not block recording
            traceback.print_exc()
            result = None

        if result == 'skip':
            return None

        if isinstance(result, tuple) and result[0] == 'revisit':
            record.rec_headers['WARC-Type'] = 'revisit'
            record.rec_headers['WARC-Profile'] = self.REVISIT_PROFILE
            record.rec_headers['WARC-Refers-To-Target-URI'] = result[1]
            record.rec_headers['WARC-Refers-To-Date'] = result[2]

        return record

    def _write_warc_record(self, out, record):
        """Serialize one WARC record (optionally gzipped) to *out*."""
        if self.gzip:
            out = GzippingWriter(out)

        self._line(out, b'WARC/1.0')

        for n, v in six.iteritems(record.rec_headers):
            self._header(out, n, v)

        content_type = record.content_type
        if not content_type:
            content_type = self.WARC_RECORDS[record.rec_headers['WARC-Type']]

        # bugfix: write the resolved content_type, not record.content_type,
        # so the per-type default above actually takes effect
        self._header(out, 'Content-Type', content_type)

        # revisit records carry only the http headers, never the payload
        if record.rec_headers['WARC-Type'] == 'revisit':
            http_headers_only = True
        else:
            http_headers_only = False

        if record.length:
            actual_len = len(record.status_headers.headers_buff)

            if not http_headers_only:
                # account for however much of the stream was already
                # consumed while parsing the header bytes
                diff = record.stream.tell() - actual_len
                actual_len = record.length - diff

            self._header(out, 'Content-Length', str(actual_len))

            # add empty line to end the WARC header block
            self._line(out, b'')

            # write headers and buffer
            out.write(record.status_headers.headers_buff)

            if not http_headers_only:
                out.write(record.stream.read())

            # add two lines to terminate the record
            self._line(out, b'\r\n')
        else:
            # add three lines (1 for end of header, 2 for end of record)
            self._line(out, b'Content-Length: 0\r\n\r\n')

        out.flush()

    def _header(self, out, name, value):
        """Write one 'Name: value' header line; skipped for falsy values."""
        if not value:
            return

        self._line(out, (name + ': ' + str(value)).encode('latin-1'))

    def _line(self, out, line):
        out.write(line + b'\r\n')

    @staticmethod
    def _make_warc_id(id_=None):
        """Return a WARC-Record-ID urn, generating a uuid1 when not given."""
        if not id_:
            id_ = uuid.uuid1()
        return '<urn:uuid:{0}>'.format(id_)
# ============================================================================
class GzippingWriter(object):
    """Write/flush sink that gzip-compresses everything written to it
    before passing the compressed bytes to the underlying stream.
    """

    def __init__(self, out):
        # wbits = MAX_WBITS + 16 selects the gzip container format
        self.compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)
        self.out = out

    def write(self, buff):
        compressed = self.compressor.compress(buff)
        self.out.write(compressed)

    def flush(self):
        tail = self.compressor.flush()
        self.out.write(tail)
        self.out.flush()
# ============================================================================
class Digester(object):
    """Incremental digest (sha1 by default) rendered as
    '<type>:<base32-digest>', comparable against either a bare or a
    type-prefixed digest string.
    """

    def __init__(self, type_='sha1'):
        self.type_ = type_
        self.digester = hashlib.new(type_)

    def update(self, buff):
        self.digester.update(buff)

    def _b32_digest(self):
        # base32 digest as a native str on both py2 and py3
        return base64.b32encode(self.digester.digest()).decode('ascii')

    def __eq__(self, string):
        # bugfix: previously referenced nonexistent 'self._type_'
        # (AttributeError) and used str() on bytes, which yields "b'...'"
        # on py3; only add the type prefix when the other side has one
        digest = self._b32_digest()
        if ':' in string:
            digest = self.type_ + ':' + digest

        return string == digest

    def __str__(self):
        return self.type_ + ':' + self._b32_digest()
# ============================================================================
class SingleFileWARCRecorder(BaseWARCRecorder):
    """Recorder that appends every record pair to one fixed WARC file."""

    def __init__(self, warcfilename, *args, **kwargs):
        super(SingleFileWARCRecorder, self).__init__(*args, **kwargs)
        self.warcfilename = warcfilename

    def _do_write_req_resp(self, req, resp, params):
        # bugfix: 'url' was an undefined name here (NameError on every
        # write); take the target uri from the response record headers
        url = resp.rec_headers.get('WARC-Target-URI')
        print('Writing {0} to {1} '.format(url, self.warcfilename))

        with open(self.warcfilename, 'a+b') as out:
            start = out.tell()

            # response first, then the concurrent request record
            self._write_warc_record(out, resp)
            self._write_warc_record(out, req)

            out.flush()
            out.seek(start)

            if self.dedup_index:
                self.dedup_index.add_record(out, params, filename=self.warcfilename)

    def add_user_record(self, url, content_type, data):
        # NOTE(review): relies on _write_warc_metadata, which is not
        # defined in BaseWARCRecorder as visible here -- confirm it exists
        # before calling this method
        with open(self.warcfilename, 'a+b') as out:
            start = out.tell()
            self._write_warc_metadata(out, url, content_type, data)
            out.flush()

            # TODO: indexing of user metadata records not yet enabled
            #out.seek(start)
            #if self.indexer:
            #    self.indexer.add_record(out, self.warcfilename)
# ============================================================================
class PerRecordWARCRecorder(BaseWARCRecorder):
    """Recorder that writes each request and response into its own
    uuid-named gzipped WARC file under a params-templated directory.
    """

    def __init__(self, warcdir, *args, **kwargs):
        super(PerRecordWARCRecorder, self).__init__(*args, **kwargs)
        # directory template, expanded with params (e.g. '{user}', '{coll}')
        self.warcdir = warcdir

    def _do_write_req_resp(self, req, resp, params):
        # uuid portion of '<urn:uuid:...>' becomes the filename stem
        resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ')
        req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ')

        formatter = ParamFormatter(params)
        full_dir = formatter.format(self.warcdir)

        # bugfix: only swallow OS-level errors (e.g. directory already
        # exists); the previous bare except also hid programming errors
        try:
            os.makedirs(full_dir)
        except OSError:
            pass

        resp_filename = os.path.join(full_dir, resp_uuid + '.warc.gz')
        req_filename = os.path.join(full_dir, req_uuid + '.warc.gz')

        # only the response record is indexed for dedup
        self._write_record(resp_filename, resp, params, True)
        self._write_record(req_filename, req, params, False)

    def _write_record(self, filename, rec, params, index=False):
        """Write one record to its own file; optionally index it."""
        with open(filename, 'w+b') as out:
            self._write_warc_record(out, rec)

            if index and self.dedup_index:
                out.seek(0)
                self.dedup_index.add_record(out, params, filename=filename)