1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 08:04:49 +01:00

live loader: remove liverec (doesn't work well with gevent), use regular requests

instead of overridden version.
reconstruct header block from httplib header pairs list
move ReadFullyStream to utils
This commit is contained in:
Ilya Kreymer 2016-03-12 22:15:24 -08:00
parent 9adb8da3b7
commit 49b6ae78a8
8 changed files with 152 additions and 313 deletions

View File

@ -1,9 +1,8 @@
from webagg.liverec import request as remote_request
from webagg.inputrequest import DirectWSGIInputRequest, POSTInputRequest
from bottle import route, request, response, abort, Bottle
import bottle
import requests
import traceback
import json

View File

@ -8,7 +8,8 @@ from pywb.utils.wbexception import NotFoundException
from pywb.cdx.cdxobject import CDXObject
from webagg.liverec import patched_requests as requests
#from webagg.liverec import patched_requests as requests
import requests
from webagg.utils import ParamFormatter, res_template
from webagg.utils import MementoUtils

View File

@ -2,8 +2,8 @@ from pywb.utils.loaders import extract_post_query, append_post_query
from pywb.utils.loaders import LimitReader
from pywb.utils.statusandheaders import StatusAndHeadersParser
from six.moves.urllib.parse import urlsplit
from six import StringIO, iteritems
from six.moves.urllib.parse import urlsplit, quote
from six import iteritems
from io import BytesIO
@ -80,6 +80,18 @@ class DirectWSGIInputRequest(object):
return url
def get_full_request_uri(self):
    """Return the request URI: path plus optional query string.

    A non-empty ``REQUEST_URI`` entry in the WSGI environ is returned
    unchanged. Otherwise the URI is rebuilt from the percent-quoted
    ``PATH_INFO`` and, when non-empty, ``QUERY_STRING``.
    """
    env = self.env

    raw_uri = env.get('REQUEST_URI')
    if raw_uri:
        return raw_uri

    # Characters listed in `safe` are left unescaped by quote().
    path = quote(env.get('PATH_INFO', ''), safe='/~!$&\'()*+,;=:@')
    query = env.get('QUERY_STRING')
    return (path + '?' + query) if query else path
#=============================================================================
class POSTInputRequest(DirectWSGIInputRequest):

View File

@ -1,246 +0,0 @@
from io import BytesIO
try:
import httplib
except ImportError:
import http.client as httplib
orig_connection = httplib.HTTPConnection
from contextlib import contextmanager
import ssl
from array import array
from time import sleep
BUFF_SIZE = 8192
# ============================================================================
class RecordingStream(object):
    """File-like wrapper that mirrors data read from ``fp`` to a recorder.

    Body reads are passed to ``recorder.write_response_buff`` and lines read
    via ``readline`` to ``recorder.write_response_header_line``. On close,
    ``recorder.finish_response`` is called with the ``incomplete`` flag.
    """

    def __init__(self, fp, recorder):
        """Wrap ``fp`` and forward everything read from it to ``recorder``."""
        self.fp = fp
        self.recorder = recorder
        self.incomplete = False

        # Expose the optional methods only when the wrapped stream has them.
        if hasattr(self.fp, 'unread'):
            self.unread = self.fp.unread

        if hasattr(self.fp, 'tell'):
            self.tell = self.fp.tell

    def read(self, *args, **kwargs):
        """Read from the wrapped stream and record the returned data."""
        buff = self.fp.read(*args, **kwargs)
        self.recorder.write_response_buff(buff)
        return buff

    def readinto(self, buff):
        """Fill ``buff`` from the wrapped stream; return the byte count.

        Only the bytes actually read are recorded. ``buff`` is usually larger
        than the amount read, so recording it whole would include stale data.
        """
        res = self.fp.readinto(buff)
        if res:
            self.recorder.write_response_buff(
                memoryview(buff)[:res].tobytes())
        return res

    def readline(self, maxlen=-1):
        """Read one line and record it as a response header line."""
        line = self.fp.readline(maxlen)
        self.recorder.write_response_header_line(line)
        return line

    def flush(self):
        """Flush the wrapped stream."""
        self.fp.flush()

    def close(self):
        """Notify the recorder, then close the wrapped stream.

        A failure inside the recorder is printed and otherwise ignored so
        that the wrapped stream is still closed.
        """
        try:
            self.recorder.finish_response(self.incomplete)
        except Exception:
            import traceback
            traceback.print_exc()

        return self.fp.close()
# ============================================================================
class RecordingHTTPResponse(httplib.HTTPResponse):
    """``HTTPResponse`` whose file object is wrapped in a ``RecordingStream``."""

    def __init__(self, recorder, *args, **kwargs):
        """Build the regular response, then wrap its ``fp`` for ``recorder``."""
        httplib.HTTPResponse.__init__(self, *args, **kwargs)
        raw_fp = self.fp
        self.fp = RecordingStream(raw_fp, recorder)

    def mark_incomplete(self):
        """Set the ``incomplete`` flag on the wrapping stream."""
        setattr(self.fp, 'incomplete', True)
# ============================================================================
class RecordingHTTPConnection(httplib.HTTPConnection):
    """``HTTPConnection`` that reports request and response data to a recorder.

    A recorder is created per connection by calling the class-level
    ``global_recorder_maker``. When that is unset the connection behaves like
    the original ``HTTPConnection``.
    """

    # Callable returning a recorder, or None to disable recording.
    global_recorder_maker = None

    def __init__(self, *args, **kwargs):
        """Initialise the connection and, if enabled, attach a recorder."""
        orig_connection.__init__(self, *args, **kwargs)
        if not self.global_recorder_maker:
            self.recorder = None
        else:
            self.recorder = self.global_recorder_maker()

            def make_recording_response(*args, **kwargs):
                return RecordingHTTPResponse(self.recorder, *args, **kwargs)

            self.response_class = make_recording_response

    def send(self, data):
        """Send ``data`` and pass each sent buffer to the recorder.

        File-like ``data`` is sent in ``BUFF_SIZE`` chunks. The connection is
        always passed as the first argument of ``write_request``, for both
        the chunked and the single-buffer path.
        """
        if not self.recorder:
            orig_connection.send(self, data)
            return

        if hasattr(data, 'read') and not isinstance(data, array):
            while True:
                buff = data.read(BUFF_SIZE)
                if not buff:
                    break

                orig_connection.send(self, buff)
                self.recorder.write_request(self, buff)
        else:
            orig_connection.send(self, data)
            self.recorder.write_request(self, data)

    def get_url(self, data):
        """Rebuild the absolute request URL from a raw request buffer.

        The path is taken from the request line at the start of ``data``.
        Scheme, host and port come from this connection. The port is added
        only when it differs from the scheme's default (80 or 443).

        Raises ``IndexError`` if the first line has no second
        space-separated field.
        """
        line = BytesIO(data).readline()
        if not isinstance(line, str):
            # bytes on Python 3: decode so it can be split with a str
            line = line.decode('latin-1')

        path = line.split(' ', 2)[1]

        is_https = isinstance(self.sock, ssl.SSLSocket)
        scheme = 'https' if is_https else 'http'
        default_port = 443 if is_https else 80

        url = scheme + '://' + self.host
        port = self.port
        if port is not None and int(port) != default_port:
            url += ':' + str(port)

        return url + path

    def request(self, *args, **kwargs):
        """Issue the request, then notify the recorder with the socket."""
        res = orig_connection.request(self, *args, **kwargs)

        if self.recorder:
            self.recorder.finish_request(self.sock)

        return res
# ============================================================================
class BaseRecorder(object):
    """No-op recorder: every hook accepts its arguments and does nothing."""

    def write_request(self, conn, buff):
        """Hook for a request buffer sent on ``conn``; does nothing."""
        return None

    def write_response_header_line(self, line):
        """Hook for a single response header line; does nothing."""
        return None

    def write_response_buff(self, buff):
        """Hook for a buffer of response data; does nothing."""
        return None

    def finish_request(self, socket):
        """Hook called with the socket once a request is sent; does nothing."""
        return None

    def finish_response(self, incomplete=False):
        """Hook called when the response stream closes; does nothing."""
        return None
#=================================================================
class ReadFullyStream(object):
    """Stream wrapper that drains the wrapped stream before closing it.

    If a read fails, the stream's ``_fp`` object (when it has one that
    provides ``mark_incomplete``) is told the data is incomplete.
    """

    def __init__(self, stream):
        self.stream = stream

    def _delegate(self, method_name, args, kwargs):
        # Forward the call; on any failure flag the stream, then re-raise.
        try:
            return getattr(self.stream, method_name)(*args, **kwargs)
        except:
            self.mark_incomplete()
            raise

    def read(self, *args, **kwargs):
        """Read from the wrapped stream, flagging it incomplete on error."""
        return self._delegate('read', args, kwargs)

    def readline(self, *args, **kwargs):
        """Read a line from the wrapped stream, flagging it on error."""
        return self._delegate('readline', args, kwargs)

    def mark_incomplete(self):
        """Call ``mark_incomplete()`` on ``stream._fp`` if it is available."""
        if not hasattr(self.stream, '_fp'):
            return

        inner = self.stream._fp
        if hasattr(inner, 'mark_incomplete'):
            inner.mark_incomplete()

    def close(self):
        """Read the stream to its end, then close it.

        An error while draining is printed and the stream is flagged as
        incomplete; the wrapped stream is closed in every case.
        """
        try:
            drained = False
            while not drained:
                chunk = self.stream.read(BUFF_SIZE)
                sleep(0)
                drained = not chunk

        except Exception:
            import traceback
            traceback.print_exc()
            self.mark_incomplete()
        finally:
            self.stream.close()
# ============================================================================
httplib.HTTPConnection = RecordingHTTPConnection
# ============================================================================
class DefaultRecorderMaker(object):
    """Recorder factory: each call returns a new ``BaseRecorder``."""

    def __call__(self):
        fresh_recorder = BaseRecorder()
        return fresh_recorder
class FixedRecorder(object):
    """Recorder factory that returns the same recorder on every call."""

    def __init__(self, recorder):
        # The one recorder handed back by each call.
        self.recorder = recorder

    def __call__(self):
        shared = self.recorder
        return shared
@contextmanager
def record_requests(url, recorder_maker):
    """Install ``recorder_maker`` as the global recorder factory for a block.

    ``RecordingHTTPConnection.global_recorder_maker`` is set on entry and
    reset to ``None`` on exit. The reset is in a ``finally`` clause so a
    failing request cannot leave recording switched on for later requests.

    ``url`` is accepted for the caller's convenience and is not used here.
    """
    RecordingHTTPConnection.global_recorder_maker = recorder_maker
    try:
        yield
    finally:
        RecordingHTTPConnection.global_recorder_maker = None
@contextmanager
def orig_requests():
    """Temporarily restore the original ``httplib.HTTPConnection``.

    The recording connection class is put back on exit. The restore is in a
    ``finally`` clause so an exception in the block cannot leave the patch
    permanently removed.
    """
    httplib.HTTPConnection = orig_connection
    try:
        yield
    finally:
        httplib.HTTPConnection = RecordingHTTPConnection
import requests as patched_requests
def request(url, method='GET', recorder=None, recorder_maker=None,
            session=patched_requests, **kwargs):
    """Perform an HTTP request through ``session`` with recording enabled.

    The recorder factory is chosen in this order:

    * ``skip_recording=True`` in ``kwargs`` disables recording;
    * ``recorder`` -- this recorder instance is used;
    * ``recorder_maker`` -- the supplied factory is used;
    * otherwise a ``DefaultRecorderMaker`` is used.

    Redirects are never followed (``allow_redirects`` is forced to False).
    All remaining keyword arguments are passed to ``session.request``, and
    its response is returned.
    """
    # Pop rather than get: 'skip_recording' is an option of this wrapper
    # only, and session.request() rejects unknown keyword arguments.
    if kwargs.pop('skip_recording', False):
        recorder_maker = None
    elif recorder:
        recorder_maker = FixedRecorder(recorder)
    elif not recorder_maker:
        recorder_maker = DefaultRecorderMaker()

    with record_requests(url, recorder_maker):
        kwargs['allow_redirects'] = False

        r = session.request(method=method,
                            url=url,
                            **kwargs)

    return r

View File

@ -1,6 +1,3 @@
from webagg.liverec import BaseRecorder
from webagg.liverec import request as remote_request
from webagg.utils import MementoUtils
from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_timestamp
@ -12,12 +9,12 @@ from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.warc.resolvingloader import ResolvingLoader
from io import BytesIO
import uuid
import six
import itertools
import requests
#=============================================================================
@ -79,9 +76,6 @@ class BaseLoader(object):
out_headers['Memento-Datetime'] = other_headers.get('Memento-Datetime')
out_headers['Content-Length'] = other_headers.get('Content-Length')
#for n, v in other_headers.items():
# out_headers[n] = v
return out_headers, StreamIter(stream)
out_headers['Link'] = MementoUtils.make_link(
@ -93,13 +87,19 @@ class BaseLoader(object):
warc_headers_buff = warc_headers.to_bytes()
self._set_content_len(warc_headers.get_header('Content-Length'),
out_headers,
len(warc_headers_buff))
lenset = self._set_content_len(warc_headers.get_header('Content-Length'),
out_headers,
len(warc_headers_buff))
return out_headers, StreamIter(stream,
header1=warc_headers_buff,
header2=other_headers)
streamiter = StreamIter(stream,
header1=warc_headers_buff,
header2=other_headers)
if not lenset:
out_headers['Transfer-Encoding'] = 'chunked'
streamiter = self._chunk_encode(streamiter)
return out_headers, streamiter
def _set_content_len(self, content_len_str, headers, existing_len):
# Try to set content-length, if it is available and valid
@ -111,6 +111,21 @@ class BaseLoader(object):
if content_len >= 0:
content_len += existing_len
headers['Content-Length'] = str(content_len)
return True
return False
@staticmethod
def _chunk_encode(orig_iter):
for chunk in orig_iter:
if not len(chunk):
continue
chunk_len = b'%X\r\n' % len(chunk)
yield chunk_len
yield chunk
yield b'\r\n'
yield b'0\r\n\r\n'
#=============================================================================
@ -183,17 +198,20 @@ class WARCPathLoader(BaseLoader):
#=============================================================================
class LiveWebLoader(BaseLoader):
SKIP_HEADERS = (b'link',
b'memento-datetime',
b'content-location',
b'x-archive')
SKIP_HEADERS = ('link',
'memento-datetime',
'content-location',
'x-archive')
def __init__(self):
self.sesh = requests.session()
def load_resource(self, cdx, params):
load_url = cdx.get('load_url')
if not load_url:
return None
recorder = HeaderRecorder(self.SKIP_HEADERS)
#recorder = HeaderRecorder(self.SKIP_HEADERS)
input_req = params['_input_req']
@ -215,14 +233,13 @@ class LiveWebLoader(BaseLoader):
data = input_req.get_req_body()
try:
upstream_res = remote_request(url=load_url,
method=method,
recorder=recorder,
stream=True,
allow_redirects=False,
headers=req_headers,
data=data,
timeout=params.get('_timeout'))
upstream_res = self.sesh.request(url=load_url,
method=method,
stream=True,
allow_redirects=False,
headers=req_headers,
data=data,
timeout=params.get('_timeout'))
except Exception as e:
raise LiveResourceException(load_url)
@ -240,7 +257,47 @@ class LiveWebLoader(BaseLoader):
cdx['source'] = upstream_res.headers.get('WebAgg-Source-Coll')
return None, upstream_res.headers, upstream_res.raw
http_headers_buff = recorder.get_headers_buff()
if upstream_res.raw.version == 11:
version = '1.1'
else:
version = '1.0'
status = 'HTTP/{version} {status} {reason}\r\n'
status = status.format(version=version,
status=upstream_res.status_code,
reason=upstream_res.reason)
http_headers_buff = status
orig_resp = upstream_res.raw._original_response
try: #pragma: no cover
#PY 3
resp_headers = orig_resp.headers._headers
for n, v in resp_headers:
if n.lower() in self.SKIP_HEADERS:
continue
http_headers_buff += n + ': ' + v + '\r\n'
except: #pragma: no cover
#PY 2
resp_headers = orig_resp.msg.headers
for n, v in zip(orig_resp.getheaders(), resp_headers):
if n in self.SKIP_HEADERS:
continue
http_headers_buff += v
http_headers_buff += '\r\n'
http_headers_buff = http_headers_buff.encode('latin-1')
try:
fp = upstream_res.raw._fp.fp
if hasattr(fp, 'raw'):
fp = fp.raw
remote_ip = fp._sock.getpeername()[0]
except: #pragma: no cover
remote_ip = None
warc_headers = {}
@ -248,8 +305,8 @@ class LiveWebLoader(BaseLoader):
warc_headers['WARC-Record-ID'] = self._make_warc_id()
warc_headers['WARC-Target-URI'] = cdx['url']
warc_headers['WARC-Date'] = datetime_to_iso_date(dt)
if recorder.target_ip:
warc_headers['WARC-IP-Address'] = recorder.target_ip
if remote_ip:
warc_headers['WARC-IP-Address'] = remote_ip
warc_headers['Content-Type'] = 'application/http; msgtype=response'
@ -269,32 +326,3 @@ class LiveWebLoader(BaseLoader):
def __str__(self):
return 'LiveWebLoader'
#=============================================================================
class HeaderRecorder(BaseRecorder):
    """Recorder that buffers response header lines and the peer IP address.

    Lines starting with one of the prefixes in ``skip_list`` (compared
    against the lowercased line) are kept out of the buffer and stored in
    ``skipped`` instead.
    """

    def __init__(self, skip_list=None):
        self.buff = BytesIO()
        self.skip_list = skip_list
        self.skipped = []
        self.target_ip = None

    def write_response_header_line(self, line):
        """Append ``line`` to the buffer unless it is filtered out."""
        if not self.accept_header(line):
            return

        self.buff.write(line)

    def get_headers_buff(self):
        """Return everything buffered so far."""
        collected = self.buff.getvalue()
        return collected

    def accept_header(self, line):
        """Return True if ``line`` should be kept; record it when skipped."""
        rejected = (bool(self.skip_list) and
                    line.lower().startswith(self.skip_list))
        if rejected:
            self.skipped.append(line)

        return not rejected

    def finish_request(self, socket):
        """Store the first element of ``socket.getpeername()`` as target IP."""
        peer = socket.getpeername()
        if peer:
            self.target_ip = peer[0]

View File

@ -1,4 +1,4 @@
from gevent import monkey; monkey.patch_all(thread=False)
#from gevent import monkey; monkey.patch_all(thread=False)
from collections import OrderedDict
@ -12,6 +12,7 @@ from webagg.app import ResAggApp
from webagg.utils import MementoUtils
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.utils.bufferedreaders import ChunkedDataReader
from io import BytesIO
import webtest
@ -71,6 +72,7 @@ class TestResAgg(object):
def _check_uri_date(self, resp, uri, dt):
buff = BytesIO(resp.body)
buff = ChunkedDataReader(buff)
status_headers = StatusAndHeadersParser(['WARC/1.0']).parse(buff)
assert status_headers.get_header('WARC-Target-URI') == uri
if dt == True:

View File

@ -71,7 +71,7 @@ class LiveServerTests(object):
@classmethod
def teardown_class(cls):
super(LiveServerTests, cls).teardown_class()
cls.server.stop_thread()
cls.server.stop()
# ============================================================================
@ -87,8 +87,7 @@ class ServerThreadRunner(object):
#self.proc.daemon = True
self.proc.start()
def stop_thread(self):
#self.httpd.shutdown()
def stop(self):
self.proc.terminate()

View File

@ -1,6 +1,7 @@
import re
import six
import string
import time
from pywb.utils.timeutils import timestamp_to_http_date
from pywb.utils.wbexception import BadRequestException
@ -10,6 +11,8 @@ LINK_SEG_SPLIT = re.compile(';\s*')
LINK_URL = re.compile('<(.*)>')
LINK_PROP = re.compile('([\w]+)="([^"]+)')
BUFF_SIZE = 8192
#=============================================================================
class MementoException(BadRequestException):
@ -142,3 +145,44 @@ def res_template(template, params):
return res
#=================================================================
class ReadFullyStream(object):
    """Stream wrapper that drains the wrapped stream before closing it.

    If a read fails, the stream's ``_fp`` object (when it has one that
    provides ``mark_incomplete``) is told the data is incomplete.
    """

    def __init__(self, stream):
        self.stream = stream

    def _delegate(self, method_name, args, kwargs):
        # Forward the call; on any failure flag the stream, then re-raise.
        try:
            return getattr(self.stream, method_name)(*args, **kwargs)
        except:
            self.mark_incomplete()
            raise

    def read(self, *args, **kwargs):
        """Read from the wrapped stream, flagging it incomplete on error."""
        return self._delegate('read', args, kwargs)

    def readline(self, *args, **kwargs):
        """Read a line from the wrapped stream, flagging it on error."""
        return self._delegate('readline', args, kwargs)

    def mark_incomplete(self):
        """Call ``mark_incomplete()`` on ``stream._fp`` if it is available."""
        if not hasattr(self.stream, '_fp'):
            return

        inner = self.stream._fp
        if hasattr(inner, 'mark_incomplete'):
            inner.mark_incomplete()

    def close(self):
        """Read the stream to its end, then close it.

        An error while draining is printed and the stream is flagged as
        incomplete; the wrapped stream is closed in every case.
        """
        try:
            drained = False
            while not drained:
                chunk = self.stream.read(BUFF_SIZE)
                time.sleep(0)
                drained = not chunk

        except Exception:
            import traceback
            traceback.print_exc()
            self.mark_incomplete()
        finally:
            self.stream.close()