From 2fe0c2f25b12c6e521afa93b9a0d41d2f46022ee Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 24 Jun 2016 20:04:27 -0500 Subject: [PATCH] support for tallying substats of a configured bucket by host, and enforcing limits host limits using those stats, with tests --- setup.py | 3 +- tests/test_warcprox.py | 143 +++++++++++++++++++++++++++++++- warcprox/stats.py | 181 ++++++++++++++++++++++++----------------- warcprox/warcproxy.py | 30 +++++-- 4 files changed, 272 insertions(+), 85 deletions(-) diff --git a/setup.py b/setup.py index 6584d18..5dcfc75 100755 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ except: deps.append('futures') setuptools.setup(name='warcprox', - version='2.0.dev9', + version='2.0.dev10', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', @@ -70,6 +70,7 @@ setuptools.setup(name='warcprox', 'License :: OSI Approved :: GNU General Public License (GPL)', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', 'Topic :: Internet :: Proxy Servers', 'Topic :: Internet :: WWW/HTTP', 'Topic :: Software Development :: Libraries :: Python Modules', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index db97674..281d1f9 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -57,7 +57,8 @@ import certauth.certauth import warcprox -logging.basicConfig(stream=sys.stdout, level=logging.INFO, +# logging.basicConfig(stream=sys.stdout, level=logging.INFO, +logging.basicConfig(stream=sys.stdout, level=warcprox.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) @@ -563,7 +564,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie def test_limits(http_daemon, warcprox_, archiving_proxies): url = 
'http://localhost:{}/i/j'.format(http_daemon.server_port) - request_meta = {"stats":{"buckets":["job1"]},"limits":{"job1.total.urls":10}} + request_meta = {"stats":{"buckets":["test_limits_bucket"]},"limits":{"test_limits_bucket.total.urls":10}} headers = {"Warcprox-Meta": json.dumps(request_meta)} response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) @@ -592,10 +593,10 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) assert response.status_code == 420 assert response.reason == "Reached limit" - expected_response_meta = {'reached-limit': {'job1.total.urls': 10}, 'stats': {'job1': {'bucket': 'job1', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}} + expected_response_meta = {'reached-limit': {'test_limits_bucket.total.urls': 10}, 'stats': {'test_limits_bucket': {'bucket': 'test_limits_bucket', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}} assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta assert response.headers["content-type"] == "text/plain;charset=utf-8" - assert response.raw.data == b"request rejected by warcprox: reached limit job1.total.urls=10\n" + assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket.total.urls=10\n" def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) @@ -839,6 +840,140 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): assert response.content.startswith(b"request rejected by warcprox: blocked by rule found in Warcprox-Meta header:") assert json.loads(response.headers['warcprox-meta']) == {"blocked-by-rule":rules[3]} +def test_host_doc_limit( + 
http_daemon, https_daemon, warcprox_, archiving_proxies): + request_meta = { + "stats": {"buckets": [{"bucket":"test_host_doc_limit_bucket","tally-host-stats":True}]}, + "limits": {"test_host_doc_limit_bucket:localhost.total.urls":10}, + } + headers = {"Warcprox-Meta": json.dumps(request_meta)} + + url = 'http://localhost:{}/o/p'.format(http_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'o!' + assert response.content == b'I am the warcprox test payload! pppppppppp!\n' + + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + + # same host but different scheme and port -- host limit still applies + url = 'https://localhost:{}/q/r'.format(https_daemon.server_port) + for i in range(9): + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True, + verify=False) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'q!' + assert response.content == b'I am the warcprox test payload! 
rrrrrrrrrr!\n' + + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + + # back to http, and this is the 11th request + url = 'http://localhost:{}/u/v'.format(http_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True) + assert response.status_code == 420 + assert response.reason == "Reached limit" + expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}} + assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta + assert response.headers["content-type"] == "text/plain;charset=utf-8" + assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n" + + # https also blocked + url = 'https://localhost:{}/w/x'.format(https_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True, + verify=False) + assert response.status_code == 420 + assert response.reason == "Reached limit" + expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}} + assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta + assert response.headers["content-type"] == "text/plain;charset=utf-8" + assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n" + +def test_host_data_limit( + http_daemon, 
https_daemon, warcprox_, archiving_proxies): + request_meta = { + "stats": {"buckets": [{"bucket":"test_host_data_limit_bucket","tally-host-stats":True}]}, + # response is 135 bytes, so 3rd novel url should be disallowed + "limits": {"test_host_data_limit_bucket:localhost.new.wire_bytes":200}, + } + headers = {"Warcprox-Meta": json.dumps(request_meta)} + + url = 'http://localhost:{}/y/z'.format(http_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'y!' + assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n' + + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + + # duplicate, does not count toward limit + url = 'https://localhost:{}/y/z'.format(https_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True, + verify=False) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'y!' + assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n' + + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + + # novel, pushes stats over the limit + url = 'https://localhost:{}/z/~'.format(https_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True, + verify=False) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'z!' + assert response.content == b'I am the warcprox test payload! 
~~~~~~~~~~!\n' + + # wait for writer thread to process + time.sleep(0.5) + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + time.sleep(0.5) + + # blocked because we're over the limit now + url = 'http://localhost:{}/y/z'.format(http_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True) + assert response.status_code == 420 + assert response.reason == "Reached limit" + expected_response_meta = {'reached-limit': {'test_host_data_limit_bucket:localhost.new.wire_bytes': 200}, 'stats': {'test_host_data_limit_bucket:localhost': {'total': {'wire_bytes': 405, 'urls': 3}, 'revisit': {'wire_bytes': 135, 'urls': 1}, 'new': {'wire_bytes': 270, 'urls': 2}, 'bucket': 'test_host_data_limit_bucket:localhost'}}} + assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta + assert response.headers["content-type"] == "text/plain;charset=utf-8" + assert response.raw.data == b"request rejected by warcprox: reached limit test_host_data_limit_bucket:localhost.new.wire_bytes=200\n" + + # https also blocked + url = 'https://localhost:{}/w/x'.format(https_daemon.server_port) + response = requests.get( + url, proxies=archiving_proxies, headers=headers, stream=True, + verify=False) + assert response.status_code == 420 + assert response.reason == "Reached limit" + expected_response_meta = {'reached-limit': {'test_host_data_limit_bucket:localhost.new.wire_bytes': 200}, 'stats': {'test_host_data_limit_bucket:localhost': {'total': {'wire_bytes': 405, 'urls': 3}, 'revisit': {'wire_bytes': 135, 'urls': 1}, 'new': {'wire_bytes': 270, 'urls': 2}, 'bucket': 'test_host_data_limit_bucket:localhost'}}} + assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta + assert response.headers["content-type"] == "text/plain;charset=utf-8" + assert response.raw.data == b"request rejected by warcprox: reached limit test_host_data_limit_bucket:localhost.new.wire_bytes=200\n" # XXX this test relies on a 
tor proxy running at localhost:9050 with a working # connection to the internet, and relies on a third party site (facebook) being diff --git a/warcprox/stats.py b/warcprox/stats.py index 7bf3fbc..edbb131 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -1,23 +1,23 @@ -# -# warcprox/stats.py - keeps statistics on what has been proxied, archived -# -# Copyright (C) 2013-2016 Internet Archive -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# +''' +warcprox/stats.py - keeps statistics on what has been archived + +Copyright (C) 2013-2016 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. 
+''' from __future__ import absolute_import @@ -30,6 +30,7 @@ import warcprox import threading import rethinkdb as r import datetime +import surt def _empty_bucket(bucket): return { @@ -37,17 +38,14 @@ def _empty_bucket(bucket): "total": { "urls": 0, "wire_bytes": 0, - # "warc_bytes": 0, }, "new": { "urls": 0, "wire_bytes": 0, - # "warc_bytes": 0, }, "revisit": { "urls": 0, "wire_bytes": 0, - # "warc_bytes": 0, }, } @@ -109,17 +107,51 @@ class StatsDb: def notify(self, recorded_url, records): self.tally(recorded_url, records) - def tally(self, recorded_url, records): - buckets = ["__all__"] + def buckets(self, recorded_url): + ''' + Unravels bucket definitions in Warcprox-Meta header. Each bucket + definition can either be a string, which signifies the name of the + bucket, or a dict. If a dict it is expected to have at least an item + with key 'bucket' whose value is the name of the bucket. The other + currently recognized item is 'tally-host-stats', which if true, + instructs warcprox to additionally tally substats of the given bucket + by host. Host stats are stored in the stats table under the key + '{parent-bucket}:{host}'. 
+ Example Warcprox-Meta header (a real one will likely have other + sections besides 'stats'): + + Warcprox-Meta: {'stats':{'buckets':['bucket1',{'bucket':'bucket2','tally-host-stats':true}]}} + ''' + buckets = ["__all__"] if (recorded_url.warcprox_meta and "stats" in recorded_url.warcprox_meta and "buckets" in recorded_url.warcprox_meta["stats"]): - buckets.extend(recorded_url.warcprox_meta["stats"]["buckets"]) + for bucket in recorded_url.warcprox_meta["stats"]["buckets"]: + if isinstance(bucket, dict): + if not 'bucket' in bucket: + self.logger.warn( + 'ignoring invalid stats bucket in ' + 'warcprox-meta header %s', bucket) + continue + buckets.append(bucket['bucket']) + # XXX maybe host has been computed elsewhere and can be + # cached somewhere, but maybe the performance gain would be + # negligible + if bucket.get('tally-host-stats'): + buckets.append('%s:%s' % ( + bucket['bucket'], + surt.handyurl.parse(recorded_url.url.decode( + 'utf-8')).host)) + else: + buckets.append(bucket) else: buckets.append("__unspecified__") - for bucket in buckets: + return buckets + + def tally(self, recorded_url, records): + for bucket in self.buckets(recorded_url): # Gdbm wants str/bytes keys in python2, str/unicode keys in python3. # This ugliness deals with keys that arrive as unicode in py2. 
b = bucket.encode("utf-8") if bucket and not isinstance(bucket, str) else bucket @@ -140,7 +172,7 @@ class StatsDb: self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8") -class RethinkStatsDb: +class RethinkStatsDb(StatsDb): """Updates database in batch every 2.0 seconds""" logger = logging.getLogger("warcprox.stats.RethinkStatsDb") @@ -162,37 +194,47 @@ class RethinkStatsDb: """Starts batch update repeating timer.""" self._update_batch() # starts repeating timer + def _bucket_batch_update_reql(self, bucket): + return self.r.table(self.table).get(bucket).replace( + lambda old: r.branch( + old.eq(None), self._batch[bucket], old.merge({ + "total": { + "urls": old["total"]["urls"].add( + self._batch[bucket]["total"]["urls"]), + "wire_bytes": old["total"]["wire_bytes"].add( + self._batch[bucket]["total"]["wire_bytes"]), + }, + "new": { + "urls": old["new"]["urls"].add( + self._batch[bucket]["new"]["urls"]), + "wire_bytes": old["new"]["wire_bytes"].add( + self._batch[bucket]["new"]["wire_bytes"]), + }, + "revisit": { + "urls": old["revisit"]["urls"].add( + self._batch[bucket]["revisit"]["urls"]), + "wire_bytes": old["revisit"]["wire_bytes"].add( + self._batch[bucket]["revisit"]["wire_bytes"]), + }, + }))) + def _update_batch(self): with self._batch_lock: if len(self._batch) > 0: - # XXX can this be done in one query? - # r.db("archiveit_brozzler").table("test00").get_all(*["foo01","foo"])...
- # >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn) - for k in self._batch: - result = self.r.table(self.table).get(k).replace( - lambda old: r.branch(old.eq(None), self._batch[k], old.merge( - { - "total": { - "urls": old["total"]["urls"].add(self._batch[k]["total"]["urls"]), - "wire_bytes": old["total"]["wire_bytes"].add(self._batch[k]["total"]["wire_bytes"]), - }, - "new": { - "urls": old["new"]["urls"].add(self._batch[k]["new"]["urls"]), - "wire_bytes": old["new"]["wire_bytes"].add(self._batch[k]["new"]["wire_bytes"]), - }, - "revisit": { - "urls": old["revisit"]["urls"].add(self._batch[k]["revisit"]["urls"]), - "wire_bytes": old["revisit"]["wire_bytes"].add(self._batch[k]["revisit"]["wire_bytes"]), - }, - } - ))).run() - if not result["inserted"] and not result["replaced"] or sorted(result.values()) != [0,0,0,0,0,1]: - raise Exception("unexpected result %s updating stats %s" % (result, self._batch[k])) + # XXX can all the buckets be done in one query? 
+ for bucket in self._batch: + result = self._bucket_batch_update_reql(bucket).run() + if (not result["inserted"] and not result["replaced"] + or sorted(result.values()) != [0,0,0,0,0,1]): + raise Exception( + "unexpected result %s updating stats %s" % ( + result, self._batch[bucket])) self._batch = {} if not self._stop.is_set(): self._timer = threading.Timer(2.0, self._update_batch) - self._timer.name = "RethinkStats-batch-update-timer-%s" % datetime.datetime.utcnow().isoformat() + self._timer.name = "RethinkStats-batch-update-timer-%s" % ( + datetime.datetime.utcnow().isoformat()) self._timer.start() else: self.logger.info("finished") @@ -227,7 +269,9 @@ class RethinkStatsDb: def value(self, bucket0="__all__", bucket1=None, bucket2=None): bucket0_stats = self.r.table(self.table).get(bucket0).run() - self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats) + self.logger.debug( + 'stats db lookup of bucket=%s returned %s', + bucket0, bucket0_stats) if bucket0_stats: if bucket1: if bucket2: @@ -236,37 +280,24 @@ class RethinkStatsDb: return bucket0_stats[bucket1] return bucket0_stats - def _tally(self, buckets, size, is_revisit): + def tally(self, recorded_url, records): + buckets = self.buckets(recorded_url) + is_revisit = records[0].get_header( + warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT with self._batch_lock: for bucket in buckets: - bucket_stats = self._batch.setdefault(bucket, _empty_bucket(bucket)) + bucket_stats = self._batch.setdefault( + bucket, _empty_bucket(bucket)) bucket_stats["total"]["urls"] += 1 - bucket_stats["total"]["wire_bytes"] += size + bucket_stats["total"]["wire_bytes"] += recorded_url.size if is_revisit: bucket_stats["revisit"]["urls"] += 1 - bucket_stats["revisit"]["wire_bytes"] += size + bucket_stats["revisit"]["wire_bytes"] += recorded_url.size else: bucket_stats["new"]["urls"] += 1 - bucket_stats["new"]["wire_bytes"] += size - - def _extract_stats_info(self, recorded_url, records): - buckets 
= ["__all__"] - - if (recorded_url.warcprox_meta - and "stats" in recorded_url.warcprox_meta - and "buckets" in recorded_url.warcprox_meta["stats"]): - buckets.extend(recorded_url.warcprox_meta["stats"]["buckets"]) - else: - buckets.append("__unspecified__") - - is_revisit = records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT - - return buckets, recorded_url.size, is_revisit - - def tally(self, recorded_url, records): - self._tally(*self._extract_stats_info(recorded_url, records)) + bucket_stats["new"]["wire_bytes"] += recorded_url.size def notify(self, recorded_url, records): self.tally(recorded_url, records) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index d342774..5ffe83d 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -100,6 +100,18 @@ class Url: return host_parts[-len(domain_parts):] == domain_parts class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): + ''' + XXX add more information. + + Among other things, this class enforces limits specified in the + Warcprox-Meta request header. If a limit is deemed to have been reached, no + request will be made to the remote destination server. This implementation + detail has implications worth noting. For example, if a limit applies to + "new" (not deduplicated) bytes, and the limit has already been reached, no + request will be made, even if it would have resulted in duplicate content, + which would not count toward the limit. To reiterate, this is because the + limit enforcer does not know that the content would be deduplicated. 
+ ''' # self.server is WarcProxy logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler") @@ -173,16 +185,24 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): key, limit = item bucket0, bucket1, bucket2 = key.rsplit(".", 2) value = self.server.stats_db.value(bucket0, bucket1, bucket2) - self.logger.debug("warcprox_meta['limits']=%s stats['%s']=%s recorded_url_q.qsize()=%s", - warcprox_meta['limits'], key, value, self.server.recorded_url_q.qsize()) + self.logger.debug( + "warcprox_meta['limits']=%s stats['%s']=%s " + "recorded_url_q.qsize()=%s", warcprox_meta['limits'], + key, value, self.server.recorded_url_q.qsize()) if value and value >= limit: - body = "request rejected by warcprox: reached limit {}={}\n".format(key, limit).encode("utf-8") + body = ("request rejected by warcprox: reached limit " + "%s=%s\n" % (key, limit)).encode("utf-8") self.send_response(420, "Reached limit") self.send_header("Content-Type", "text/plain;charset=utf-8") self.send_header("Connection", "close") self.send_header("Content-Length", len(body)) - response_meta = {"reached-limit":{key:limit}, "stats":{bucket0:self.server.stats_db.value(bucket0)}} - self.send_header("Warcprox-Meta", json.dumps(response_meta, separators=(",",":"))) + response_meta = { + "reached-limit": {key:limit}, + "stats": {bucket0:self.server.stats_db.value(bucket0)} + } + self.send_header( + "Warcprox-Meta", + json.dumps(response_meta, separators=(",",":"))) self.end_headers() if self.command != "HEAD": self.wfile.write(body)