1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-23 06:32:24 +01:00
pywb/recorder/recorderapp.py
2016-03-09 14:33:36 -08:00

174 lines
5.4 KiB
Python

from requests import request as remote_request
from requests.structures import CaseInsensitiveDict
from webagg.liverec import ReadFullyStream
from webagg.responseloader import StreamIter
from webagg.inputrequest import DirectWSGIInputRequest
from pywb.utils.statusandheaders import StatusAndHeadersParser
from pywb.warc.recordloader import ArcWarcRecord
from pywb.warc.recordloader import ArcWarcRecordLoader
from recorder.warcrecorder import SingleFileWARCRecorder, PerRecordWARCRecorder
from recorder.redisindexer import WritableRedisIndexer
from six.moves.urllib.parse import parse_qsl
import json
import tempfile
import traceback
import gevent.queue
import gevent
#==============================================================================
write_queue = gevent.queue.Queue()
#==============================================================================
class RecorderApp(object):
def __init__(self, upstream_host, writer):
self.upstream_host = upstream_host
self.writer = writer
self.parser = StatusAndHeadersParser([], verify=False)
gevent.spawn(self._do_write)
def _do_write(self):
while True:
try:
result = write_queue.get()
req = None
resp = None
req_head, req_pay, resp_head, resp_pay, params = result
req = self._create_req_record(req_head, req_pay, 'request')
resp = self._create_resp_record(resp_head, resp_pay, 'response')
self.writer.write_req_resp(req, resp, params)
except:
traceback.print_exc()
finally:
try:
if req:
req.stream.close()
if resp:
resp.stream.close()
except Exception as e:
traceback.print_exc()
def _create_req_record(self, req_headers, payload, type_, ct=''):
len_ = payload.tell()
payload.seek(0)
#warc_headers = StatusAndHeaders('WARC/1.0', req_headers.items())
warc_headers = req_headers
status_headers = self.parser.parse(payload)
record = ArcWarcRecord('warc', type_, warc_headers, payload,
status_headers, ct, len_)
return record
def _create_resp_record(self, req_headers, payload, type_, ct=''):
len_ = payload.tell()
payload.seek(0)
warc_headers = self.parser.parse(payload)
warc_headers = CaseInsensitiveDict(warc_headers.headers)
status_headers = self.parser.parse(payload)
record = ArcWarcRecord('warc', type_, warc_headers, payload,
status_headers, ct, len_)
return record
def send_error(self, exc, start_response):
message = json.dumps({'error': repr(exc)})
headers = [('Content-Type', 'application/json; charset=utf-8'),
('Content-Length', str(len(message)))]
start_response('400 Bad Request', headers)
return message
def __call__(self, environ, start_response):
request_uri = environ.get('REQUEST_URI')
input_req = DirectWSGIInputRequest(environ)
headers = input_req.get_req_headers()
method = input_req.get_req_method()
params = dict(parse_qsl(environ.get('QUERY_STRING')))
req_stream = Wrapper(input_req.get_req_body(), headers, None)
try:
res = remote_request(url=self.upstream_host + request_uri,
method=method,
data=req_stream,
headers=headers,
allow_redirects=False,
stream=True)
except Exception as e:
traceback.print_exc()
return self.send_error(e, start_response)
start_response('200 OK', list(res.headers.items()))
resp_stream = Wrapper(res.raw, res.headers, req_stream, params)
return StreamIter(ReadFullyStream(resp_stream))
#==============================================================================
class Wrapper(object):
def __init__(self, stream, rec_headers, req_obj=None,
params=None):
self.stream = stream
self.out = self._create_buffer()
self.headers = CaseInsensitiveDict(rec_headers)
for n in rec_headers.keys():
if not n.upper().startswith('WARC-'):
del self.headers[n]
self.req_obj = req_obj
self.params = params
def _create_buffer(self):
return tempfile.SpooledTemporaryFile(max_size=512*1024)
def read(self, limit=-1):
buff = self.stream.read()
self.out.write(buff)
return buff
def close(self):
try:
self.stream.close()
except:
traceback.print_exc()
if not self.req_obj:
return
try:
entry = (self.req_obj.headers, self.req_obj.out,
self.headers, self.out, self.params)
write_queue.put(entry)
self.req_obj = None
except:
traceback.print_exc()
#==============================================================================
application = RecorderApp('http://localhost:8080',
PerRecordWARCRecorder('./warcs/{user}/{coll}/',
dedup_index=WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', 'recorder')))