From 4c7da0f6ef145d02117b7086b74dc238b4720dd1 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Tue, 7 Jun 2016 12:55:04 -0400 Subject: [PATCH] recorder: support overridings get_params() in subclass multiwarcwriter: support multiple warcs in same dir, support random component in path, and a custom key template for selecting current warc file, not related to current directory --- recorder/recorderapp.py | 5 ++++- recorder/test/test_recorder.py | 29 ++++++++++++++++++++++------ recorder/warcwriter.py | 35 ++++++++++++++++++++++++---------- 3 files changed, 52 insertions(+), 17 deletions(-) diff --git a/recorder/recorderapp.py b/recorder/recorderapp.py index d61e3df1..74d58248 100644 --- a/recorder/recorderapp.py +++ b/recorder/recorderapp.py @@ -115,11 +115,14 @@ class RecorderApp(object): '200 OK', start_response) + def _get_params(self, environ): + params = dict(parse_qsl(environ.get('QUERY_STRING'))) + return params def __call__(self, environ, start_response): input_req = DirectWSGIInputRequest(environ) - params = dict(parse_qsl(environ.get('QUERY_STRING'))) + params = self._get_params(environ) request_uri = input_req.get_full_request_uri() diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index 5320800f..7dbc2e75 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -168,10 +168,10 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert ('Cookie', 'boo=far') in stored_req.status_headers.headers def test_record_cookies_skip_header(self): - base_path = to_path(self.root_dir + '/warcs/cookieskip/') + warc_path = to_path(self.root_dir + '/warcs/cookieskip/') header_filter = ExcludeSpecificHeaders(['Set-Cookie', 'Cookie']) recorder_app = RecorderApp(self.upstream_url, - PerRecordWARCWriter(base_path, header_filter=header_filter), + PerRecordWARCWriter(warc_path, header_filter=header_filter), accept_colls='live') resp = self._test_warc_write(recorder_app, 'httpbin.org', '/cookies/set%3Fname%3Dvalue%26foo%3Dbar') @@ -182,7 +182,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert ('Set-Cookie', 'name=value; Path=/') in record.status_headers.headers assert ('Set-Cookie', 'foo=bar; Path=/') in record.status_headers.headers - stored_req, stored_resp = self._load_resp_req(base_path) + stored_req, stored_resp = self._load_resp_req(warc_path) assert ('Set-Cookie', 'name=value; Path=/') not in stored_resp.status_headers.headers assert ('Set-Cookie', 'foo=bar; Path=/') not in stored_resp.status_headers.headers @@ -201,7 +201,6 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) self._test_all_warcs('/warcs/', 2) def test_record_param_user_coll(self): - warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') dedup_index = self._get_dedup_index() @@ -234,6 +233,25 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) full_path = self.root_dir + '/warcs/' + cdx['filename'] assert warcs == {cdx['filename'].encode('utf-8'): full_path.encode('utf-8')} + def test_record_param_user_coll_same_dir(self): + warc_path = to_path(self.root_dir + '/warcs2/') + + dedup_index = self._get_dedup_index() + + recorder_app = RecorderApp(self.upstream_url, + PerRecordWARCWriter(warc_path, dedup_index=dedup_index, key_template='{user}:{coll}')) + + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?foo=bar', '¶m.recorder.user=USER2¶m.recorder.coll=COLL2') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + resp = self._test_warc_write(recorder_app, 'httpbin.org', + '/get?foo=bar', '¶m.recorder.user=USER2¶m.recorder.coll=COLL3') + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + self._test_all_warcs('/warcs2', 2) def test_record_param_user_coll_revisit(self): warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') @@ -395,8 +413,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert len(writer.fh_cache) == 1 - writer.close_dir(self.root_dir + '/warcs/FOO/') - #writer.close_file({'param.recorder.coll': 'FOO'}) + writer.close_key(self.root_dir + '/warcs/FOO/') assert len(writer.fh_cache) == 0 diff --git a/recorder/warcwriter.py b/recorder/warcwriter.py index 7fce2dd1..4ae1a20d 100644 --- a/recorder/warcwriter.py +++ b/recorder/warcwriter.py @@ -332,6 +332,7 @@ class MultiFileWARCWriter(BaseWARCWriter): filename_template = self.FILE_TEMPLATE self.dir_template = dir_template + self.key_template = kwargs.get('key_template', self.dir_template) self.filename_template = filename_template self.max_size = max_size if max_idle_secs > 0: @@ -344,9 +345,12 @@ class MultiFileWARCWriter(BaseWARCWriter): def _open_file(self, dir_, params): timestamp = timestamp20_now() + randstr = base64.b32encode(os.urandom(5)).decode('utf-8') + filename = dir_ + res_template(self.filename_template, params, hostname=self.hostname, - timestamp=timestamp) + timestamp=timestamp, + random=randstr) path, name = os.path.split(filename) @@ -366,9 +370,14 @@ class MultiFileWARCWriter(BaseWARCWriter): fcntl.flock(fh, fcntl.LOCK_UN) fh.close() - def close_dir(self, full_dir): - #full_dir = res_template(self.dir_template, params) - result = self.fh_cache.pop(full_dir, None) + def get_dir_key(self, params): + return res_template(self.key_template, params) + + def close_key(self, dir_key): + if isinstance(dir_key, dict): + dir_key = self.get_dir_key(dir_key) + + result = self.fh_cache.pop(dir_key, None) if not result: return @@ -376,6 +385,11 @@ class MultiFileWARCWriter(BaseWARCWriter): self._close_file(out) return filename + def close_file(self, match_filename): + for dir_key, out, filename in self.iter_open_files(): + if filename == match_filename: + return self.close_key(dir_key) + def _is_write_resp(self, resp, params): return True @@ -389,8 +403,9 @@ class MultiFileWARCWriter(BaseWARCWriter): def _do_write_req_resp(self, req, resp, params): full_dir = res_template(self.dir_template, params) + dir_key = self.get_dir_key(params) - result = self.fh_cache.get(full_dir) + result = self.fh_cache.get(dir_key) close_file = False @@ -436,11 +451,11 @@ class MultiFileWARCWriter(BaseWARCWriter): if close_file: self._close_file(out) if not is_new: - self.fh_cache.pop(full_dir, None) + self.fh_cache.pop(dir_key, None) elif is_new: fcntl.flock(out, fcntl.LOCK_EX | fcntl.LOCK_NB) - self.fh_cache[full_dir] = (out, filename) + self.fh_cache[dir_key] = (out, filename) def iter_open_files(self): for n, v in list(self.fh_cache.items()): @@ -448,7 +463,7 @@ class MultiFileWARCWriter(BaseWARCWriter): yield n, out, filename def close(self): - for dirname, out, filename in self.iter_open_files(): + for dir_key, out, filename in self.iter_open_files(): self._close_file(out) self.fh_cache = {} @@ -459,12 +474,12 @@ class MultiFileWARCWriter(BaseWARCWriter): now = datetime.datetime.now() - for dirname, out, filename in self.iter_open_files(): + for dir_key, out, filename in self.iter_open_files(): mtime = os.path.getmtime(filename) mtime = datetime.datetime.fromtimestamp(mtime) if (now - mtime) > self.max_idle_time: print('Closing idle ' + filename) - self.close_dir(dirname) + self.close_key(dir_key) # ============================================================================