support for tallying substats of a configured bucket by host, and enforcing host limits using those stats, with tests

Noah Levitt 2016-06-24 20:04:27 -05:00
parent d48e2c462d
commit 2fe0c2f25b
4 changed files with 272 additions and 85 deletions
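The feature is easiest to see from the request side: a client opts into per-host tallying and host limits via the Warcprox-Meta request header. A minimal sketch, assuming a warcprox instance listening as an HTTP proxy on localhost:8000 (the proxy address and target url are hypothetical; the bucket and limit names are the ones the new tests in this commit use):

    import json
    import requests

    request_meta = {
        # "tally-host-stats" asks warcprox to additionally tally substats of
        # this bucket by host, stored under the key '{parent-bucket}:{host}'
        "stats": {"buckets": [
            {"bucket": "test_host_doc_limit_bucket", "tally-host-stats": True}]},
        # reject further requests to this host once 10 urls have been counted
        "limits": {"test_host_doc_limit_bucket:localhost.total.urls": 10},
    }
    headers = {"Warcprox-Meta": json.dumps(request_meta)}
    proxies = {"http": "http://localhost:8000"}  # hypothetical proxy address

    response = requests.get(
            "http://localhost/some/page", proxies=proxies, headers=headers)
    # a 420 "Reached limit" response means the limit was enforced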

setup.py

@@ -50,7 +50,7 @@ except:
     deps.append('futures')
 
 setuptools.setup(name='warcprox',
-        version='2.0.dev9',
+        version='2.0.dev10',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
@@ -70,6 +70,7 @@ setuptools.setup(name='warcprox',
         'License :: OSI Approved :: GNU General Public License (GPL)',
         'Programming Language :: Python :: 2.7',
         'Programming Language :: Python :: 3.4',
+        'Programming Language :: Python :: 3.5',
         'Topic :: Internet :: Proxy Servers',
         'Topic :: Internet :: WWW/HTTP',
         'Topic :: Software Development :: Libraries :: Python Modules',

tests/test_warcprox.py

@@ -57,7 +57,8 @@ import certauth.certauth
 import warcprox
 
-logging.basicConfig(stream=sys.stdout, level=logging.INFO,
+# logging.basicConfig(stream=sys.stdout, level=logging.INFO,
+logging.basicConfig(stream=sys.stdout, level=warcprox.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 
 logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
@@ -563,7 +564,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies):
 def test_limits(http_daemon, warcprox_, archiving_proxies):
     url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
-    request_meta = {"stats":{"buckets":["job1"]},"limits":{"job1.total.urls":10}}
+    request_meta = {"stats":{"buckets":["test_limits_bucket"]},"limits":{"test_limits_bucket.total.urls":10}}
     headers = {"Warcprox-Meta": json.dumps(request_meta)}
 
     response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
@@ -592,10 +593,10 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
     response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
     assert response.status_code == 420
     assert response.reason == "Reached limit"
-    expected_response_meta = {'reached-limit': {'job1.total.urls': 10}, 'stats': {'job1': {'bucket': 'job1', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}}
+    expected_response_meta = {'reached-limit': {'test_limits_bucket.total.urls': 10}, 'stats': {'test_limits_bucket': {'bucket': 'test_limits_bucket', 'revisit': {'wire_bytes': 1215, 'urls': 9}, 'total': {'wire_bytes': 1350, 'urls': 10}, 'new': {'wire_bytes': 135, 'urls': 1}}}}
     assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
     assert response.headers["content-type"] == "text/plain;charset=utf-8"
-    assert response.raw.data == b"request rejected by warcprox: reached limit job1.total.urls=10\n"
+    assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket.total.urls=10\n"
 
 def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
@@ -839,6 +840,140 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
     assert response.content.startswith(b"request rejected by warcprox: blocked by rule found in Warcprox-Meta header:")
     assert json.loads(response.headers['warcprox-meta']) == {"blocked-by-rule":rules[3]}
 
+def test_host_doc_limit(
+        http_daemon, https_daemon, warcprox_, archiving_proxies):
+    request_meta = {
+        "stats": {"buckets": [{"bucket":"test_host_doc_limit_bucket","tally-host-stats":True}]},
+        "limits": {"test_host_doc_limit_bucket:localhost.total.urls":10},
+    }
+    headers = {"Warcprox-Meta": json.dumps(request_meta)}
+
+    url = 'http://localhost:{}/o/p'.format(http_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True)
+    assert response.status_code == 200
+    assert response.headers['warcprox-test-header'] == 'o!'
+    assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
+    # same host but different scheme and port -- host limit still applies
+    url = 'https://localhost:{}/q/r'.format(https_daemon.server_port)
+    for i in range(9):
+        response = requests.get(
+                url, proxies=archiving_proxies, headers=headers, stream=True,
+                verify=False)
+        assert response.status_code == 200
+        assert response.headers['warcprox-test-header'] == 'q!'
+        assert response.content == b'I am the warcprox test payload! rrrrrrrrrr!\n'
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
+    # back to http, and this is the 11th request
+    url = 'http://localhost:{}/u/v'.format(http_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True)
+    assert response.status_code == 420
+    assert response.reason == "Reached limit"
+    expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}}
+    assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
+    assert response.headers["content-type"] == "text/plain;charset=utf-8"
+    assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n"
+
+    # https also blocked
+    url = 'https://localhost:{}/w/x'.format(https_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True,
+            verify=False)
+    assert response.status_code == 420
+    assert response.reason == "Reached limit"
+    expected_response_meta = {'reached-limit': {'test_host_doc_limit_bucket:localhost.total.urls': 10}, 'stats': {'test_host_doc_limit_bucket:localhost': {'total': {'urls': 10, 'wire_bytes': 1350}, 'revisit': {'urls': 8, 'wire_bytes': 1080}, 'bucket': 'test_host_doc_limit_bucket:localhost', 'new': {'urls': 2, 'wire_bytes': 270}}}}
+    assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
+    assert response.headers["content-type"] == "text/plain;charset=utf-8"
+    assert response.raw.data == b"request rejected by warcprox: reached limit test_host_doc_limit_bucket:localhost.total.urls=10\n"
+
+def test_host_data_limit(
+        http_daemon, https_daemon, warcprox_, archiving_proxies):
+    request_meta = {
+        "stats": {"buckets": [{"bucket":"test_host_data_limit_bucket","tally-host-stats":True}]},
+        # response is 135 bytes, so 3rd novel url should be disallowed
+        "limits": {"test_host_data_limit_bucket:localhost.new.wire_bytes":200},
+    }
+    headers = {"Warcprox-Meta": json.dumps(request_meta)}
+
+    url = 'http://localhost:{}/y/z'.format(http_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True)
+    assert response.status_code == 200
+    assert response.headers['warcprox-test-header'] == 'y!'
+    assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n'
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
+    # duplicate, does not count toward limit
+    url = 'https://localhost:{}/y/z'.format(https_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True,
+            verify=False)
+    assert response.status_code == 200
+    assert response.headers['warcprox-test-header'] == 'y!'
+    assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n'
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
+    # novel, pushes stats over the limit
+    url = 'https://localhost:{}/z/~'.format(https_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True,
+            verify=False)
+    assert response.status_code == 200
+    assert response.headers['warcprox-test-header'] == 'z!'
+    assert response.content == b'I am the warcprox test payload! ~~~~~~~~~~!\n'
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not warcprox_.warc_writer_thread.idle:
+        time.sleep(0.5)
+    time.sleep(0.5)
+
+    # blocked because we're over the limit now
+    url = 'http://localhost:{}/y/z'.format(http_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True)
+    assert response.status_code == 420
+    assert response.reason == "Reached limit"
+    expected_response_meta = {'reached-limit': {'test_host_data_limit_bucket:localhost.new.wire_bytes': 200}, 'stats': {'test_host_data_limit_bucket:localhost': {'total': {'wire_bytes': 405, 'urls': 3}, 'revisit': {'wire_bytes': 135, 'urls': 1}, 'new': {'wire_bytes': 270, 'urls': 2}, 'bucket': 'test_host_data_limit_bucket:localhost'}}}
+    assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
+    assert response.headers["content-type"] == "text/plain;charset=utf-8"
+    assert response.raw.data == b"request rejected by warcprox: reached limit test_host_data_limit_bucket:localhost.new.wire_bytes=200\n"
+
+    # https also blocked
+    url = 'https://localhost:{}/w/x'.format(https_daemon.server_port)
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, stream=True,
+            verify=False)
+    assert response.status_code == 420
+    assert response.reason == "Reached limit"
+    expected_response_meta = {'reached-limit': {'test_host_data_limit_bucket:localhost.new.wire_bytes': 200}, 'stats': {'test_host_data_limit_bucket:localhost': {'total': {'wire_bytes': 405, 'urls': 3}, 'revisit': {'wire_bytes': 135, 'urls': 1}, 'new': {'wire_bytes': 270, 'urls': 2}, 'bucket': 'test_host_data_limit_bucket:localhost'}}}
+    assert json.loads(response.headers["warcprox-meta"]) == expected_response_meta
+    assert response.headers["content-type"] == "text/plain;charset=utf-8"
+    assert response.raw.data == b"request rejected by warcprox: reached limit test_host_data_limit_bucket:localhost.new.wire_bytes=200\n"
+
 # XXX this test relies on a tor proxy running at localhost:9050 with a working
 # connection to the internet, and relies on a third party site (facebook) being
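Assuming a typical pytest setup for this repository, the two new tests could presumably be run on their own with something like:

    py.test tests/test_warcprox.py -k 'test_host_doc_limit or test_host_data_limit'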

warcprox/stats.py

@@ -1,23 +1,23 @@
-#
-# warcprox/stats.py - keeps statistics on what has been proxied, archived
-#
-# Copyright (C) 2013-2016 Internet Archive
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-#
+'''
+warcprox/stats.py - keeps statistics on what has been archived
+
+Copyright (C) 2013-2016 Internet Archive
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+USA.
+'''
 
 from __future__ import absolute_import
@@ -30,6 +30,7 @@ import warcprox
 import threading
 import rethinkdb as r
 import datetime
+import surt
 
 def _empty_bucket(bucket):
     return {
@@ -37,17 +38,14 @@ def _empty_bucket(bucket):
         "total": {
             "urls": 0,
             "wire_bytes": 0,
-            # "warc_bytes": 0,
         },
         "new": {
             "urls": 0,
             "wire_bytes": 0,
-            # "warc_bytes": 0,
         },
         "revisit": {
             "urls": 0,
             "wire_bytes": 0,
-            # "warc_bytes": 0,
         },
     }
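Filled in, a tallied record has the shape below; the numbers here are the expected stats from test_limits in this commit's test changes:

    {"bucket": "test_limits_bucket",
     "total": {"urls": 10, "wire_bytes": 1350},
     "new": {"urls": 1, "wire_bytes": 135},
     "revisit": {"urls": 9, "wire_bytes": 1215}}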
@@ -109,17 +107,51 @@ class StatsDb:
     def notify(self, recorded_url, records):
         self.tally(recorded_url, records)
 
-    def tally(self, recorded_url, records):
-        buckets = ["__all__"]
+    def buckets(self, recorded_url):
+        '''
+        Unravels bucket definitions in Warcprox-Meta header. Each bucket
+        definition can either be a string, which signifies the name of the
+        bucket, or a dict. If a dict it is expected to have at least an item
+        with key 'bucket' whose value is the name of the bucket. The other
+        currently recognized item is 'tally-host-stats', which if true,
+        instructs warcprox to additionally tally substats of the given bucket
+        by host. Host stats are stored in the stats table under the key
+        '{parent-bucket}:{host}'.
+
+        Example Warcprox-Meta header (a real one will likely have other
+        sections besides 'stats'):
+
+        Warcprox-Meta: {'stats':{'buckets':['bucket1',{'bucket':'bucket2','tally-host-stats':true}]}}
+        '''
+        buckets = ["__all__"]
         if (recorded_url.warcprox_meta
                 and "stats" in recorded_url.warcprox_meta
                 and "buckets" in recorded_url.warcprox_meta["stats"]):
-            buckets.extend(recorded_url.warcprox_meta["stats"]["buckets"])
+            for bucket in recorded_url.warcprox_meta["stats"]["buckets"]:
+                if isinstance(bucket, dict):
+                    if not 'bucket' in bucket:
+                        self.logger.warn(
+                                'ignoring invalid stats bucket in '
+                                'warcprox-meta header %s', bucket)
+                        continue
+                    buckets.append(bucket['bucket'])
+                    # XXX maybe host has been computed elsewhere and can be
+                    # cached somewhere, but maybe the performance gain would be
+                    # negligible
+                    if bucket.get('tally-host-stats'):
+                        buckets.append('%s:%s' % (
+                            bucket['bucket'],
+                            surt.handyurl.parse(recorded_url.url.decode(
+                                'utf-8')).host))
+                else:
+                    buckets.append(bucket)
         else:
             buckets.append("__unspecified__")
-        for bucket in buckets:
+
+        return buckets
+
+    def tally(self, recorded_url, records):
+        for bucket in self.buckets(recorded_url):
             # Gdbm wants str/bytes keys in python2, str/unicode keys in python3.
             # This ugliness deals with keys that arrive as unicode in py2.
             b = bucket.encode("utf-8") if bucket and not isinstance(bucket, str) else bucket
@@ -140,7 +172,7 @@ class StatsDb:
             self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8")
 
-class RethinkStatsDb:
+class RethinkStatsDb(StatsDb):
     """Updates database in batch every 2.0 seconds"""
     logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
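To illustrate what the buckets() docstring above describes, a short sketch (not part of this commit) of the unraveling for the docstring's example header; the url here is hypothetical:

    import surt

    # Warcprox-Meta: {'stats':{'buckets':
    #         ['bucket1', {'bucket':'bucket2','tally-host-stats':true}]}}
    url = 'http://example.com/some/page'
    host = surt.handyurl.parse(url).host  # 'example.com'

    # buckets() would return, in order:
    expected = ['__all__', 'bucket1', 'bucket2', 'bucket2:%s' % host]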
@@ -162,37 +194,47 @@ class RethinkStatsDb:
         """Starts batch update repeating timer."""
         self._update_batch() # starts repeating timer
 
+    def _bucket_batch_update_reql(self, bucket):
+        return self.r.table(self.table).get(bucket).replace(
+            lambda old: r.branch(
+                old.eq(None), self._batch[bucket], old.merge({
+                    "total": {
+                        "urls": old["total"]["urls"].add(
+                            self._batch[bucket]["total"]["urls"]),
+                        "wire_bytes": old["total"]["wire_bytes"].add(
+                            self._batch[bucket]["total"]["wire_bytes"]),
+                    },
+                    "new": {
+                        "urls": old["new"]["urls"].add(
+                            self._batch[bucket]["new"]["urls"]),
+                        "wire_bytes": old["new"]["wire_bytes"].add(
+                            self._batch[bucket]["new"]["wire_bytes"]),
+                    },
+                    "revisit": {
+                        "urls": old["revisit"]["urls"].add(
+                            self._batch[bucket]["revisit"]["urls"]),
+                        "wire_bytes": old["revisit"]["wire_bytes"].add(
+                            self._batch[bucket]["revisit"]["wire_bytes"]),
+                    },
+                })))
+
     def _update_batch(self):
         with self._batch_lock:
             if len(self._batch) > 0:
-                # XXX can this be done in one query?
-                # r.db("archiveit_brozzler").table("test00").get_all(*["foo01","foo"])...
-                # >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn)
-                for k in self._batch:
-                    result = self.r.table(self.table).get(k).replace(
-                        lambda old: r.branch(old.eq(None), self._batch[k], old.merge(
-                            {
-                                "total": {
-                                    "urls": old["total"]["urls"].add(self._batch[k]["total"]["urls"]),
-                                    "wire_bytes": old["total"]["wire_bytes"].add(self._batch[k]["total"]["wire_bytes"]),
-                                },
-                                "new": {
-                                    "urls": old["new"]["urls"].add(self._batch[k]["new"]["urls"]),
-                                    "wire_bytes": old["new"]["wire_bytes"].add(self._batch[k]["new"]["wire_bytes"]),
-                                },
-                                "revisit": {
-                                    "urls": old["revisit"]["urls"].add(self._batch[k]["revisit"]["urls"]),
-                                    "wire_bytes": old["revisit"]["wire_bytes"].add(self._batch[k]["revisit"]["wire_bytes"]),
-                                },
-                            }
-                        ))).run()
-                    if not result["inserted"] and not result["replaced"] or sorted(result.values()) != [0,0,0,0,0,1]:
-                        raise Exception("unexpected result %s updating stats %s" % (result, self._batch[k]))
+                # XXX can all the buckets be done in one query?
+                for bucket in self._batch:
+                    result = self._bucket_batch_update_reql(bucket).run()
+                    if (not result["inserted"] and not result["replaced"]
+                            or sorted(result.values()) != [0,0,0,0,0,1]):
+                        raise Exception(
+                                "unexpected result %s updating stats %s" % (
+                                    result, self._batch[bucket]))
                 self._batch = {}
 
             if not self._stop.is_set():
                 self._timer = threading.Timer(2.0, self._update_batch)
-                self._timer.name = "RethinkStats-batch-update-timer-%s" % datetime.datetime.utcnow().isoformat()
+                self._timer.name = "RethinkStats-batch-update-timer-%s" % (
+                        datetime.datetime.utcnow().isoformat())
                 self._timer.start()
             else:
                 self.logger.info("finished")
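The batching approach above amortizes database writes: tallies accumulate in self._batch under a lock, and a repeating 2-second threading.Timer flushes them. A minimal standalone sketch of the same pattern (class and method names here are illustrative, not from warcprox):

    import threading

    class BatchFlusher:
        def __init__(self, flush_fn, interval=2.0):
            self._flush_fn = flush_fn   # called with the accumulated batch
            self._interval = interval
            self._batch = {}
            self._lock = threading.Lock()
            self._stop = threading.Event()
            self._timer = None

        def tally(self, key, n=1):
            with self._lock:
                self._batch[key] = self._batch.get(key, 0) + n

        def _flush(self):
            with self._lock:
                if self._batch:
                    self._flush_fn(self._batch)
                    self._batch = {}
            if not self._stop.is_set():
                # re-arm the timer, as RethinkStatsDb._update_batch does
                self._timer = threading.Timer(self._interval, self._flush)
                self._timer.start()

        def start(self):
            self._flush()  # flushes once, then keeps re-arming the timer

        def stop(self):
            self._stop.set()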
@@ -227,7 +269,9 @@ class RethinkStatsDb:
 
     def value(self, bucket0="__all__", bucket1=None, bucket2=None):
         bucket0_stats = self.r.table(self.table).get(bucket0).run()
-        self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
+        self.logger.debug(
+                'stats db lookup of bucket=%s returned %s',
+                bucket0, bucket0_stats)
         if bucket0_stats:
             if bucket1:
                 if bucket2:
@@ -236,37 +280,24 @@ class RethinkStatsDb:
                     return bucket0_stats[bucket1][bucket2]
                 return bucket0_stats[bucket1]
         return bucket0_stats
-    def _tally(self, buckets, size, is_revisit):
+    def tally(self, recorded_url, records):
+        buckets = self.buckets(recorded_url)
+        is_revisit = records[0].get_header(
+                warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT
         with self._batch_lock:
             for bucket in buckets:
-                bucket_stats = self._batch.setdefault(bucket, _empty_bucket(bucket))
+                bucket_stats = self._batch.setdefault(
+                        bucket, _empty_bucket(bucket))
 
                 bucket_stats["total"]["urls"] += 1
-                bucket_stats["total"]["wire_bytes"] += size
+                bucket_stats["total"]["wire_bytes"] += recorded_url.size
 
                 if is_revisit:
                     bucket_stats["revisit"]["urls"] += 1
-                    bucket_stats["revisit"]["wire_bytes"] += size
+                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
                 else:
                     bucket_stats["new"]["urls"] += 1
-                    bucket_stats["new"]["wire_bytes"] += size
-
-    def _extract_stats_info(self, recorded_url, records):
-        buckets = ["__all__"]
-        if (recorded_url.warcprox_meta
-                and "stats" in recorded_url.warcprox_meta
-                and "buckets" in recorded_url.warcprox_meta["stats"]):
-            buckets.extend(recorded_url.warcprox_meta["stats"]["buckets"])
-        else:
-            buckets.append("__unspecified__")
-        is_revisit = records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT
-        return buckets, recorded_url.size, is_revisit
-
-    def tally(self, recorded_url, records):
-        self._tally(*self._extract_stats_info(recorded_url, records))
+                    bucket_stats["new"]["wire_bytes"] += recorded_url.size
 
     def notify(self, recorded_url, records):
         self.tally(recorded_url, records)

warcprox/warcprox.py

@@ -100,6 +100,18 @@ class Url:
         return host_parts[-len(domain_parts):] == domain_parts
 
 class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
+    '''
+    XXX add more information.
+
+    Among other things, this class enforces limits specified in the
+    Warcprox-Meta request header. If a limit is deemed to have been reached, no
+    request will be made to the remote destination server. This implementation
+    detail has implications worth noting. For example, if a limit applies to
+    "new" (not deduplicated) bytes, and the limit has already been reached, no
+    request will be made, even if it would have resulted in duplicate content,
+    which would not count toward the limit. To reiterate, this is because the
+    limit enforcer does not know that the content would be deduplicated.
+    '''
     # self.server is WarcProxy
     logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler")
@@ -173,16 +185,24 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                 key, limit = item
                 bucket0, bucket1, bucket2 = key.rsplit(".", 2)
                 value = self.server.stats_db.value(bucket0, bucket1, bucket2)
-                self.logger.debug("warcprox_meta['limits']=%s stats['%s']=%s recorded_url_q.qsize()=%s",
-                        warcprox_meta['limits'], key, value, self.server.recorded_url_q.qsize())
+                self.logger.debug(
+                        "warcprox_meta['limits']=%s stats['%s']=%s "
+                        "recorded_url_q.qsize()=%s", warcprox_meta['limits'],
+                        key, value, self.server.recorded_url_q.qsize())
                 if value and value >= limit:
-                    body = "request rejected by warcprox: reached limit {}={}\n".format(key, limit).encode("utf-8")
+                    body = ("request rejected by warcprox: reached limit "
+                            "%s=%s\n" % (key, limit)).encode("utf-8")
                     self.send_response(420, "Reached limit")
                     self.send_header("Content-Type", "text/plain;charset=utf-8")
                     self.send_header("Connection", "close")
                     self.send_header("Content-Length", len(body))
-                    response_meta = {"reached-limit":{key:limit}, "stats":{bucket0:self.server.stats_db.value(bucket0)}}
-                    self.send_header("Warcprox-Meta", json.dumps(response_meta, separators=(",",":")))
+                    response_meta = {
+                            "reached-limit": {key:limit},
+                            "stats": {bucket0:self.server.stats_db.value(bucket0)}
+                            }
+                    self.send_header(
+                            "Warcprox-Meta",
+                            json.dumps(response_meta, separators=(",",":")))
                     self.end_headers()
                     if self.command != "HEAD":
                         self.wfile.write(body)
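The enforcement code above decomposes each limit key with key.rsplit(".", 2), splitting from the right so that dots inside a host-qualified bucket name stay in the bucket part. A standalone sketch of the same decomposition and check (not warcprox code; stats_db stands in for the stats_db.value() interface used above):

    def reached_limit(stats_db, limits):
        '''
        limits is like {"test_host_doc_limit_bucket:localhost.total.urls": 10}.
        Returns (key, limit, value) for the first limit that has been reached,
        else None.
        '''
        for key, limit in limits.items():
            # rsplit from the right: 'bucket:example.com.total.urls' ->
            # ('bucket:example.com', 'total', 'urls')
            bucket0, bucket1, bucket2 = key.rsplit(".", 2)
            value = stats_db.value(bucket0, bucket1, bucket2)
            if value and value >= limit:
                return key, limit, value
        return None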