Merge branch 'master' into qa

* master:
  support "captures-bucket" for backward compatibility
  Add hidden CLI option --dedup-only-with-bucket
  dedup-bucket is required in Warcprox-Meta to do dedup
  Rename captures-bucket to dedup-bucket in Warcprox-Meta
  bump dev version number after #86
  Use DedupableMixin in RethinkCapturesDedup
  Fix travis-ci unit test issue
  Add unit tests
  Remove method decorate_with_dedup_info
  Use DedupableMixin in all dedup classes
  default to 100 proxy threads, 1 warc writer thread
  include warc writer worker threads in profiling
  cap the number of urls queued for warc writing
  oops! /status has been lying about queued urls
  Configurable min dedupable size for text/binary resources
  bump dev version number after PR
  Fix Accept-Encoding request header
  CDX dedup improvements
  bump dev version number after PR
  make test server multithreaded so tests will pass
  always call socket.shutdown() to close connections
  bump dev version number
  close connection when truncating response
  test another request after truncated response
  close all remote connections at shutdown
  tweak tests to make them pass now that keepalive
  enable keepalive on test http server
  more logging
  remove some debug logging
  this is some logging meant to debug the mysterious
  work around odd problem (see comment in code)
This commit is contained in:
Noah Levitt 2018-05-09 15:43:52 -07:00
commit 7afa92f102
11 changed files with 343 additions and 106 deletions
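The most user-visible change is the rename of the `captures-bucket` key to `dedup-bucket` in the Warcprox-Meta request header; `captures-bucket` is still accepted for backward compatibility. A minimal sketch of a client request through the proxy, assuming a warcprox instance listening on localhost:8000 (the proxy address and target URL are illustrative; only the Warcprox-Meta keys come from this change):
```
import json
import requests

# proxy address and target URL are assumptions for illustration
proxies = {'http': 'http://localhost:8000', 'https': 'http://localhost:8000'}
headers = {'Warcprox-Meta': json.dumps({
    'warc-prefix': 'my-crawl',
    'dedup-bucket': 'bucket_a',  # formerly 'captures-bucket', still accepted
})}
response = requests.get('http://example.com/', proxies=proxies, headers=headers)
```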

View File

@ -40,7 +40,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.4b2.dev159',
version='2.4b2.dev170',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -50,6 +50,7 @@ import io
import gzip
import mock
import email.message
import socketserver
try:
import http.server as http_server
@ -166,6 +167,9 @@ def chunkify(buf, chunk_size=13):
# return outbuf.getvalue()
class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
# enable keepalive
protocol_version = 'HTTP/1.1'
def build_response(self):
m = re.match(r'^/([^/]+)/([^/]+)$', self.path)
if m is not None:
@ -187,6 +191,18 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
+ b'Content-Type: text/plain\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n'
+ b'\r\n')
elif self.path == '/text-2bytes':
payload = b'aa'
headers = (b'HTTP/1.1 200 OK\r\n'
+ b'Content-Type: text/plain\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n'
+ b'\r\n')
elif self.path == '/binary-4bytes':
payload = b'aaaa'
headers = (b'HTTP/1.1 200 OK\r\n'
+ b'Content-Type: application/octet-stream\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n'
+ b'\r\n')
elif self.path.startswith('/test_payload_digest-'):
content_body = (
b'Hello. How are you. I am the test_payload_digest '
@ -276,6 +292,11 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
headers, payload = self.build_response()
self.connection.sendall(headers)
self.connection.sendall(payload)
if self.path in ('/missing-content-length', '/empty-response'):
# server must close the connection, else client has no idea if
# there is more data coming
self.connection.shutdown(socket.SHUT_RDWR)
self.connection.close()
def do_HEAD(self):
logging.info('HEAD {}'.format(self.path))
@ -315,9 +336,20 @@ def cert(request):
finally:
f.close()
# We need this test server to accept multiple simultaneous connections in order
# to avoid mysterious looking test failures like these:
# https://travis-ci.org/internetarchive/warcprox/builds/362892231
# This is because we can't guarantee (without jumping through hoops) that
# MitmProxyHandler._proxy_request() returns the connection to the pool before
# the next request tries to get a connection from the pool in
# MitmProxyHandler._connect_to_remote_server(). (Unless we run warcprox
# single-threaded for these tests, which maybe we should consider?)
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http_server.HTTPServer):
pass
@pytest.fixture(scope="module")
def http_daemon(request):
http_daemon = http_server.HTTPServer(
http_daemon = ThreadedHTTPServer(
('localhost', 0), RequestHandlerClass=_TestHttpRequestHandler)
logging.info('starting http://{}:{}'.format(http_daemon.server_address[0], http_daemon.server_address[1]))
http_daemon_thread = threading.Thread(name='HttpDaemonThread',
@ -336,9 +368,8 @@ def http_daemon(request):
@pytest.fixture(scope="module")
def https_daemon(request, cert):
# http://www.piware.de/2011/01/creating-an-https-server-in-python/
https_daemon = http_server.HTTPServer(('localhost', 0),
https_daemon = ThreadedHTTPServer(('localhost', 0),
RequestHandlerClass=_TestHttpRequestHandler)
# https_daemon.socket = ssl.wrap_socket(httpd.socket, certfile='path/to/localhost.pem', server_side=True)
https_daemon.socket = ssl.wrap_socket(https_daemon.socket, certfile=cert, server_side=True)
logging.info('starting https://{}:{}'.format(https_daemon.server_address[0], https_daemon.server_address[1]))
https_daemon_thread = threading.Thread(name='HttpsDaemonThread',
@ -354,8 +385,11 @@ def https_daemon(request, cert):
return https_daemon
# specify http_daemon and https_daemon as dependencies so that their finalizers
# run after warcprox is shut down, otherwise warcprox holds connections open
# and prevents the servers from shutting down
@pytest.fixture(scope="module")
def warcprox_(request):
def warcprox_(request, http_daemon, https_daemon):
orig_dir = os.getcwd()
work_dir = tempfile.mkdtemp()
logging.info('changing to working directory %r', work_dir)
@ -372,7 +406,9 @@ def warcprox_(request):
'--onion-tor-socks-proxy=localhost:9050',
'--crawl-log-dir=crawl-logs',
'--socket-timeout=4',
'--max-resource-size=200000']
'--max-resource-size=200000',
'--dedup-min-text-size=3',
'--dedup-min-binary-size=5']
if request.config.getoption('--rethinkdb-dedup-url'):
argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url'))
# test these here only
@ -709,7 +745,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
# archive url1 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})}
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
@ -735,7 +771,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert dedup_lookup is None
# archive url2 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
@ -764,7 +800,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
# archive url1 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
@ -961,6 +997,14 @@ def test_domain_doc_soft_limit(
http_daemon, https_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
# we need to clear the connection pool here because
# - connection pool may already have an open connection to localhost
# - we're about to make a connection to foo.localhost
# - but our test server, which handles all the hosts, is single threaded
# - so it will fail to connect (socket timeout)
# must close connections before each connection to a different hostname
warcprox_.proxy.remote_connection_pool.clear()
request_meta = {
"stats": {"buckets": [{"bucket":"test_domain_doc_limit_bucket","tally-domains":["foo.localhost"]}]},
"soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls":10},
@ -978,6 +1022,8 @@ def test_domain_doc_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
warcprox_.proxy.remote_connection_pool.clear()
# make sure stats from different domain don't count
url = 'http://bar.localhost:{}/o/p'.format(http_daemon.server_port)
for i in range(10):
@ -990,6 +1036,8 @@ def test_domain_doc_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 11)
warcprox_.proxy.remote_connection_pool.clear()
# (2) same host but different scheme and port: domain limit applies
url = 'https://foo.localhost:{}/o/p'.format(https_daemon.server_port)
response = requests.get(
@ -999,6 +1047,8 @@ def test_domain_doc_soft_limit(
assert response.headers['warcprox-test-header'] == 'o!'
assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
warcprox_.proxy.remote_connection_pool.clear()
# (3-9) different subdomain: host limit applies
url = 'https://baz.foo.localhost:{}/o/p'.format(https_daemon.server_port)
for i in range(7):
@ -1027,6 +1077,8 @@ def test_domain_doc_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20)
warcprox_.proxy.remote_connection_pool.clear()
# (11) back to http, and this is the 11th request
url = 'http://zuh.foo.localhost:{}/o/p'.format(http_daemon.server_port)
response = requests.get(
@ -1038,6 +1090,8 @@ def test_domain_doc_soft_limit(
assert response.headers["content-type"] == "text/plain;charset=utf-8"
assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_doc_limit_bucket:foo.localhost/total/urls=10\n"
warcprox_.proxy.remote_connection_pool.clear()
# make sure limit doesn't get applied to a different domain
url = 'https://localhost:{}/o/p'.format(https_daemon.server_port)
response = requests.get(
@ -1050,6 +1104,8 @@ def test_domain_doc_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 21)
warcprox_.proxy.remote_connection_pool.clear()
# https also blocked
url = 'https://zuh.foo.localhost:{}/o/p'.format(https_daemon.server_port)
response = requests.get(
@ -1062,6 +1118,8 @@ def test_domain_doc_soft_limit(
assert response.headers["content-type"] == "text/plain;charset=utf-8"
assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_doc_limit_bucket:foo.localhost/total/urls=10\n"
warcprox_.proxy.remote_connection_pool.clear()
# same host, different capitalization still blocked
url = 'https://HEHEHE.fOO.lOcALhoST:{}/o/p'.format(https_daemon.server_port)
response = requests.get(
@ -1086,6 +1144,8 @@ def test_domain_data_soft_limit(
}
headers = {"Warcprox-Meta": json.dumps(request_meta)}
warcprox_.proxy.remote_connection_pool.clear()
url = 'http://ÞZz.localhost:{}/y/z'.format(http_daemon.server_port)
response = requests.get(
url, proxies=archiving_proxies, headers=headers, stream=True)
@ -1096,6 +1156,8 @@ def test_domain_data_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
warcprox_.proxy.remote_connection_pool.clear()
# duplicate, does not count toward limit
url = 'https://baz.Þzz.localhost:{}/y/z'.format(https_daemon.server_port)
response = requests.get(
@ -1108,6 +1170,8 @@ def test_domain_data_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
warcprox_.proxy.remote_connection_pool.clear()
# novel, pushes stats over the limit
url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/~'.format(https_daemon.server_port)
response = requests.get(
@ -1120,6 +1184,8 @@ def test_domain_data_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
warcprox_.proxy.remote_connection_pool.clear()
# make sure limit doesn't get applied to a different host
url = 'http://baz.localhost:{}/z/~'.format(http_daemon.server_port)
response = requests.get(
@ -1131,6 +1197,8 @@ def test_domain_data_soft_limit(
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4)
warcprox_.proxy.remote_connection_pool.clear()
# blocked because we're over the limit now
url = 'http://lOl.wHut.ÞZZ.lOcALHOst:{}/y/z'.format(http_daemon.server_port)
response = requests.get(
@ -1226,14 +1294,23 @@ def test_limit_large_resource(archiving_proxies, http_daemon, warcprox_):
"""
urls_before = warcprox_.proxy.running_stats.urls
# this should be truncated
url = 'http://localhost:%s/300k-content' % http_daemon.server_port
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
assert len(response.content) == 262144
# test that the connection is cleaned up properly after truncating a
# response (no hang or timeout)
url = 'http://localhost:%s/' % http_daemon.server_port
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
assert response.status_code == 404
assert response.content == b'404 Not Found\n'
# wait for processing of this url to finish so that it doesn't interfere
# with subsequent tests
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
def test_method_filter(
warcprox_, https_daemon, http_daemon, archiving_proxies,
@ -1294,7 +1371,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None
# archive with dedup_ok:False
request_meta = {'captures-bucket':'test_dedup_ok_flag','dedup-ok':False}
request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False}
headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False)
@ -1312,7 +1389,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None
# archive without dedup_ok:False
request_meta = {'captures-bucket':'test_dedup_ok_flag'}
request_meta = {'dedup-bucket':'test_dedup_ok_flag'}
headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False)
@ -1765,6 +1842,14 @@ def test_socket_timeout_response(
response = requests.get(url, proxies=archiving_proxies, verify=False)
assert response.status_code == 502
# test that the connection is cleaned up properly after truncating a
# response (no hang or timeout)
url = 'http://localhost:%s/' % http_daemon.server_port
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
assert response.status_code == 404
assert response.content == b'404 Not Found\n'
def test_empty_response(
warcprox_, http_daemon, https_daemon, archiving_proxies,
playback_proxies):
@ -1856,6 +1941,47 @@ def test_trough_segment_promotion(warcprox_):
time.sleep(3)
assert promoted == []
def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies):
"""We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we
try to download content smaller than these limits to make sure that it is
not deduplicated. We create the digest_str with the following code:
```
payload_digest = hashlib.new('sha1')
payload_digest.update(b'aa')
warcprox.digest_str(payload_digest)
```
"""
url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
assert len(response.content) == 2
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37')
assert dedup_lookup is None
time.sleep(3)
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37')
# This would return dedup data if payload_size > dedup-min-text-size
assert dedup_lookup is None
url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
assert len(response.content) == 4
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79')
assert dedup_lookup is None
time.sleep(3)
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79')
# This would return dedup data if payload_size > dedup-min-binary-size
assert dedup_lookup is None
if __name__ == '__main__':
pytest.main()
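For reference, the digest keys that test_dedup_min_size looks up can be reproduced outside the test suite. A small sketch based on the docstring above (the test expects b'sha1:e0c90358...' for the 2-byte payload and b'sha1:70c881d4...' for the 4-byte one):
```
import hashlib
import warcprox

for payload in (b'aa', b'aaaa'):
    digest = hashlib.new('sha1')
    digest.update(payload)
    # digest_str() prefixes the algorithm name, e.g. b'sha1:<hexdigest>'
    print(warcprox.digest_str(digest))
```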

View File

@ -26,12 +26,14 @@ import time
import logging
from argparse import Namespace as _Namespace
from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('warcprox').version
import concurrent.futures
try:
import queue
except ImportError:
import Queue as queue
__version__ = _get_distribution('warcprox').version
def digest_str(hash_obj, base32=False):
import base64
return hash_obj.name.encode('utf-8') + b':' + (
@ -45,6 +47,17 @@ class Options(_Namespace):
except AttributeError:
return None
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
'''
`concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size.
If `max_queued` is set, calls to `submit()` will block if necessary until a
free slot is available.
'''
def __init__(self, max_queued=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._work_queue = queue.Queue(maxsize=max_queued or 0)
class TimestampedQueue(queue.Queue):
"""
A queue.Queue that exposes the time enqueued of the oldest item in the

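The new warcprox.ThreadPoolExecutor bounds the executor's work queue by swapping in a queue.Queue with a maxsize, so submit() blocks whenever max_queued items are already waiting. A minimal usage sketch under that assumption (the worker function and the numbers are illustrative):
```
import time
import warcprox

def work(i):
    time.sleep(0.1)
    return i

# submit() blocks once max_queued work items are pending, giving natural
# backpressure on the producer instead of letting the queue grow unbounded
pool = warcprox.ThreadPoolExecutor(max_workers=2, max_queued=4)
futures = [pool.submit(work, i) for i in range(20)]
print([f.result() for f in futures])
pool.shutdown()
```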
View File

@ -34,6 +34,7 @@ import threading
import datetime
import doublethink
import rethinkdb as r
from warcprox.dedup import DedupableMixin
class RethinkCaptures:
"""Inserts in batches every 0.5 seconds"""
@ -156,8 +157,8 @@ class RethinkCaptures:
sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
if (recorded_url.warcprox_meta
and "captures-bucket" in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta["captures-bucket"]
and "dedup-bucket" in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta["dedup-bucket"]
else:
bucket = "__unspecified__"
@ -215,10 +216,11 @@ class RethinkCaptures:
if self._timer:
self._timer.join()
class RethinkCapturesDedup(warcprox.dedup.DedupDb):
class RethinkCapturesDedup(warcprox.dedup.DedupDb, DedupableMixin):
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
def __init__(self, options=warcprox.Options()):
DedupableMixin.__init__(self, options)
self.captures_db = RethinkCaptures(options=options)
self.options = options
@ -251,5 +253,6 @@ class RethinkCapturesDedup(warcprox.dedup.DedupDb):
self.captures_db.close()
def notify(self, recorded_url, records):
self.captures_db.notify(recorded_url, records)
if (records and records[0].type == b'response'
and self.should_dedup(recorded_url)):
self.captures_db.notify(recorded_url, records)

View File

@ -141,11 +141,9 @@ class WarcproxController(object):
self.playback_proxy = Factory.playback_proxy(
self.proxy.ca, self.options)
# default number of warc writer threads = sqrt(proxy.max_threads)
# pulled out of thin air because it strikes me as reasonable
# 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
# https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads
if not self.options.writer_threads:
self.options.writer_threads = int(self.proxy.max_threads ** 0.5)
self.options.writer_threads = 1
self.build_postfetch_chain(self.proxy.recorded_url_q)
@ -164,8 +162,7 @@ class WarcproxController(object):
queued += len(processor.batch)
result['postfetch_chain'].append({
'processor': name,
'queued_urls': len(processor.inq.queue)})
'processor': name, 'queued_urls': queued})
return result
def chain(self, processor0, processor1):
@ -440,3 +437,18 @@ class WarcproxController(object):
self.logger.notice(
'performance profile of %s:\n%s', processor,
buf.getvalue())
if hasattr(processor, 'thread_profilers'):
files = []
for th_id, profiler in processor.thread_profilers.items():
file = os.path.join(tmpdir, '%s.dat' % th_id)
profiler.dump_stats(file)
files.append(file)
buf = io.StringIO()
stats = pstats.Stats(*files, stream=buf)
stats.sort_stats('cumulative')
stats.print_stats(0.1)
self.logger.notice(
'aggregate performance profile of %s worker '
'threads of %s:\n%s',
len(files), processor, buf.getvalue())

View File

@ -37,20 +37,51 @@ from concurrent import futures
urllib3.disable_warnings()
class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
class DedupableMixin(object):
def __init__(self, options=warcprox.Options()):
self.min_text_size = options.dedup_min_text_size
self.min_binary_size = options.dedup_min_binary_size
self.dedup_only_with_bucket = options.dedup_only_with_bucket
def should_dedup(self, recorded_url):
"""Check if we should try to run dedup on resource based on payload
size compared with min text/binary dedup size options.
When we use option --dedup-only-with-bucket, `dedup-bucket` is required
in Warcprox-Meta to perform dedup.
Return Boolean.
"""
if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta:
return False
if recorded_url.is_text():
return recorded_url.response_recorder.payload_size() > self.min_text_size
else:
return recorded_url.response_recorder.payload_size() > self.min_binary_size
class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
def __init__(self, dedup_db, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
DedupableMixin.__init__(self, options)
self.dedup_db = dedup_db
def _process_url(self, recorded_url):
decorate_with_dedup_info(
self.dedup_db, recorded_url, self.options.base32)
if (recorded_url.response_recorder
and recorded_url.payload_digest
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, recorded_url.warcprox_meta["dedup-bucket"],
recorded_url.url)
else:
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, url=recorded_url.url)
class DedupDb(object):
class DedupDb(DedupableMixin):
logger = logging.getLogger("warcprox.dedup.DedupDb")
def __init__(
self, file='./warcprox.sqlite', options=warcprox.Options()):
DedupableMixin.__init__(self, options)
self.file = file
self.options = options
@ -113,33 +144,21 @@ class DedupDb(object):
def notify(self, recorded_url, records):
if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta["captures-bucket"])
bucket=recorded_url.warcprox_meta["dedup-bucket"])
else:
self.save(digest_key, records[0])
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if (recorded_url.response_recorder
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(
digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
else:
recorded_url.dedup_info = dedup_db.lookup(
digest_key, url=recorded_url.url)
class RethinkDedupDb(DedupDb):
class RethinkDedupDb(DedupDb, DedupableMixin):
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, options=warcprox.Options()):
DedupableMixin.__init__(self, options)
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
@ -190,11 +209,11 @@ class RethinkDedupDb(DedupDb):
def notify(self, recorded_url, records):
if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"])
else:
self.save(digest_key, records[0])
@ -205,17 +224,18 @@ class CdxServerDedup(DedupDb):
cookies = None
def __init__(self, cdx_url="https://web.archive.org/cdx/search",
maxsize=200, options=warcprox.Options()):
maxsize=400, options=warcprox.Options()):
"""Initialize cdx server connection pool and related parameters.
Use low timeout value and no retries to avoid blocking warcprox
operation by a slow CDX server.
"""
self.cdx_url = cdx_url
self.options = options
self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
timeout=2.0)
headers = {'User-Agent': 'warcprox', 'Accept-Encoding': 'gzip, deflate'}
if options.cdxserver_dedup_cookies:
self.cookies = options.cdxserver_dedup_cookies
headers['Cookie'] = options.cdxserver_dedup_cookies
self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
timeout=2.0, headers=headers)
def loader(self, *args, **kwargs):
return CdxServerDedupLoader(self, self.options)
@ -245,10 +265,9 @@ class CdxServerDedup(DedupDb):
"""
u = url.decode("utf-8") if isinstance(url, bytes) else url
try:
headers = {'Cookie': self.cookies} if self.cookies else {}
result = self.http_pool.request('GET', self.cdx_url, fields=dict(
url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
limit=-1), headers=headers)
limit=-1))
assert result.status == 200
if isinstance(digest_key, bytes):
dkey = digest_key
@ -273,17 +292,24 @@ class CdxServerDedup(DedupDb):
"""
pass
class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor):
class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
def __init__(self, cdx_dedup, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
self.pool = futures.ThreadPoolExecutor(max_workers=200)
DedupableMixin.__init__(self, options)
self.pool = futures.ThreadPoolExecutor(max_workers=400)
self.batch = set()
self.cdx_dedup = cdx_dedup
def _get_process_put(self):
recorded_url = self.inq.get(block=True, timeout=0.5)
self.batch.add(recorded_url)
self.pool.submit(self._process_url, recorded_url)
if (recorded_url.response_recorder
and recorded_url.payload_digest
and self.should_dedup(recorded_url)):
self.batch.add(recorded_url)
self.pool.submit(self._process_url, recorded_url)
else:
if self.outq:
self.outq.put(recorded_url)
def _process_url(self, recorded_url):
try:
@ -300,9 +326,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor):
if self.outq:
self.outq.put(recorded_url)
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
DedupableMixin.__init__(self, options)
self.trough_dedup_db = trough_dedup_db
def _filter_and_bucketize(self, batch):
@ -314,10 +341,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
for recorded_url in batch:
if (recorded_url.warc_records
and recorded_url.warc_records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
and self.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket']
and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket']
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
@ -346,9 +373,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
logging.warn(
'timed out saving dedup info to trough', exc_info=True)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
DedupableMixin.__init__(self, options)
self.trough_dedup_db = trough_dedup_db
def _startup(self):
@ -363,10 +391,10 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
for recorded_url in batch:
if (recorded_url.response_recorder
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
and self.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket']
and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket']
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
@ -423,7 +451,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
logging.warn(
'timed out loading dedup info from trough', exc_info=True)
class TroughDedupDb(DedupDb):
class TroughDedupDb(DedupDb, DedupableMixin):
'''
https://github.com/internetarchive/trough
'''
@ -440,6 +468,7 @@ class TroughDedupDb(DedupDb):
'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()):
DedupableMixin.__init__(self, options)
self.options = options
self._trough_cli = warcprox.trough.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60)
@ -512,12 +541,12 @@ class TroughDedupDb(DedupDb):
def notify(self, recorded_url, records):
if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta['captures-bucket'])
bucket=recorded_url.warcprox_meta['dedup-bucket'])
else:
self.save(digest_key, records[0])
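DedupableMixin.should_dedup() is now the single gate all of these dedup backends consult before a lookup or save: text payloads must exceed --dedup-min-text-size, other payloads must exceed --dedup-min-binary-size, and with --dedup-only-with-bucket a request whose Warcprox-Meta has no dedup-bucket is never deduplicated. A standalone restatement of that decision for clarity (the plain function below is illustrative, not part of warcprox):
```
def should_dedup(payload_size, is_text, warcprox_meta,
                 min_text_size=0, min_binary_size=0,
                 dedup_only_with_bucket=False):
    # mirrors warcprox.dedup.DedupableMixin.should_dedup()
    if dedup_only_with_bucket and 'dedup-bucket' not in (warcprox_meta or {}):
        return False
    if is_text:
        return payload_size > min_text_size
    return payload_size > min_binary_size

# with --dedup-min-text-size=3, as in the test suite, a 2-byte text payload
# is not a dedup candidate but a 4-byte one is
assert should_dedup(2, True, {}, min_text_size=3) is False
assert should_dedup(4, True, {}, min_text_size=3) is True
```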

View File

@ -148,6 +148,16 @@ def _build_arg_parser(prog='warcprox'):
# optional cookie values to pass to CDX Server; e.g. "cookie1=val1;cookie2=val2"
arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies',
help=argparse.SUPPRESS)
arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size',
type=int, default=0,
help=('try to dedup text resources with payload size over this limit in bytes'))
arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size',
type=int, default=0, help=(
'try to dedup binary resources with payload size over this limit in bytes'))
# optionally, dedup request only when `dedup-bucket` is available in
# Warcprox-Meta HTTP header. By default, we dedup all requests.
arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
action='store_true', default=False, help=argparse.SUPPRESS)
arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
default=500, help=argparse.SUPPRESS)
arg_parser.add_argument('--max-threads', dest='max_threads', type=int,

View File

@ -390,6 +390,24 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.send_error(502, str(e))
return
def send_error(self, code, message=None, explain=None):
# BaseHTTPRequestHandler.send_response_only() in http/server.py
# does this:
# if not hasattr(self, '_headers_buffer'):
# self._headers_buffer = []
# but we sometimes see self._headers_buffer == None
# (This happened before! see commit dc9fdc34125dd2357)
# Workaround:
if hasattr(self, '_headers_buffer') and not self._headers_buffer:
self._headers_buffer = []
try:
return http_server.BaseHTTPRequestHandler.send_error(
self, code, message, explain)
except:
self.logger.error(
'send_error(%r, %r, %r) raised exception',
code, message, explain, exc_info=True)
return None
def _proxy_request(self, extra_response_headers={}):
'''
Sends the request to the remote server, then uses a ProxyingRecorder to
@ -447,13 +465,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
buf = prox_rec_res.read(65536)
while buf != b'':
buf = prox_rec_res.read(65536)
if self._max_resource_size:
if prox_rec_res.recorder.len > self._max_resource_size:
prox_rec_res.truncated = b'length'
self.logger.error(
'Max resource size %d bytes exceeded for URL %s',
if (self._max_resource_size and
prox_rec_res.recorder.len > self._max_resource_size):
prox_rec_res.truncated = b'length'
self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
self._remote_server_conn.sock.close()
self.logger.info(
'truncating response because max resource size %d '
'bytes exceeded for URL %s',
self._max_resource_size, self.url)
break
break
self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
# Let's close off the remote end. If remote connection is fine,
@ -461,6 +482,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
if not is_connection_dropped(self._remote_server_conn):
self._conn_pool._put_conn(self._remote_server_conn)
except:
self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
self._remote_server_conn.sock.close()
raise
finally:
@ -479,35 +501,14 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
class PooledMixIn(socketserver.ThreadingMixIn):
logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn")
def __init__(self, max_threads=None):
'''
If max_threads is not supplied, calculates a reasonable value based
on system resource limits.
'''
self.active_requests = set()
self.unaccepted_requests = 0
if not max_threads:
# man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
# more precisely on Linux, threads) that can be created for the
# real user ID of the calling process."
try:
import resource
rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0]
rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
max_threads = min(rlimit_nofile // 10, rlimit_nproc // 2)
# resource.RLIM_INFINITY == -1 which can result in max_threads == 0
if max_threads <= 0 or max_threads > 5000:
max_threads = 5000
self.logger.info(
"max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)",
max_threads, rlimit_nproc, rlimit_nofile)
except Exception as e:
self.logger.warn(
"unable to calculate optimal number of threads based "
"on resource limits due to %s", e)
max_threads = 100
self.logger.info("max_threads=%s", max_threads)
self.max_threads = max_threads
self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)
if max_threads:
self.max_threads = max_threads
else:
self.max_threads = 100
self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads)
self.logger.info("%s proxy threads", self.max_threads)
def status(self):
if hasattr(super(), 'status'):

View File

@ -350,6 +350,10 @@ class RecordedUrl:
self.response_recorder = response_recorder
if warcprox_meta:
if 'captures-bucket' in warcprox_meta:
# backward compatibility
warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket']
del warcprox_meta['captures-bucket']
self.warcprox_meta = warcprox_meta
else:
self.warcprox_meta = {}
@ -376,6 +380,18 @@ class RecordedUrl:
self.warc_records = warc_records
self.do_not_archive = do_not_archive
def is_text(self):
"""Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types
Alternative method: try to decode('ascii') the first N bytes to make sure
it's text.
"""
if self.mimetype:
return self.mimetype[:5] == "text/" or self.mimetype in (
"application/xml", "application/javascript", "application/json",
"application/xhtml+xml", "application/typescript",
"image/svg+xml")
return False
# inherit from object so that multiple inheritance from this class works
# properly in python 2
# http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639
@ -483,6 +499,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
def server_close(self):
self.logger.info('shutting down')
http_server.HTTPServer.server_close(self)
self.remote_connection_pool.clear()
def handle_error(self, request, client_address):
self.logger.warn(

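RecordedUrl.is_text() drives the text/binary split for the dedup-min-size check: anything with a text/ prefix, a handful of text-like application/* types, or SVG counts as text. A tiny sketch of how those rules classify a few content types (the standalone function just restates the method's logic):
```
TEXT_MIMETYPES = (
    'application/xml', 'application/javascript', 'application/json',
    'application/xhtml+xml', 'application/typescript', 'image/svg+xml')

def is_text(mimetype):
    # restates RecordedUrl.is_text() for illustration
    if mimetype:
        return mimetype[:5] == 'text/' or mimetype in TEXT_MIMETYPES
    return False

assert is_text('text/html')
assert is_text('application/json')
assert not is_text('application/octet-stream')
assert not is_text(None)
```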
View File

@ -88,6 +88,7 @@ class _OneWritableWarc:
os.mkdir(self.directory)
self.finalname = self.next_filename(serial)
self.logger.trace('opening %s', self.finalname)
self.path = os.path.sep.join(
[self.directory, self.finalname + self.open_suffix])

View File

@ -31,6 +31,7 @@ import logging
import time
import warcprox
from concurrent import futures
import threading
class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
@ -41,21 +42,45 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.writer_pool = warcprox.writer.WarcWriterPool(options)
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1)
self.thread_local = threading.local()
self.thread_profilers = {}
# set max_queued small, because self.inq is already handling queueing
# for us; but give it a little breathing room to make sure it can keep
# worker threads busy
self.pool = warcprox.ThreadPoolExecutor(
max_workers=options.writer_threads or 1,
max_queued=10 * (options.writer_threads or 1))
self.batch = set()
def _startup(self):
self.logger.info('%s threads', self.pool._max_workers)
self.logger.info('%s warc writer threads', self.pool._max_workers)
warcprox.BaseStandardPostfetchProcessor._startup(self)
def _get_process_put(self):
try:
recorded_url = self.inq.get(block=True, timeout=0.5)
self.batch.add(recorded_url)
self.pool.submit(self._process_url, recorded_url)
self.pool.submit(self._wrap_process_url, recorded_url)
finally:
self.writer_pool.maybe_idle_rollover()
def _wrap_process_url(self, recorded_url):
if not getattr(self.thread_local, 'name_set', False):
threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid()
self.thread_local.name_set = True
if self.options.profile:
import cProfile
if not hasattr(self.thread_local, 'profiler'):
self.thread_local.profiler = cProfile.Profile()
tid = threading.current_thread().ident
self.thread_profilers[tid] = self.thread_local.profiler
self.thread_local.profiler.enable()
self._process_url(recorded_url)
self.thread_local.profiler.disable()
else:
self._process_url(recorded_url)
def _process_url(self, recorded_url):
try:
records = []
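The per-thread cProfile profilers registered in thread_profilers pair with the controller change above, which dumps each worker's stats to a file and combines them with pstats. A condensed sketch of that aggregation pattern, assuming a dict of populated profilers like the one WarcWriterProcessor keeps:
```
import io
import os
import pstats
import tempfile

def aggregate_profile(thread_profilers):
    # thread_profilers: {thread_ident: cProfile.Profile()} with collected data
    tmpdir = tempfile.mkdtemp()
    files = []
    for th_id, profiler in thread_profilers.items():
        path = os.path.join(tmpdir, '%s.dat' % th_id)
        profiler.dump_stats(path)
        files.append(path)
    buf = io.StringIO()
    stats = pstats.Stats(*files, stream=buf)
    stats.sort_stats('cumulative')
    stats.print_stats(0.1)
    return buf.getvalue()
```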