diff --git a/recorder/recorderapp.py b/recorder/recorderapp.py index 9d77b6ca..b7d91251 100644 --- a/recorder/recorderapp.py +++ b/recorder/recorderapp.py @@ -1,3 +1,4 @@ +#from gevent import monkey; monkey.patch_all() from requests import request as remote_request from requests.structures import CaseInsensitiveDict @@ -12,10 +13,11 @@ from pywb.warc.recordloader import ArcWarcRecordLoader from recorder.warcrecorder import SingleFileWARCRecorder, PerRecordWARCRecorder from recorder.redisindexer import WritableRedisIndexer -from six.moves.urllib.parse import parse_qsl +from six.moves.urllib.parse import parse_qsl, quote import json import tempfile +import re import traceback @@ -23,51 +25,56 @@ import gevent.queue import gevent -#============================================================================== -write_queue = gevent.queue.Queue() - - #============================================================================== class RecorderApp(object): - def __init__(self, upstream_host, writer): + def __init__(self, upstream_host, writer, accept_colls='.*'): self.upstream_host = upstream_host self.writer = writer self.parser = StatusAndHeadersParser([], verify=False) - gevent.spawn(self._do_write) + self.write_queue = gevent.queue.Queue() + gevent.spawn(self._write_loop) - def _do_write(self): + self.rx_accept_colls = re.compile(accept_colls) + + def _write_loop(self): while True: + self._write_one() + + def _write_one(self): + try: + result = self.write_queue.get() + + req = None + resp = None + req_head, req_pay, resp_head, resp_pay, params = result + + if not self.rx_accept_colls.match(resp_head.get('WebAgg-Source-Coll', '')): + print('COLL', resp_head) + return + + req = self._create_req_record(req_head, req_pay, 'request') + resp = self._create_resp_record(resp_head, resp_pay, 'response') + + self.writer.write_req_resp(req, resp, params) + except: + traceback.print_exc() + + finally: try: - result = write_queue.get() - req = None - resp = None - req_head, req_pay, resp_head, resp_pay, params = result + if req: + req.stream.close() - req = self._create_req_record(req_head, req_pay, 'request') - resp = self._create_resp_record(resp_head, resp_pay, 'response') - - self.writer.write_req_resp(req, resp, params) - - except: + if resp: + resp.stream.close() + except Exception as e: traceback.print_exc() - finally: - try: - if req: - req.stream.close() - - if resp: - resp.stream.close() - except Exception as e: - traceback.print_exc() - def _create_req_record(self, req_headers, payload, type_, ct=''): len_ = payload.tell() payload.seek(0) - #warc_headers = StatusAndHeaders('WARC/1.0', req_headers.items()) warc_headers = req_headers status_headers = self.parser.parse(payload) @@ -76,7 +83,7 @@ class RecorderApp(object): status_headers, ct, len_) return record - def _create_resp_record(self, req_headers, payload, type_, ct=''): + def _create_resp_record(self, resp_headers, payload, type_, ct=''): len_ = payload.tell() payload.seek(0) @@ -95,18 +102,32 @@ class RecorderApp(object): ('Content-Length', str(len(message)))] start_response('400 Bad Request', headers) - return message + return [message.encode('utf-8')] + + def _get_request_uri(self, env): + req_uri = env.get('REQUEST_URI') + if req_uri: + return req_uri + + req_uri = quote(env.get('PATH_INFO', ''), safe='/~!$&\'()*+,;=:@') + query = env.get('QUERY_STRING') + if query: + req_uri += '?' + query + + return req_uri def __call__(self, environ, start_response): - request_uri = environ.get('REQUEST_URI') + request_uri = self._get_request_uri(environ) input_req = DirectWSGIInputRequest(environ) headers = input_req.get_req_headers() method = input_req.get_req_method() + input_buff = input_req.get_req_body() + params = dict(parse_qsl(environ.get('QUERY_STRING'))) - req_stream = Wrapper(input_req.get_req_body(), headers, None) + req_stream = ReqWrapper(input_buff, headers) try: res = remote_request(url=self.upstream_host + request_uri, @@ -121,24 +142,16 @@ class RecorderApp(object): start_response('200 OK', list(res.headers.items())) - resp_stream = Wrapper(res.raw, res.headers, req_stream, params) + resp_stream = RespWrapper(res.raw, res.headers, req_stream, params, self.write_queue) return StreamIter(ReadFullyStream(resp_stream)) #============================================================================== class Wrapper(object): - def __init__(self, stream, rec_headers, req_obj=None, - params=None): + def __init__(self, stream): self.stream = stream self.out = self._create_buffer() - self.headers = CaseInsensitiveDict(rec_headers) - for n in rec_headers.keys(): - if not n.upper().startswith('WARC-'): - del self.headers[n] - - self.req_obj = req_obj - self.params = params def _create_buffer(self): return tempfile.SpooledTemporaryFile(max_size=512*1024) @@ -153,21 +166,42 @@ class Wrapper(object): self.stream.close() except: traceback.print_exc() + finally: + self._after_close() - if not self.req_obj: + def _after_close(self): + pass + + +#============================================================================== +class RespWrapper(Wrapper): + def __init__(self, stream, headers, req, + params, queue): + + super(RespWrapper, self).__init__(stream) + self.headers = headers + self.req = req + self.params = params + self.queue = queue + + def _after_close(self): + if not self.req: return try: - entry = (self.req_obj.headers, self.req_obj.out, + entry = (self.req.headers, self.req.out, self.headers, self.out, self.params) - write_queue.put(entry) - self.req_obj = None + self.queue.put(entry) + self.req = None except: traceback.print_exc() #============================================================================== -application = RecorderApp('http://localhost:8080', - PerRecordWARCRecorder('./warcs/{user}/{coll}/', - dedup_index=WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', 'recorder'))) - +class ReqWrapper(Wrapper): + def __init__(self, stream, req_headers): + super(ReqWrapper, self).__init__(stream) + self.headers = CaseInsensitiveDict(req_headers) + for n in req_headers.keys(): + if not n.upper().startswith('WARC-'): + del self.headers[n] diff --git a/recorder/redisindexer.py b/recorder/redisindexer.py index f2b3c520..ce359672 100644 --- a/recorder/redisindexer.py +++ b/recorder/redisindexer.py @@ -5,6 +5,7 @@ from pywb.utils.timeutils import timestamp_to_datetime from pywb.utils.timeutils import datetime_to_iso_date, iso_date_to_timestamp from io import BytesIO +import os from webagg.indexsource import RedisIndexSource from webagg.aggregator import SimpleAggregator @@ -13,17 +14,21 @@ from webagg.utils import res_template #============================================================================== class WritableRedisIndexer(RedisIndexSource): - def __init__(self, redis_url, name): + def __init__(self, redis_url, rel_path_template='', name='recorder'): super(WritableRedisIndexer, self).__init__(redis_url) self.cdx_lookup = SimpleAggregator({name: self}) + self.rel_path_template = rel_path_template def add_record(self, stream, params, filename=None): if not filename and hasattr(stream, 'name'): filename = stream.name + rel_path = res_template(self.rel_path_template, params) + filename = os.path.relpath(filename, rel_path) + cdxout = BytesIO() write_cdx_index(cdxout, stream, filename, - cdxj=True, append_post=True) + cdxj=True, append_post=True, rel_root=rel_path) z_key = res_template(self.redis_key_template, params) diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py new file mode 100644 index 00000000..f5dbdaf7 --- /dev/null +++ b/recorder/test/test_recorder.py @@ -0,0 +1,248 @@ +#from gevent import monkey; monkey.patch_all() +import gevent + +from webagg.test.testutils import TempDirTests, LiveServerTests, BaseTestClass, to_path + +import os +import webtest + +from fakeredis import FakeStrictRedis +from mock import patch + +from recorder.recorderapp import RecorderApp +from recorder.redisindexer import WritableRedisIndexer +from recorder.warcrecorder import PerRecordWARCRecorder + +from webagg.utils import MementoUtils + +from pywb.cdx.cdxobject import CDXObject +from pywb.utils.statusandheaders import StatusAndHeadersParser +from pywb.utils.bufferedreaders import DecompressingBufferedReader +from pywb.warc.recordloader import ArcWarcRecordLoader + +from six.moves.urllib.parse import quote, unquote +from io import BytesIO + +general_req_data = "\ +GET {path} HTTP/1.1\r\n\ +Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\r\n\ +User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36\r\n\ +Host: {host}\r\n\ +\r\n" + + + +class TestRecorder(LiveServerTests, TempDirTests, BaseTestClass): + @classmethod + def setup_class(cls): + super(TestRecorder, cls).setup_class() + + warcs = to_path(cls.root_dir + '/warcs') + + os.makedirs(warcs) + + cls.upstream_url = 'http://localhost:{0}'.format(cls.server.port) + + + def _test_per_warc(self, recorder_app, host, path, other_params=''): + url = 'http://' + host + path + req_url = '/live/resource/postreq?url=' + url + other_params + testapp = webtest.TestApp(recorder_app) + resp = testapp.post(req_url, general_req_data.format(host=host, path=path).encode('utf-8')) + #gevent.sleep(0.1) + recorder_app._write_one() + + assert resp.headers['WebAgg-Source-Coll'] == 'live' + + assert resp.headers['Link'] == MementoUtils.make_link(unquote(url), 'original') + assert resp.headers['Memento-Datetime'] != '' + + return resp + + def _test_all_warcs(self, dirname, num): + coll_dir = to_path(self.root_dir + dirname) + assert os.path.isdir(coll_dir) + + files = [x for x in os.listdir(coll_dir) if os.path.isfile(os.path.join(coll_dir, x))] + assert len(files) == num + assert all(x.endswith('.warc.gz') for x in files) + + def test_record_warc_1(self): + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/'))) + + resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/', 2) + + def test_record_warc_2(self): + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='live') + + resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/', 4) + + def test_error_url(self): + recorder_app = RecorderApp(self.upstream_url + '01', + PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='live') + + + testapp = webtest.TestApp(recorder_app) + resp = testapp.get('/live/resource?url=http://example.com/', status=400) + + assert resp.json['error'] != '' + + self._test_all_warcs('/warcs/', 4) + + def test_record_cookies_header(self): + base_path = to_path(self.root_dir + '/warcs/cookiecheck/') + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(base_path), accept_colls='live') + + resp = self._test_per_warc(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') + assert b'HTTP/1.1 302' in resp.body + + buff = BytesIO(resp.body) + record = ArcWarcRecordLoader().parse_record_stream(buff) + assert ('Set-Cookie', 'name=value; Path=/') in record.status_headers.headers + assert ('Set-Cookie', 'foo=bar; Path=/') in record.status_headers.headers + + warcs = os.listdir(base_path) + + stored_rec = None + for warc in warcs: + with open(os.path.join(base_path, warc), 'rb') as fh: + decomp = DecompressingBufferedReader(fh) + stored_rec = ArcWarcRecordLoader().parse_record_stream(decomp) + if stored_rec.rec_type == 'response': + break + + assert stored_rec is not None + assert ('Set-Cookie', 'name=value; Path=/') in stored_rec.status_headers.headers + assert ('Set-Cookie', 'foo=bar; Path=/') in stored_rec.status_headers.headers + + def test_record_cookies_skip_header(self): + base_path = to_path(self.root_dir + '/warcs/cookieskip/') + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(base_path, exclude_headers=['Set-Cookie', 'Cookie']), + accept_colls='live') + + resp = self._test_per_warc(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') + assert b'HTTP/1.1 302' in resp.body + + buff = BytesIO(resp.body) + record = ArcWarcRecordLoader().parse_record_stream(buff) + assert ('Set-Cookie', 'name=value; Path=/') in record.status_headers.headers + assert ('Set-Cookie', 'foo=bar; Path=/') in record.status_headers.headers + + warcs = os.listdir(base_path) + + stored_rec = None + for warc in warcs: + with open(os.path.join(base_path, warc), 'rb') as fh: + decomp = DecompressingBufferedReader(fh) + stored_rec = ArcWarcRecordLoader().parse_record_stream(decomp) + if stored_rec.rec_type == 'response': + break + + assert stored_rec is not None + assert ('Set-Cookie', 'name=value; Path=/') not in stored_rec.status_headers.headers + assert ('Set-Cookie', 'foo=bar; Path=/') not in stored_rec.status_headers.headers + + + def test_record_skip_wrong_coll(self): + recorder_app = RecorderApp(self.upstream_url, + writer=PerRecordWARCRecorder(to_path(self.root_dir + '/warcs/')), accept_colls='not-live') + + resp = self._test_per_warc(recorder_app, 'httpbin.org', '/get?foo=bar') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/', 4) + + @patch('redis.StrictRedis', FakeStrictRedis) + def test_record_param_user_coll(self): + + warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') + + + dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', + rel_path_template=self.root_dir + '/warcs/') + + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + + self._test_all_warcs('/warcs/', 4) + + resp = self._test_per_warc(recorder_app, 'httpbin.org', + '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/USER/COLL/', 2) + + r = FakeStrictRedis.from_url('redis://localhost/2') + + res = r.zrange('USER:COLL:cdxj', 0, -1) + assert len(res) == 1 + + cdx = CDXObject(res[0]) + assert cdx['urlkey'] == 'org,httpbin)/get?foo=bar' + assert cdx['mime'] == 'application/json' + assert cdx['offset'] == '0' + assert cdx['filename'].startswith('USER/COLL/') + assert cdx['filename'].endswith('.warc.gz') + + + @patch('redis.StrictRedis', FakeStrictRedis) + def test_record_param_user_coll_revisit(self): + warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') + + + dedup_index = WritableRedisIndexer('redis://localhost/2/{user}:{coll}:cdxj', + rel_path_template=self.root_dir + '/warcs/') + + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCRecorder(warc_path, dedup_index=dedup_index)) + + self._test_all_warcs('/warcs/', 4) + + resp = self._test_per_warc(recorder_app, 'httpbin.org', + '/get?foo=bar', '¶m.recorder.user=USER¶m.recorder.coll=COLL') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs/USER/COLL/', 4) + + # Test Redis CDX + r = FakeStrictRedis.from_url('redis://localhost/2') + + res = r.zrange('USER:COLL:cdxj', 0, -1) + assert len(res) == 2 + + cdx = CDXObject(res[1]) + assert cdx['urlkey'] == 'org,httpbin)/get?foo=bar' + assert cdx['mime'] == 'warc/revisit' + assert cdx['offset'] == '0' + assert cdx['filename'].startswith('USER/COLL/') + assert cdx['filename'].endswith('.warc.gz') + + fullwarc = os.path.join(self.root_dir, 'warcs', cdx['filename']) + + with open(fullwarc, 'rb') as fh: + decomp = DecompressingBufferedReader(fh) + # Test refers-to headers + status_headers = StatusAndHeadersParser(['WARC/1.0']).parse(decomp) + assert status_headers.get_header('WARC-Type') == 'revisit' + assert status_headers.get_header('WARC-Target-URI') == 'http://httpbin.org/get?foo=bar' + assert status_headers.get_header('WARC-Date') != '' + assert status_headers.get_header('WARC-Refers-To-Target-URI') == 'http://httpbin.org/get?foo=bar' + assert status_headers.get_header('WARC-Refers-To-Date') != '' + + + diff --git a/recorder/warcrecorder.py b/recorder/warcrecorder.py index 98d49361..c24e9f63 100644 --- a/recorder/warcrecorder.py +++ b/recorder/warcrecorder.py @@ -29,9 +29,14 @@ class BaseWARCRecorder(object): REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/uri-agnostic-identical-payload-digest' - def __init__(self, gzip=True, dedup_index=None): + def __init__(self, gzip=True, dedup_index=None, name='recorder', + exclude_headers=None): self.gzip = gzip self.dedup_index = dedup_index + self.rec_source_name = name + self.exclude_headers = exclude_headers + if self.exclude_headers: + self.exclude_headers = [x.lower() for x in self.exclude_headers] def ensure_digest(self, record): block_digest = record.rec_headers.get('WARC-Block-Digest') @@ -62,7 +67,8 @@ class BaseWARCRecorder(object): return Digester('sha1') def _set_header_buff(self, record): - record.status_headers.headers_buff = str(record.status_headers).encode('latin-1') + b'\r\n' + buff = record.status_headers.to_bytes(self.exclude_headers) + record.status_headers.headers_buff = buff def write_req_resp(self, req, resp, params): url = resp.rec_headers.get('WARC-Target-Uri') @@ -80,8 +86,6 @@ class BaseWARCRecorder(object): if resp_id: req.rec_headers['WARC-Concurrent-To'] = resp_id - #resp.status_headers.remove_header('Etag') - self._set_header_buff(req) self._set_header_buff(resp) @@ -208,12 +212,6 @@ class Digester(object): def update(self, buff): self.digester.update(buff) - def __eq__(self, string): - digest = str(base64.b32encode(self.digester.digest())) - if ':' in string: - digest = self._type_ + ':' + digest - return string == digest - def __str__(self): return self.type_ + ':' + to_native_str(base64.b32encode(self.digester.digest())) @@ -259,7 +257,9 @@ class PerRecordWARCRecorder(BaseWARCRecorder): resp_uuid = resp.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') req_uuid = req.rec_headers['WARC-Record-ID'].split(':')[-1].strip('<> ') - formatter = ParamFormatter(params) + formatter = ParamFormatter(params, name=self.rec_source_name) + print(params) + print(formatter.name) full_dir = formatter.format(self.warcdir) try: