# ============================================================================
# Reconstructed from a whitespace-mangled git diff that added four files:
#   recorder/__init__.py    (empty package marker)
#   recorder/recorderapp.py
#   recorder/redisindexer.py
#   recorder/warcrecorder.py
# Each original file appears below under a 'file:' banner.
# ============================================================================


# ============================================================================
# file: recorder/recorderapp.py
# ============================================================================
from requests import request as remote_request
from requests.structures import CaseInsensitiveDict

from webagg.liverec import ReadFullyStream
from webagg.responseloader import StreamIter
from webagg.inputrequest import DirectWSGIInputRequest

from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.warc.recordloader import ArcWarcRecord
from pywb.warc.recordloader import ArcWarcRecordLoader

from recorder.warcrecorder import SingleFileWARCRecorder, PerRecordWARCRecorder
from recorder.redisindexer import WritableRedisIndexer

from six.moves.urllib.parse import parse_qsl

import json
import tempfile

import traceback

import gevent.queue
import gevent


#==============================================================================
# Queue of (req_headers, req_buffer, resp_headers, resp_buffer, params)
# tuples: filled by Wrapper.close(), drained by RecorderApp._do_write()
write_queue = gevent.queue.Queue()


#==============================================================================
class RecorderApp(object):
    """WSGI app that proxies each request to an upstream host and queues
    the recorded request/response pair for WARC writing.
    """

    def __init__(self, upstream_host, writer):
        self.upstream_host = upstream_host

        self.writer = writer
        self.parser = StatusAndHeadersParser([], verify=False)

        # background greenlet that drains write_queue
        gevent.spawn(self._do_write)

    def _do_write(self):
        """Consume queued req/resp pairs and write them via self.writer."""
        while True:
            # fix: initialize before the try block so the finally clause
            # cannot hit a NameError if write_queue.get() itself raises
            req = None
            resp = None
            try:
                result = write_queue.get()
                req_head, req_pay, resp_head, resp_pay, params = result

                req = self._create_req_record(req_head, req_pay, 'request')
                resp = self._create_resp_record(resp_head, resp_pay, 'response')

                self.writer.write_req_resp(req, resp, params)

            except Exception:
                traceback.print_exc()

            finally:
                try:
                    if req:
                        req.stream.close()

                    if resp:
                        resp.stream.close()
                except Exception:
                    traceback.print_exc()

    def _create_req_record(self, req_headers, payload, type_, ct=''):
        """Build a 'request' ArcWarcRecord.

        *payload* is positioned at its end, so tell() gives the record
        length; the buffer begins with the HTTP request headers.
        """
        len_ = payload.tell()
        payload.seek(0)

        warc_headers = req_headers

        status_headers = self.parser.parse(payload)

        record = ArcWarcRecord('warc', type_, warc_headers, payload,
                               status_headers, ct, len_)
        return record

    def _create_resp_record(self, req_headers, payload, type_, ct=''):
        """Build a 'response' ArcWarcRecord.

        The buffer contains a WARC header block followed by the HTTP
        status + headers; both are parsed off the front of the stream.
        """
        len_ = payload.tell()
        payload.seek(0)

        warc_headers = self.parser.parse(payload)
        warc_headers = CaseInsensitiveDict(warc_headers.headers)

        status_headers = self.parser.parse(payload)

        record = ArcWarcRecord('warc', type_, warc_headers, payload,
                               status_headers, ct, len_)
        return record

    def send_error(self, exc, start_response):
        """Start a 400 response and return the WSGI body iterable."""
        message = json.dumps({'error': repr(exc)})
        # fix: WSGI bodies must be iterables of bytes (PEP 3333);
        # the original returned a plain str
        message = message.encode('utf-8')
        headers = [('Content-Type', 'application/json; charset=utf-8'),
                   ('Content-Length', str(len(message)))]

        start_response('400 Bad Request', headers)
        return [message]

    def __call__(self, environ, start_response):
        # NOTE(review): assumes the server sets REQUEST_URI; a missing
        # key would make the url concatenation below raise -- confirm
        request_uri = environ.get('REQUEST_URI')

        input_req = DirectWSGIInputRequest(environ)
        headers = input_req.get_req_headers()
        method = input_req.get_req_method()

        params = dict(parse_qsl(environ.get('QUERY_STRING')))

        req_stream = Wrapper(input_req.get_req_body(), headers, None)

        try:
            res = remote_request(url=self.upstream_host + request_uri,
                                 method=method,
                                 data=req_stream,
                                 headers=headers,
                                 allow_redirects=False,
                                 stream=True)
        except Exception as e:
            traceback.print_exc()
            return self.send_error(e, start_response)

        start_response('200 OK', list(res.headers.items()))

        resp_stream = Wrapper(res.raw, res.headers, req_stream, params)

        return StreamIter(ReadFullyStream(resp_stream))


#==============================================================================
class Wrapper(object):
    """Tee stream: everything read from *stream* is also written to a
    spooled temp buffer; close() on the response-side wrapper enqueues
    the buffered request/response pair onto write_queue.
    """

    def __init__(self, stream, rec_headers, req_obj=None,
                 params=None):
        self.stream = stream
        self.out = self._create_buffer()

        # keep only the WARC-* headers for the record
        self.headers = CaseInsensitiveDict(rec_headers)
        for n in rec_headers.keys():
            if not n.upper().startswith('WARC-'):
                del self.headers[n]

        self.req_obj = req_obj
        self.params = params

    def _create_buffer(self):
        # stays in memory up to 512KB, then spills to disk
        return tempfile.SpooledTemporaryFile(max_size=512*1024)

    def read(self, limit=-1):
        # fix: honor the requested read size (it was silently ignored);
        # an absent/negative limit still reads the full stream
        if limit is None or limit < 0:
            buff = self.stream.read()
        else:
            buff = self.stream.read(limit)
        self.out.write(buff)
        return buff

    def close(self):
        try:
            self.stream.close()
        except Exception:
            traceback.print_exc()

        # only the response wrapper (which holds the request wrapper)
        # triggers the queued write
        if not self.req_obj:
            return

        try:
            entry = (self.req_obj.headers, self.req_obj.out,
                     self.headers, self.out, self.params)
            write_queue.put(entry)
            self.req_obj = None
        except Exception:
            traceback.print_exc()


#==============================================================================
application = RecorderApp('http://localhost:8080',
    PerRecordWARCRecorder('./warcs/{user}/{coll}/',
        dedup_index=WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', 'recorder')))


# ============================================================================
# file: recorder/redisindexer.py
# ============================================================================
from pywb.utils.canonicalize import calc_search_range
from pywb.cdx.cdxobject import CDXObject
from pywb.warc.cdxindexer import write_cdx_index
from pywb.utils.timeutils import timestamp_to_datetime
from pywb.utils.timeutils import datetime_to_iso_date, iso_date_to_timestamp

from io import BytesIO

from webagg.indexsource import RedisIndexSource
from webagg.aggregator import SimpleAggregator
from webagg.utils import res_template


#==============================================================================
class WritableRedisIndexer(RedisIndexSource):
    """Redis cdxj index that is both queryable (for revisit dedup) and
    appendable as new WARC records are written.
    """

    def __init__(self, redis_url, name):
        super(WritableRedisIndexer, self).__init__(redis_url)
        self.cdx_lookup = SimpleAggregator({name: self})

    def add_record(self, stream, params, filename=None):
        """Index the WARC record(s) readable from *stream* into the
        sorted set named by the key template and *params*.

        Returns the last cdx line processed (kept from the original;
        note this may be the empty trailing line).
        """
        if not filename and hasattr(stream, 'name'):
            filename = stream.name

        cdxout = BytesIO()
        write_cdx_index(cdxout, stream, filename,
                        cdxj=True, append_post=True)

        z_key = res_template(self.redis_key_template, params)

        cdxes = cdxout.getvalue()
        for cdx in cdxes.split(b'\n'):
            if cdx:
                self.redis.zadd(z_key, 0, cdx)

        return cdx

    def lookup_revisit(self, params, digest, url, iso_dt):
        """Find a previous capture of *url* with the same payload digest.

        Returns ('revisit', original_url, original_iso_date) for the
        closest match, or None if the record should be written in full.
        """
        params['url'] = url
        params['closest'] = iso_date_to_timestamp(iso_dt)

        # never match existing revisit records
        filters = ['!mime:warc/revisit']

        if digest and digest != '-':
            filters.append('digest:' + digest.split(':')[-1])

        params['filter'] = filters

        cdx_iter, errs = self.cdx_lookup(params)

        for cdx in cdx_iter:
            dt = timestamp_to_datetime(cdx['timestamp'])
            return ('revisit', cdx['url'],
                    datetime_to_iso_date(dt))

        return None


# ============================================================================
# file: recorder/warcrecorder.py
# ============================================================================
import tempfile
import uuid
import base64
import hashlib
import datetime
import zlib
import sys
import os
import six

import traceback

from collections import OrderedDict

from pywb.utils.loaders import LimitReader, to_native_str
from pywb.utils.bufferedreaders import BufferedReader

from webagg.utils import ParamFormatter


# ============================================================================
class BaseWARCRecorder(object):
    """Base class for writing request/response pairs as WARC records,
    with optional digest-based revisit deduplication.
    """

    # default Content-Type per WARC record type
    WARC_RECORDS = {'warcinfo': 'application/warc-fields',
                    'response': 'application/http; msgtype=response',
                    'revisit': 'application/http; msgtype=response',
                    'request': 'application/http; msgtype=request',
                    'metadata': 'application/warc-fields',
                   }

    REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/uri-agnostic-identical-payload-digest'

    def __init__(self, gzip=True, dedup_index=None):
        self.gzip = gzip
        self.dedup_index = dedup_index

    def ensure_digest(self, record):
        """Set WARC-Block-Digest / WARC-Payload-Digest if either is
        missing, restoring the stream position afterwards.
        """
        block_digest = record.rec_headers.get('WARC-Block-Digest')
        payload_digest = record.rec_headers.get('WARC-Payload-Digest')
        if block_digest and payload_digest:
            return

        block_digester = self._create_digester()
        payload_digester = self._create_digester()

        pos = record.stream.tell()

        # block digest covers the http headers + payload;
        # payload digest covers the payload only
        block_digester.update(record.status_headers.headers_buff)

        while True:
            buf = record.stream.read(8192)
            if not buf:
                break

            block_digester.update(buf)
            payload_digester.update(buf)

        record.stream.seek(pos)
        record.rec_headers['WARC-Block-Digest'] = str(block_digester)
        record.rec_headers['WARC-Payload-Digest'] = str(payload_digester)

    def _create_digester(self):
        return Digester('sha1')

    def _set_header_buff(self, record):
        # serialized http status + headers, used for digests and output
        record.status_headers.headers_buff = str(record.status_headers).encode('latin-1') + b'\r\n'

    def write_req_resp(self, req, resp, params):
        """Write a request/response pair, converting the response to a
        revisit record (or skipping it) per the dedup index.
        """
        url = resp.rec_headers.get('WARC-Target-Uri')
        dt = resp.rec_headers.get('WARC-Date')

        if not req.rec_headers.get('WARC-Record-ID'):
            req.rec_headers['WARC-Record-ID'] = self._make_warc_id()

        # the request record shares uri/date with the response
        req.rec_headers['WARC-Target-Uri'] = url
        req.rec_headers['WARC-Date'] = dt
        req.rec_headers['WARC-Type'] = 'request'
        req.rec_headers['Content-Type'] = req.content_type

        resp_id = resp.rec_headers.get('WARC-Record-ID')
        if resp_id:
            req.rec_headers['WARC-Concurrent-To'] = resp_id

        self._set_header_buff(req)
        self._set_header_buff(resp)

        self.ensure_digest(resp)

        resp = self._check_revisit(resp, params)
        if not resp:
            print('Skipping due to dedup')
            return

        self._do_write_req_resp(req, resp, params)

    def _check_revisit(self, record, params):
        """Consult the dedup index; return the (possibly rewritten)
        record, or None if it should be skipped entirely.
        """
        if not self.dedup_index:
            return record

        try:
            url = record.rec_headers.get('WARC-Target-URI')
            digest = record.rec_headers.get('WARC-Payload-Digest')
            iso_dt = record.rec_headers.get('WARC-Date')
            result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
        except Exception as e:
            # best-effort: a failed lookup means "write in full"
            traceback.print_exc()
            result = None

        if result == 'skip':
            return None

        if isinstance(result, tuple) and result[0] == 'revisit':
            record.rec_headers['WARC-Type'] = 'revisit'
            record.rec_headers['WARC-Profile'] = self.REVISIT_PROFILE

            record.rec_headers['WARC-Refers-To-Target-URI'] = result[1]
            record.rec_headers['WARC-Refers-To-Date'] = result[2]

        return record

    def _write_warc_record(self, out, record):
        """Serialize one record (headers + body) to *out*, gzipping the
        whole record if enabled.
        """
        if self.gzip:
            out = GzippingWriter(out)

        self._line(out, b'WARC/1.0')

        for n, v in six.iteritems(record.rec_headers):
            self._header(out, n, v)

        content_type = record.content_type
        if not content_type:
            content_type = self.WARC_RECORDS[record.rec_headers['WARC-Type']]

        # fix: write the (possibly defaulted) content_type; the original
        # wrote record.content_type, making the fallback above dead code
        self._header(out, 'Content-Type', content_type)

        # revisit records carry only the http headers, no payload
        if record.rec_headers['WARC-Type'] == 'revisit':
            http_headers_only = True
        else:
            http_headers_only = False

        if record.length:
            actual_len = len(record.status_headers.headers_buff)

            if not http_headers_only:
                # record.length counts the raw buffered bytes; adjust for
                # the already-consumed portion vs the re-serialized headers
                diff = record.stream.tell() - actual_len
                actual_len = record.length - diff

            self._header(out, 'Content-Length', str(actual_len))

            # end of WARC header block
            self._line(out, b'')

            # write headers and buffer
            out.write(record.status_headers.headers_buff)

            if not http_headers_only:
                out.write(record.stream.read())

            # two CRLFs terminate the record
            self._line(out, b'\r\n')
        else:
            # one CRLF ends the headers, two more end the record
            self._line(out, b'Content-Length: 0\r\n\r\n')

        out.flush()

    def _header(self, out, name, value):
        if not value:
            return

        self._line(out, (name + ': ' + str(value)).encode('latin-1'))

    def _line(self, out, line):
        out.write(line + b'\r\n')

    @staticmethod
    def _make_warc_id(id_=None):
        if not id_:
            id_ = uuid.uuid1()
        # fix: the format string was empty, yielding '' for every id;
        # ids are parsed elsewhere with .strip('<> ') so they must have
        # the '<urn:uuid:...>' form
        return '<urn:uuid:{0}>'.format(id_)


# ============================================================================
class GzippingWriter(object):
    """File-like wrapper that gzip-compresses everything written."""

    def __init__(self, out):
        # MAX_WBITS + 16 selects the gzip container format
        self.compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)
        self.out = out

    def write(self, buff):
        buff = self.compressor.compress(buff)
        self.out.write(buff)

    def flush(self):
        buff = self.compressor.flush()
        self.out.write(buff)
        self.out.flush()


# ============================================================================
class Digester(object):
    """Incremental digest with WARC-style '<type>:<b32digest>' string form."""

    def __init__(self, type_='sha1'):
        self.type_ = type_
        self.digester = hashlib.new(type_)

    def update(self, buff):
        self.digester.update(buff)

    def __eq__(self, string):
        # fix: decode to a native str (str(bytes) gives "b'..'" on py3)
        # and use self.type_ (self._type_ did not exist -> AttributeError)
        digest = to_native_str(base64.b32encode(self.digester.digest()))
        if ':' in string:
            digest = self.type_ + ':' + digest
        return string == digest

    def __str__(self):
        return self.type_ + ':' + to_native_str(base64.b32encode(self.digester.digest()))


# ============================================================================
class SingleFileWARCRecorder(BaseWARCRecorder):
    """Appends all records to one WARC file."""

    def __init__(self, warcfilename, *args, **kwargs):
        super(SingleFileWARCRecorder, self).__init__(*args, **kwargs)
        self.warcfilename = warcfilename

    def _do_write_req_resp(self, req, resp, params):
        # fix: 'url' was an undefined name here (NameError); take it
        # from the response record, as write_req_resp does
        url = resp.rec_headers.get('WARC-Target-Uri')
        print('Writing {0} to {1} '.format(url, self.warcfilename))

        with open(self.warcfilename, 'a+b') as out:
            start = out.tell()

            self._write_warc_record(out, resp)
            self._write_warc_record(out, req)

            out.flush()
            out.seek(start)

            if self.dedup_index:
                self.dedup_index.add_record(out, params, filename=self.warcfilename)

    def add_user_record(self, url, content_type, data):
        # NOTE(review): _write_warc_metadata is not defined anywhere in
        # this module -- calling this method raises AttributeError.
        # Left as-is pending the missing implementation.
        with open(self.warcfilename, 'a+b') as out:
            start = out.tell()
            self._write_warc_metadata(out, url, content_type, data)
            out.flush()


# ============================================================================
class PerRecordWARCRecorder(BaseWARCRecorder):
    """Writes each request and response record to its own .warc.gz file,
    named by record uuid, under a params-formatted directory.
    """

    def __init__(self, warcdir, *args, **kwargs):
        super(PerRecordWARCRecorder, self).__init__(*args, **kwargs)
        self.warcdir = warcdir

    def _do_write_req_resp(self, req, resp, params):
        # record ids have the form '<urn:uuid:...>'; extract the uuid
        resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ')
        req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ')

        formatter = ParamFormatter(params)
        full_dir = formatter.format(self.warcdir)

        try:
            os.makedirs(full_dir)
        except OSError:
            # directory already exists
            pass

        resp_filename = os.path.join(full_dir, resp_uuid + '.warc.gz')
        req_filename = os.path.join(full_dir, req_uuid + '.warc.gz')

        # only the response is indexed for dedup
        self._write_record(resp_filename, resp, params, True)
        self._write_record(req_filename, req, params, False)

    def _write_record(self, filename, rec, params, index=False):
        with open(filename, 'w+b') as out:
            self._write_warc_record(out, rec)

            if index and self.dedup_index:
                out.seek(0)
                self.dedup_index.add_record(out, params, filename=filename)