# NOTE(review): SOURCE is a whitespace-mangled git patch (commit 37198767,
# "add utils, responseloader and liverec") creating three files.
# Reconstructed below as formatted code; file boundaries are marked.

# ===========================================================================
# liverec.py
# ===========================================================================
from io import BytesIO

try:
    import httplib
except ImportError:
    import http.client as httplib

# Keep a reference to the unpatched class: this module replaces
# httplib.HTTPConnection below, and the recording subclass delegates here.
orig_connection = httplib.HTTPConnection

from contextlib import contextmanager

import ssl
from array import array

from time import sleep


BUFF_SIZE = 8192


# ============================================================================
class RecordingStream(object):
    """File-object wrapper that tees everything read from ``fp`` into a
    recorder (header lines and body buffers are reported separately)."""

    def __init__(self, fp, recorder):
        self.fp = fp
        self.recorder = recorder
        self.incomplete = False

        # Expose optional file-object methods only when the wrapped
        # object actually provides them
        if hasattr(self.fp, 'unread'):
            self.unread = self.fp.unread

        if hasattr(self.fp, 'tell'):
            self.tell = self.fp.tell

    def read(self, *args, **kwargs):
        buff = self.fp.read(*args, **kwargs)
        self.recorder.write_response_buff(buff)
        return buff

    def readinto(self, buff):
        res = self.fp.readinto(buff)
        self.recorder.write_response_buff(buff)
        return res

    def readline(self, maxlen=None):
        line = self.fp.readline(maxlen)
        self.recorder.write_response_header_line(line)
        return line

    def flush(self):
        self.fp.flush()

    def close(self):
        # Best-effort: a failing recorder must never prevent closing the
        # underlying stream
        try:
            self.recorder.finish_response(self.incomplete)
        except Exception:
            import traceback
            traceback.print_exc()

        return self.fp.close()


# ============================================================================
class RecordingHTTPResponse(httplib.HTTPResponse):
    """HTTPResponse whose raw stream is wrapped in a RecordingStream."""

    def __init__(self, recorder, *args, **kwargs):
        httplib.HTTPResponse.__init__(self, *args, **kwargs)
        self.fp = RecordingStream(self.fp, recorder)

    def mark_incomplete(self):
        self.fp.incomplete = True


# ============================================================================
class RecordingHTTPConnection(httplib.HTTPConnection):
    """HTTPConnection that records outgoing requests and incoming
    responses through a per-connection recorder object."""

    # Factory producing a recorder per connection; installed by
    # record_requests() below
    global_recorder_maker = None

    def __init__(self, *args, **kwargs):
        orig_connection.__init__(self, *args, **kwargs)
        if not self.global_recorder_maker:
            self.recorder = None
        else:
            self.recorder = self.global_recorder_maker()

            # fix: only install the recording response class when a recorder
            # exists -- the original installed it unconditionally, so with no
            # recorder RecordingStream would call methods on None
            def make_recording_response(*args, **kwargs):
                return RecordingHTTPResponse(self.recorder, *args, **kwargs)

            self.response_class = make_recording_response

    def send(self, data):
        """Send ``data``, mirroring it into the recorder; file-like
        payloads are forwarded in BUFF_SIZE chunks."""
        if not self.recorder:
            orig_connection.send(self, data)
            return

        if hasattr(data, 'read') and not isinstance(data, array):
            url = None
            while True:
                # fix: BUFF_SIZE is a module-level constant; the original
                # read self.BUFF_SIZE, which HTTPConnection does not define
                buff = data.read(BUFF_SIZE)
                if not buff:
                    break

                orig_connection.send(self, buff)
                self.recorder.write_request(url, buff)
        else:
            orig_connection.send(self, data)
            # fix: pass the url slot (None) consistently with the streaming
            # branch; the original passed the connection object here
            self.recorder.write_request(None, data)

    def get_url(self, data):
        """Rebuild the absolute request URL from a raw request line plus
        this connection's host/port and socket type."""
        buff = BytesIO(data)
        line = buff.readline()

        path = line.split(' ', 2)[1]
        host = self.host
        port = self.port
        scheme = 'https' if isinstance(self.sock, ssl.SSLSocket) else 'http'

        url = scheme + '://' + host

        # Append the port only when it is not the scheme default.
        # fix: the original joined the two scheme checks with 'and'
        # (mutually exclusive, so always False) and compared the int
        # port against strings
        if (scheme == 'https' and port != 443) or \
                (scheme == 'http' and port != 80):
            url += ':' + str(port)

        url += path
        return url

    def request(self, *args, **kwargs):
        res = orig_connection.request(self, *args, **kwargs)

        if self.recorder:
            self.recorder.finish_request(self.sock)

        return res


# ============================================================================
class BaseRecorder(object):
    """No-op recorder; subclasses override only the hooks they care about."""

    def write_request(self, conn, buff):
        pass

    def write_response_header_line(self, line):
        pass

    def write_response_buff(self, buff):
        pass

    def finish_request(self, socket):
        pass

    def finish_response(self, incomplete=False):
        pass


#=================================================================
class ReadFullyStream(object):
    """Stream wrapper that drains the remainder of the stream on close, so
    a recording of the response is always complete or marked incomplete."""

    def __init__(self, stream):
        self.stream = stream

    def read(self, *args, **kwargs):
        try:
            return self.stream.read(*args, **kwargs)
        except Exception:
            self.mark_incomplete()
            raise

    def readline(self, *args, **kwargs):
        try:
            return self.stream.readline(*args, **kwargs)
        except Exception:
            self.mark_incomplete()
            raise

    def mark_incomplete(self):
        if (hasattr(self.stream, '_fp') and
                hasattr(self.stream._fp, 'mark_incomplete')):
            self.stream._fp.mark_incomplete()

    def close(self):
        try:
            while True:
                buff = self.stream.read(BUFF_SIZE)
                sleep(0)  # yield to other threads/greenlets while draining
                if not buff:
                    break

        except Exception:
            import traceback
            traceback.print_exc()
            self.mark_incomplete()
        finally:
            self.stream.close()


# ============================================================================
# Install the recording connection class globally
httplib.HTTPConnection = RecordingHTTPConnection
# ============================================================================


class DefaultRecorderMaker(object):
    """Hands out a fresh no-op recorder per connection."""

    def __call__(self):
        return BaseRecorder()


class FixedRecorder(object):
    """Always hands out the same recorder instance."""

    def __init__(self, recorder):
        self.recorder = recorder

    def __call__(self):
        return self.recorder


@contextmanager
def record_requests(url, recorder_maker):
    """Enable recording for connections created inside the block."""
    RecordingHTTPConnection.global_recorder_maker = recorder_maker
    try:
        yield
    finally:
        # fix: always uninstall the maker, even if the body raises
        RecordingHTTPConnection.global_recorder_maker = None


@contextmanager
def orig_requests():
    """Temporarily restore the unpatched HTTPConnection."""
    httplib.HTTPConnection = orig_connection
    try:
        yield
    finally:
        # fix: always re-install the recording class, even if the body raises
        httplib.HTTPConnection = RecordingHTTPConnection


import requests as patched_requests


def request(url, method='GET', recorder=None, recorder_maker=None,
            session=patched_requests, **kwargs):
    """Issue a non-redirecting request with recording enabled.

    Extra kwargs are forwarded to session.request(); the special
    'skip_recording' kwarg disables recording for this call.
    """
    # fix: pop the flag so it is not forwarded to session.request(),
    # where it would raise TypeError (the original used kwargs.get)
    if kwargs.pop('skip_recording', False):
        recorder_maker = None
    elif recorder:
        recorder_maker = FixedRecorder(recorder)
    elif not recorder_maker:
        recorder_maker = DefaultRecorderMaker()

    with record_requests(url, recorder_maker):
        kwargs['allow_redirects'] = False
        r = session.request(method=method,
                            url=url,
                            **kwargs)

    return r


# ===========================================================================
# responseloader.py
# ===========================================================================
from liverec import BaseRecorder
from liverec import request as remote_request

from pywb.warc.recordloader import ArcWarcRecordLoader, ArchiveLoadFailed
from pywb.utils.timeutils import timestamp_to_datetime

from io import BytesIO
from bottle import response

import uuid


#=============================================================================
def incr_reader(stream, header=None, size=8192):
    """Yield ``header`` (if given), then the stream in ``size``-byte chunks."""
    if header:
        yield header

    while True:
        data = stream.read(size)
        if data:
            yield data
        else:
            break


#=============================================================================
class WARCPathPrefixLoader(object):
    """Load a WARC/ARC record from ``prefix + cdx['filename']`` and copy
    its record headers onto the bottle response."""

    def __init__(self, prefix):
        self.prefix = prefix
        self.record_loader = ArcWarcRecordLoader()

    def __call__(self, cdx):
        filename = cdx.get('filename')
        offset = cdx.get('offset')
        length = cdx.get('length', -1)

        if filename is None or offset is None:
            # fix: give the failure a message instead of a bare Exception
            raise Exception('cdx missing filename or offset')

        record = self.record_loader.load(self.prefix + filename,
                                         offset,
                                         length,
                                         no_record_parse=True)

        for n, v in record.rec_headers.headers:
            response.headers[n] = v

        return incr_reader(record.stream)


#=============================================================================
class HeaderRecorder(BaseRecorder):
    """Recorder that accumulates response header lines, dropping any whose
    lowercased name starts with an entry of ``skip_list``."""

    def __init__(self, skip_list=None):
        self.buff = BytesIO()
        self.skip_list = skip_list
        self.skipped = []

    def write_response_header_line(self, line):
        if self.accept_header(line):
            self.buff.write(line)

    def get_header(self):
        return self.buff.getvalue()

    def accept_header(self, line):
        if self.skip_list and line.lower().startswith(self.skip_list):
            self.skipped.append(line)
            return False

        return True


#=============================================================================
class LiveWebLoader(object):
    """Fetch cdx['load_url'] live and emit it as a WARC response record."""

    # Response headers never copied into the recorded header block
    SKIP_HEADERS = (b'link',
                    b'memento-datetime',
                    b'content-location',
                    b'x-archive',
                    b'set-cookie')

    def __call__(self, cdx):
        load_url = cdx.get('load_url')
        if not load_url:
            # fix: give the failure a message instead of a bare Exception
            raise Exception('cdx missing load_url')

        recorder = HeaderRecorder(self.SKIP_HEADERS)

        upstream_res = remote_request(load_url, recorder=recorder, stream=True,
                                      headers={'Accept-Encoding': 'identity'})

        response.headers['Content-Type'] = 'application/http; msgtype=response'

        response.headers['WARC-Type'] = 'response'
        response.headers['WARC-Record-ID'] = self._make_warc_id()
        response.headers['WARC-Target-URI'] = cdx['url']
        response.headers['WARC-Date'] = self._make_date(cdx['timestamp'])

        # Try to set content-length, if it is available and valid
        try:
            content_len = int(upstream_res.headers.get('content-length', 0))
            if content_len > 0:
                content_len += len(recorder.get_header())
                response.headers['Content-Length'] = content_len
        except (ValueError, TypeError):
            pass

        return incr_reader(upstream_res.raw, header=recorder.get_header())

    @staticmethod
    def _make_date(ts):
        return timestamp_to_datetime(ts).strftime('%Y-%m-%dT%H:%M:%SZ')

    @staticmethod
    def _make_warc_id(id_=None):
        if not id_:
            id_ = uuid.uuid1()
        # fix: the original format string was empty ('') and always returned
        # the empty string; WARC record ids use the '<urn:uuid:...>' form
        return '<urn:uuid:{0}>'.format(id_)


# ===========================================================================
# utils.py (imports and Link-header regexes)
# ===========================================================================
import re, json
from pywb.utils.canonicalize import canonicalize
from pywb.utils.timeutils import timestamp_to_sec, http_date_to_timestamp
from pywb.cdx.cdxobject import CDXObject


# Split a Link header into links / segments / properties (raw strings
# added for the regex literals; patterns unchanged)
LINK_SPLIT = re.compile(r',\s*(?=[<])')
LINK_SEG_SPLIT = re.compile(r';\s*')
LINK_URL = re.compile(r'<(.*)>')
LINK_PROP = re.compile(r'([\w]+)="([^"]+)')
#=================================================================
class MementoUtils(object):
    """Helpers for parsing Memento (RFC 7089) Link headers into dicts
    and CDX-style records."""

    @staticmethod
    def parse_links(link_header, def_name='timemap'):
        """Parse a Link header into ``{rel-key: link-dict, 'mementos': [...]}``.

        Links whose rel contains 'memento' are collected under 'mementos';
        a rel of 'self' is stored under ``def_name``; any other rel is used
        as the key itself. Raises Exception on malformed urls or props.
        """
        links = LINK_SPLIT.split(link_header)
        results = {}
        mementos = []

        for link in links:
            props = LINK_SEG_SPLIT.split(link)
            m = LINK_URL.match(props[0])
            if not m:
                raise Exception('Invalid Link Url: ' + props[0])

            result = dict(url=m.group(1))
            key = ''
            is_mem = False

            for prop in props[1:]:
                m = LINK_PROP.match(prop)
                if not m:
                    raise Exception('Invalid prop ' + prop)

                name = m.group(1)
                value = m.group(2)

                if name == 'rel':
                    if 'memento' in value:
                        is_mem = True
                        result[name] = value
                    elif value == 'self':
                        key = def_name
                    else:
                        key = value
                else:
                    result[name] = value

            if key:
                results[key] = result
            elif is_mem:
                mementos.append(result)

        results['mementos'] = mementos
        return results

    @staticmethod
    def links_to_json(link_header, def_name='timemap', sort=False):
        """Return ``(original_url, iterator_factory)`` where the factory
        yields one CDXObject per memento found in the Link header."""
        results = MementoUtils.parse_links(link_header, def_name)

        original = results['original']['url']
        key = canonicalize(original)

        mementos = results['mementos']
        if sort:
            # fix: sorted(mementos) raised TypeError on Python 3 (dicts are
            # unorderable); sort by the datetime string instead.
            # NOTE(review): HTTP-date strings do not sort chronologically --
            # confirm whether callers expect chronological order here
            mementos = sorted(mementos, key=lambda m: m.get('datetime', ''))

        def link_iter():
            for val in mementos:
                dt = val.get('datetime')
                if not dt:
                    # mementos without a datetime cannot become cdx lines
                    continue

                ts = http_date_to_timestamp(dt)
                line = CDXObject()
                line['urlkey'] = key
                line['timestamp'] = ts
                line['url'] = original
                line['mem_rel'] = val.get('rel', '')
                line['memento_url'] = val['url']
                yield line

        return original, link_iter

    @staticmethod
    def meta_field(name, results):
        """Return an '@name' CDXObject for results[name], or None if absent."""
        v = results.get(name)
        if v:
            c = CDXObject()
            c['key'] = '@' + name
            c['url'] = v['url']
            return c
#=================================================================
def cdx_sort_closest(closest, cdx_json):
    """Return the cdx entries ordered by how near each entry's timestamp
    is to ``closest`` (nearest first)."""
    target_sec = timestamp_to_sec(closest)

    def distance(cdx):
        # absolute gap, in seconds, between this entry and the target
        return abs(target_sec - timestamp_to_sec(cdx['timestamp']))

    return sorted(cdx_json, key=distance)