From 49b6ae78a80bff416bdaa3a8b396ef6e1caf1750 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 12 Mar 2016 22:15:24 -0800 Subject: [PATCH] live loader: remove liverec (doesn't work well with gevent), use regular requests instead of overridden version. reconstruct header block from httplib header pairs list; move ReadFullyStream to utils --- webagg/app.py | 3 +- webagg/indexsource.py | 3 +- webagg/inputrequest.py | 16 ++- webagg/liverec.py | 246 ----------------------------------- webagg/responseloader.py | 144 +++++++++++--------- webagg/test/test_handlers.py | 4 +- webagg/test/testutils.py | 5 +- webagg/utils.py | 44 +++++++ 8 files changed, 152 insertions(+), 313 deletions(-) delete mode 100644 webagg/liverec.py diff --git a/webagg/app.py b/webagg/app.py index 2745223d..e3302fae 100644 --- a/webagg/app.py +++ b/webagg/app.py @@ -1,9 +1,8 @@ -from webagg.liverec import request as remote_request - from webagg.inputrequest import DirectWSGIInputRequest, POSTInputRequest from bottle import route, request, response, abort, Bottle import bottle +import requests import traceback import json diff --git a/webagg/indexsource.py b/webagg/indexsource.py index c83d3006..b37604ba 100644 --- a/webagg/indexsource.py +++ b/webagg/indexsource.py @@ -8,7 +8,8 @@ from pywb.utils.wbexception import NotFoundException from pywb.cdx.cdxobject import CDXObject -from webagg.liverec import patched_requests as requests +#from webagg.liverec import patched_requests as requests +import requests from webagg.utils import ParamFormatter, res_template from webagg.utils import MementoUtils diff --git a/webagg/inputrequest.py b/webagg/inputrequest.py index 332716a2..f15de60b 100644 --- a/webagg/inputrequest.py +++ b/webagg/inputrequest.py @@ -2,8 +2,8 @@ from pywb.utils.loaders import extract_post_query, append_post_query from pywb.utils.loaders import LimitReader from pywb.utils.statusandheaders import StatusAndHeadersParser -from six.moves.urllib.parse import urlsplit -from six import 
StringIO, iteritems +from six.moves.urllib.parse import urlsplit, quote +from six import iteritems from io import BytesIO @@ -80,6 +80,18 @@ class DirectWSGIInputRequest(object): return url + def get_full_request_uri(self): + req_uri = self.env.get('REQUEST_URI') + if req_uri: + return req_uri + + req_uri = quote(self.env.get('PATH_INFO', ''), safe='/~!$&\'()*+,;=:@') + query = self.env.get('QUERY_STRING') + if query: + req_uri += '?' + query + + return req_uri + #============================================================================= class POSTInputRequest(DirectWSGIInputRequest): diff --git a/webagg/liverec.py b/webagg/liverec.py deleted file mode 100644 index e0fe1298..00000000 --- a/webagg/liverec.py +++ /dev/null @@ -1,246 +0,0 @@ -from io import BytesIO - -try: - import httplib -except ImportError: - import http.client as httplib - - -orig_connection = httplib.HTTPConnection - -from contextlib import contextmanager - -import ssl -from array import array - -from time import sleep - - -BUFF_SIZE = 8192 - - -# ============================================================================ -class RecordingStream(object): - def __init__(self, fp, recorder): - self.fp = fp - self.recorder = recorder - self.incomplete = False - - if hasattr(self.fp, 'unread'): - self.unread = self.fp.unread - - if hasattr(self.fp, 'tell'): - self.tell = self.fp.tell - - def read(self, *args, **kwargs): - buff = self.fp.read(*args, **kwargs) - self.recorder.write_response_buff(buff) - return buff - - def readinto(self, buff): - res = self.fp.readinto(buff) - self.recorder.write_response_buff(buff) - return res - - def readline(self, maxlen=-1): - line = self.fp.readline(maxlen) - self.recorder.write_response_header_line(line) - return line - - def flush(self): - self.fp.flush() - - def close(self): - try: - self.recorder.finish_response(self.incomplete) - except Exception as e: - import traceback - traceback.print_exc() - - res = self.fp.close() - return res - - -# 
============================================================================ -class RecordingHTTPResponse(httplib.HTTPResponse): - def __init__(self, recorder, *args, **kwargs): - httplib.HTTPResponse.__init__(self, *args, **kwargs) - self.fp = RecordingStream(self.fp, recorder) - - def mark_incomplete(self): - self.fp.incomplete = True - - -# ============================================================================ -class RecordingHTTPConnection(httplib.HTTPConnection): - global_recorder_maker = None - - def __init__(self, *args, **kwargs): - orig_connection.__init__(self, *args, **kwargs) - if not self.global_recorder_maker: - self.recorder = None - else: - self.recorder = self.global_recorder_maker() - - def make_recording_response(*args, **kwargs): - return RecordingHTTPResponse(self.recorder, *args, **kwargs) - - self.response_class = make_recording_response - - def send(self, data): - if not self.recorder: - orig_connection.send(self, data) - return - - if hasattr(data,'read') and not isinstance(data, array): - url = None - while True: - buff = data.read(BUFF_SIZE) - if not buff: - break - - orig_connection.send(self, buff) - self.recorder.write_request(url, buff) - else: - orig_connection.send(self, data) - self.recorder.write_request(self, data) - - - def get_url(self, data): - try: - buff = BytesIO(data) - line = buff.readline() - - path = line.split(' ', 2)[1] - host = self.host - port = self.port - scheme = 'https' if isinstance(self.sock, ssl.SSLSocket) else 'http' - - url = scheme + '://' + host - if (scheme == 'https' and port != '443') and (scheme == 'http' and port != '80'): - url += ':' + port - - url += path - except Exception as e: - raise - - return url - - - def request(self, *args, **kwargs): - #if self.recorder: - # self.recorder.start_request(self) - - res = orig_connection.request(self, *args, **kwargs) - - if self.recorder: - self.recorder.finish_request(self.sock) - - return res - - -# 
============================================================================ -class BaseRecorder(object): - def write_request(self, conn, buff): - #url = conn.get_url() - pass - - def write_response_header_line(self, line): - pass - - def write_response_buff(self, buff): - pass - - def finish_request(self, socket): - pass - - def finish_response(self, incomplete=False): - pass - - -#================================================================= -class ReadFullyStream(object): - def __init__(self, stream): - self.stream = stream - - def read(self, *args, **kwargs): - try: - return self.stream.read(*args, **kwargs) - except: - self.mark_incomplete() - raise - - def readline(self, *args, **kwargs): - try: - return self.stream.readline(*args, **kwargs) - except: - self.mark_incomplete() - raise - - def mark_incomplete(self): - if (hasattr(self.stream, '_fp') and - hasattr(self.stream._fp, 'mark_incomplete')): - self.stream._fp.mark_incomplete() - - def close(self): - try: - while True: - buff = self.stream.read(BUFF_SIZE) - sleep(0) - if not buff: - break - - except Exception as e: - import traceback - traceback.print_exc() - self.mark_incomplete() - finally: - self.stream.close() - - -# ============================================================================ -httplib.HTTPConnection = RecordingHTTPConnection -# ============================================================================ - -class DefaultRecorderMaker(object): - def __call__(self): - return BaseRecorder() - - -class FixedRecorder(object): - def __init__(self, recorder): - self.recorder = recorder - - def __call__(self): - return self.recorder - -@contextmanager -def record_requests(url, recorder_maker): - RecordingHTTPConnection.global_recorder_maker = recorder_maker - yield - RecordingHTTPConnection.global_recorder_maker = None - -@contextmanager -def orig_requests(): - httplib.HTTPConnection = orig_connection - yield - httplib.HTTPConnection = RecordingHTTPConnection - - -import requests as 
patched_requests - -def request(url, method='GET', recorder=None, recorder_maker=None, session=patched_requests, **kwargs): - if kwargs.get('skip_recording'): - recorder_maker = None - elif recorder: - recorder_maker = FixedRecorder(recorder) - elif not recorder_maker: - recorder_maker = DefaultRecorderMaker() - - with record_requests(url, recorder_maker): - kwargs['allow_redirects'] = False - r = session.request(method=method, - url=url, - **kwargs) - - return r diff --git a/webagg/responseloader.py b/webagg/responseloader.py index 31a5298e..52906301 100644 --- a/webagg/responseloader.py +++ b/webagg/responseloader.py @@ -1,6 +1,3 @@ -from webagg.liverec import BaseRecorder -from webagg.liverec import request as remote_request - from webagg.utils import MementoUtils from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_timestamp @@ -12,12 +9,12 @@ from pywb.utils.statusandheaders import StatusAndHeaders from pywb.warc.resolvingloader import ResolvingLoader - from io import BytesIO import uuid import six import itertools +import requests #============================================================================= @@ -79,9 +76,6 @@ class BaseLoader(object): out_headers['Memento-Datetime'] = other_headers.get('Memento-Datetime') out_headers['Content-Length'] = other_headers.get('Content-Length') - #for n, v in other_headers.items(): - # out_headers[n] = v - return out_headers, StreamIter(stream) out_headers['Link'] = MementoUtils.make_link( @@ -93,13 +87,19 @@ class BaseLoader(object): warc_headers_buff = warc_headers.to_bytes() - self._set_content_len(warc_headers.get_header('Content-Length'), - out_headers, - len(warc_headers_buff)) + lenset = self._set_content_len(warc_headers.get_header('Content-Length'), + out_headers, + len(warc_headers_buff)) - return out_headers, StreamIter(stream, - header1=warc_headers_buff, - header2=other_headers) + streamiter = StreamIter(stream, + header1=warc_headers_buff, + header2=other_headers) + + if not lenset: + 
out_headers['Transfer-Encoding'] = 'chunked' + streamiter = self._chunk_encode(streamiter) + + return out_headers, streamiter def _set_content_len(self, content_len_str, headers, existing_len): # Try to set content-length, if it is available and valid @@ -111,6 +111,21 @@ class BaseLoader(object): if content_len >= 0: content_len += existing_len headers['Content-Length'] = str(content_len) + return True + + return False + + @staticmethod + def _chunk_encode(orig_iter): + for chunk in orig_iter: + if not len(chunk): + continue + chunk_len = b'%X\r\n' % len(chunk) + yield chunk_len + yield chunk + yield b'\r\n' + + yield b'0\r\n\r\n' #============================================================================= @@ -183,17 +198,20 @@ class WARCPathLoader(BaseLoader): #============================================================================= class LiveWebLoader(BaseLoader): - SKIP_HEADERS = (b'link', - b'memento-datetime', - b'content-location', - b'x-archive') + SKIP_HEADERS = ('link', + 'memento-datetime', + 'content-location', + 'x-archive') + + def __init__(self): + self.sesh = requests.session() def load_resource(self, cdx, params): load_url = cdx.get('load_url') if not load_url: return None - recorder = HeaderRecorder(self.SKIP_HEADERS) + #recorder = HeaderRecorder(self.SKIP_HEADERS) input_req = params['_input_req'] @@ -215,14 +233,13 @@ class LiveWebLoader(BaseLoader): data = input_req.get_req_body() try: - upstream_res = remote_request(url=load_url, - method=method, - recorder=recorder, - stream=True, - allow_redirects=False, - headers=req_headers, - data=data, - timeout=params.get('_timeout')) + upstream_res = self.sesh.request(url=load_url, + method=method, + stream=True, + allow_redirects=False, + headers=req_headers, + data=data, + timeout=params.get('_timeout')) except Exception as e: raise LiveResourceException(load_url) @@ -240,7 +257,47 @@ class LiveWebLoader(BaseLoader): cdx['source'] = upstream_res.headers.get('WebAgg-Source-Coll') return None, 
upstream_res.headers, upstream_res.raw - http_headers_buff = recorder.get_headers_buff() + if upstream_res.raw.version == 11: + version = '1.1' + else: + version = '1.0' + + status = 'HTTP/{version} {status} {reason}\r\n' + status = status.format(version=version, + status=upstream_res.status_code, + reason=upstream_res.reason) + + http_headers_buff = status + + orig_resp = upstream_res.raw._original_response + + try: #pragma: no cover + #PY 3 + resp_headers = orig_resp.headers._headers + for n, v in resp_headers: + if n.lower() in self.SKIP_HEADERS: + continue + + http_headers_buff += n + ': ' + v + '\r\n' + except: #pragma: no cover + #PY 2 + resp_headers = orig_resp.msg.headers + for n, v in zip(orig_resp.getheaders(), resp_headers): + if n in self.SKIP_HEADERS: + continue + + http_headers_buff += v + + http_headers_buff += '\r\n' + http_headers_buff = http_headers_buff.encode('latin-1') + + try: + fp = upstream_res.raw._fp.fp + if hasattr(fp, 'raw'): + fp = fp.raw + remote_ip = fp._sock.getpeername()[0] + except: #pragma: no cover + remote_ip = None warc_headers = {} @@ -248,8 +305,8 @@ class LiveWebLoader(BaseLoader): warc_headers['WARC-Record-ID'] = self._make_warc_id() warc_headers['WARC-Target-URI'] = cdx['url'] warc_headers['WARC-Date'] = datetime_to_iso_date(dt) - if recorder.target_ip: - warc_headers['WARC-IP-Address'] = recorder.target_ip + if remote_ip: + warc_headers['WARC-IP-Address'] = remote_ip warc_headers['Content-Type'] = 'application/http; msgtype=response' @@ -269,32 +326,3 @@ class LiveWebLoader(BaseLoader): def __str__(self): return 'LiveWebLoader' - -#============================================================================= -class HeaderRecorder(BaseRecorder): - def __init__(self, skip_list=None): - self.buff = BytesIO() - self.skip_list = skip_list - self.skipped = [] - self.target_ip = None - - def write_response_header_line(self, line): - if self.accept_header(line): - self.buff.write(line) - - def get_headers_buff(self): - return 
self.buff.getvalue() - - def accept_header(self, line): - if self.skip_list and line.lower().startswith(self.skip_list): - self.skipped.append(line) - return False - - return True - - def finish_request(self, socket): - ip = socket.getpeername() - if ip: - self.target_ip = ip[0] - - diff --git a/webagg/test/test_handlers.py b/webagg/test/test_handlers.py index 138584d6..5b9e510f 100644 --- a/webagg/test/test_handlers.py +++ b/webagg/test/test_handlers.py @@ -1,4 +1,4 @@ -from gevent import monkey; monkey.patch_all(thread=False) +#from gevent import monkey; monkey.patch_all(thread=False) from collections import OrderedDict @@ -12,6 +12,7 @@ from webagg.app import ResAggApp from webagg.utils import MementoUtils from pywb.utils.statusandheaders import StatusAndHeadersParser +from pywb.utils.bufferedreaders import ChunkedDataReader from io import BytesIO import webtest @@ -71,6 +72,7 @@ class TestResAgg(object): def _check_uri_date(self, resp, uri, dt): buff = BytesIO(resp.body) + buff = ChunkedDataReader(buff) status_headers = StatusAndHeadersParser(['WARC/1.0']).parse(buff) assert status_headers.get_header('WARC-Target-URI') == uri if dt == True: diff --git a/webagg/test/testutils.py b/webagg/test/testutils.py index 51d91364..4c5c42b6 100644 --- a/webagg/test/testutils.py +++ b/webagg/test/testutils.py @@ -71,7 +71,7 @@ class LiveServerTests(object): @classmethod def teardown_class(cls): super(LiveServerTests, cls).teardown_class() - cls.server.stop_thread() + cls.server.stop() # ============================================================================ @@ -87,8 +87,7 @@ class ServerThreadRunner(object): #self.proc.daemon = True self.proc.start() - def stop_thread(self): - #self.httpd.shutdown() + def stop(self): self.proc.terminate() diff --git a/webagg/utils.py b/webagg/utils.py index ea4cec10..8913a443 100644 --- a/webagg/utils.py +++ b/webagg/utils.py @@ -1,6 +1,7 @@ import re import six import string +import time from pywb.utils.timeutils import 
timestamp_to_http_date from pywb.utils.wbexception import BadRequestException @@ -10,6 +11,8 @@ LINK_SEG_SPLIT = re.compile(';\s*') LINK_URL = re.compile('<(.*)>') LINK_PROP = re.compile('([\w]+)="([^"]+)') +BUFF_SIZE = 8192 + #============================================================================= class MementoException(BadRequestException): @@ -142,3 +145,44 @@ def res_template(template, params): return res +#================================================================= +class ReadFullyStream(object): + def __init__(self, stream): + self.stream = stream + + def read(self, *args, **kwargs): + try: + return self.stream.read(*args, **kwargs) + except: + self.mark_incomplete() + raise + + def readline(self, *args, **kwargs): + try: + return self.stream.readline(*args, **kwargs) + except: + self.mark_incomplete() + raise + + def mark_incomplete(self): + if (hasattr(self.stream, '_fp') and + hasattr(self.stream._fp, 'mark_incomplete')): + self.stream._fp.mark_incomplete() + + def close(self): + try: + while True: + buff = self.stream.read(BUFF_SIZE) + time.sleep(0) + if not buff: + break + + except Exception as e: + import traceback + traceback.print_exc() + self.mark_incomplete() + finally: + self.stream.close() + + +