Merge branch 'dedup-fixes' into qa

Barbara Miller 2019-06-14 15:04:00 -07:00
commit f906312800
19 changed files with 426 additions and 159 deletions

View File

@ -1,20 +1,19 @@
sudo: required
dist: xenial
language: python
python:
- 3.7
- 3.6
- 3.5
- 3.4
- 2.7
- pypy
- pypy3
- 3.7-dev
- pypy3.5
- nightly
matrix:
allow_failures:
- python: nightly
- python: 3.7-dev
- python: 2.7
- python: pypy

View File

@ -89,12 +89,13 @@ for deduplication works similarly to deduplication by `Heritrix
4. If not found,
a. Write ``response`` record with full payload
b. Store new entry in deduplication database
b. Store new entry in deduplication database (can be disabled, see
`Warcprox-Meta HTTP request header <api.rst#warcprox-meta-http-request-header>`_)
The deduplication database is partitioned into different "buckets". URLs are
deduplicated only against other captures in the same bucket. If specified, the
``dedup-bucket`` field of the `Warcprox-Meta HTTP request header
<api.rst#warcprox-meta-http-request-header>`_ determines the bucket. Otherwise,
``dedup-buckets`` field of the `Warcprox-Meta HTTP request header
<api.rst#warcprox-meta-http-request-header>`_ determines the bucket(s). Otherwise,
the default bucket is used.
Deduplication can be disabled entirely by starting warcprox with the argument

api.rst
View File

@ -137,14 +137,16 @@ Example::
Warcprox-Meta: {"warc-prefix": "special-warc"}
``dedup-bucket`` (string)
``dedup-buckets`` (string)
~~~~~~~~~~~~~~~~~~~~~~~~~
Specifies the deduplication bucket. For more information about deduplication
Specifies the deduplication bucket(s). For more information about deduplication
see `<README.rst#deduplication>`_.
Example::
Examples::
Warcprox-Meta: {"dedup-bucket":"my-dedup-bucket"}
Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw"}}
Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw", "my-read-only-dedup-bucket": "ro"}}
``blocks`` (list)
~~~~~~~~~~~~~~~~~

View File

@ -25,14 +25,16 @@ import setuptools
deps = [
'certauth==1.1.6',
'warctools>=4.10.0,<=4.10.0',
'urlcanon>=0.1.dev16',
'warctools>=4.10.0',
'urlcanon>=0.3.0',
'doublethink>=0.2.0.dev87',
'urllib3>=1.23',
'urllib3>=1.14,<1.25',
'requests>=2.0.1',
'PySocks>=1.6.8',
'cryptography>=2.3',
'idna>=2.5',
'PyYAML>=5.1',
'cachetools',
]
try:
import concurrent.futures
@ -41,7 +43,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.4b4.dev195',
version='2.4.14',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -80,7 +80,7 @@ RUN apt-get install -y libsqlite3-dev
# trough itself
RUN virtualenv -p python3 /opt/trough-ve3 \
&& . /opt/trough-ve3/bin/activate \
&& pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string \
&& pip install git+https://github.com/nlevitt/snakebite.git@py3 \
&& pip install git+https://github.com/internetarchive/trough.git
RUN mkdir -vp /etc/service/trough-sync-local \

View File

@ -5,7 +5,7 @@
set -x
pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string
pip install git+https://github.com/nlevitt/snakebite.git@py3
pip install git+https://github.com/internetarchive/trough.git
mkdir /etc/trough

View File

@ -93,9 +93,11 @@ logging.basicConfig(
stream=sys.stdout, level=logging.TRACE,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logging.getLogger("urllib3").setLevel(logging.WARN)
logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning)
import urllib3 ; urllib3.disable_warnings()
import requests.packages.urllib3 ; requests.packages.urllib3.disable_warnings()
def wait(callback, timeout=10):
start = time.time()
@ -144,7 +146,7 @@ def dump_state(signum=None, frame=None):
stack = traceback.format_stack(sys._current_frames()[th.ident])
state_strs.append("".join(stack))
logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)))
logging.warning("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)))
signal.signal(signal.SIGQUIT, dump_state)
@ -279,6 +281,15 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
payload = b'Test.'
actual_headers = (b'Content-Type: text/plain\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
elif self.path == '/incomplete-read':
headers = (b'HTTP/1.1 200 OK\r\n'
+ b'Content-Type: text/plain\r\n'
+ b'Transfer-Encoding: chunked\r\n'
+ b'\r\n')
# payload = b'''1\r\na'''
payload = chunkify(
b'Server closes connection when client expects next chunk')
payload = payload[:-7]
else:
payload = b'404 Not Found\n'
headers = (b'HTTP/1.1 404 Not Found\r\n'
@ -292,7 +303,9 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
headers, payload = self.build_response()
self.connection.sendall(headers)
self.connection.sendall(payload)
if self.path in ('/missing-content-length', '/empty-response'):
if self.path in (
'/missing-content-length', '/empty-response',
'/incomplete-read'):
# server must close the connection, else client has no idea if
# there is more data coming
self.connection.shutdown(socket.SHUT_RDWR)
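For context on the ``/incomplete-read`` handler added above: the server sends a chunked response but closes the connection before the terminating ``0\r\n\r\n`` chunk, so strict clients raise an incomplete-read error while lenient clients keep the partial body. A single-chunk sketch of that framing, not the test module's actual ``chunkify`` helper:

def chunkify(payload):
    # one data chunk followed by the terminating zero-length chunk
    return (b'%x' % len(payload)) + b'\r\n' + payload + b'\r\n0\r\n\r\n'

body = chunkify(b'Server closes connection when client expects next chunk')
truncated = body[:-7]  # drop the trailing b'\r\n0\r\n\r\n', as the test's payload[:-7] does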
@ -446,7 +459,7 @@ def warcprox_(request, http_daemon, https_daemon):
logging.info('dropping rethinkdb database %r', parsed.database)
rr.db_drop(parsed.database).run()
except Exception as e:
logging.warn(
logging.warning(
'problem deleting rethinkdb database %r: %s',
parsed.database, e)
logging.info('deleting working directory %r', work_dir)
@ -777,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
# archive url1 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_a":"rw"}})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
@ -803,7 +816,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert dedup_lookup is None
# archive url2 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
@ -903,6 +916,71 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
finally:
fh.close()
def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
# archive url1
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_readonly",
"dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}})
}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# check url1 in dedup db bucket_1 (rw)
# logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_1')
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1")
assert dedup_lookup
assert dedup_lookup['url'] == url1.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
record_id = dedup_lookup['id']
dedup_date = dedup_lookup['date']
# check url1 not in dedup db bucket_2 (ro)
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2")
assert dedup_lookup is None
# close the warc
assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"]
writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"]
warc_path = os.path.join(writer.directory, writer.finalname)
assert not os.path.exists(warc_path)
warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"].close()
assert os.path.exists(warc_path)
# read the warc
fh = warctools.ArchiveRecord.open_archive(warc_path)
record_iter = fh.read_records(limit=None, offsets=True)
try:
(offset, record, errors) = next(record_iter)
assert record.type == b'warcinfo'
# url1 bucket_1
(offset, record, errors) = next(record_iter)
assert record.type == b'response'
assert record.url == url1.encode('ascii')
# check for duplicate warc record headers
assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1
assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'
(offset, record, errors) = next(record_iter)
assert record.type == b'request'
# that's all folks
assert next(record_iter)[1] == None
assert next(record_iter, None) == None
finally:
fh.close()
def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
revisits_before = warcprox_.proxy.stats_db.value(
@ -915,7 +993,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
"dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
@ -931,7 +1009,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, -i - 1)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
"dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
@ -946,7 +1024,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
"dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
@ -965,12 +1043,12 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
},
{
"url_match": "SURT_MATCH",
"value": "http://(localhost:%s,)/fuh/" % (http_daemon.server_port),
"value": "http://(localhost,:%s)/fuh/" % (http_daemon.server_port),
},
{
"url_match": "SURT_MATCH",
# this rule won't match because of http scheme, https port
"value": "http://(localhost:%s,)/fuh/" % (https_daemon.server_port),
"value": "http://(localhost,:%s)/fuh/" % (https_daemon.server_port),
},
{
"domain": "bad.domain.com",
@ -1487,7 +1565,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None
# archive with dedup_ok:False
request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False}
request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''},'dedup-ok':False}
headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False)
@ -1505,7 +1583,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None
# archive without dedup_ok:False
request_meta = {'dedup-bucket':'test_dedup_ok_flag'}
request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''}}
headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False)
@ -1611,13 +1689,11 @@ def test_controller_with_defaults():
assert not wwp.writer_pool.default_warc_writer.record_builder.base32
assert wwp.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
class EarlyPlugin(warcprox.BaseStandardPostfetchProcessor):
CHAIN_POSITION = 'early'
def _process_url(self):
pass
def test_load_plugin():
options = warcprox.Options(port=0, plugins=[
'warcprox.stats.RunningStats',
@ -1714,13 +1790,13 @@ def test_slash_in_warc_prefix(warcprox_, http_daemon, archiving_proxies):
url = 'http://localhost:%s/b/b' % http_daemon.server_port
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"../../../../etc/a"})}
response = requests.get(url, proxies=archiving_proxies, headers=headers)
assert response.status_code == 500
assert response.status_code == 400
assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix'
url = 'http://localhost:%s/b/c' % http_daemon.server_port
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"..\\..\\..\\derp\\monkey"})}
response = requests.get(url, proxies=archiving_proxies, headers=headers)
assert response.status_code == 500
assert response.status_code == 400
assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix'
def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
@ -1763,7 +1839,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
crawl_log = open(default_crawl_log_path, 'rb').read()
# tests will fail in year 3000 :)
assert re.match(b'\A2[^\n]+\n\Z', crawl_log)
assert re.match(br'\A2[^\n]+\n\Z', crawl_log)
assert crawl_log[24:31] == b' 200 '
assert crawl_log[31:42] == b' 54 '
fields = crawl_log.split()
@ -1783,7 +1859,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert extra_info['contentSize'] == 145
crawl_log_1 = open(file, 'rb').read()
assert re.match(b'\A2[^\n]+\n\Z', crawl_log_1)
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_1)
assert crawl_log_1[24:31] == b' 200 '
assert crawl_log_1[31:42] == b' 54 '
fields = crawl_log_1.split()
@ -1821,7 +1897,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
crawl_log_2 = open(file, 'rb').read()
assert re.match(b'\A2[^\n]+\n\Z', crawl_log_2)
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_2)
assert crawl_log_2[24:31] == b' 200 '
assert crawl_log_2[31:42] == b' 54 '
fields = crawl_log_2.split()
@ -1854,7 +1930,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert os.path.exists(file)
crawl_log_3 = open(file, 'rb').read()
assert re.match(b'\A2[^\n]+\n\Z', crawl_log_3)
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_3)
assert crawl_log_3[24:31] == b' 200 '
assert crawl_log_3[31:42] == b' 0 '
fields = crawl_log_3.split()
@ -1894,7 +1970,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert os.path.exists(file)
crawl_log_4 = open(file, 'rb').read()
assert re.match(b'\A2[^\n]+\n\Z', crawl_log_4)
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_4)
assert crawl_log_4[24:31] == b' 204 '
assert crawl_log_4[31:42] == b' 38 '
fields = crawl_log_4.split()
@ -1976,6 +2052,10 @@ def test_socket_timeout_response(
def test_empty_response(
warcprox_, http_daemon, https_daemon, archiving_proxies,
playback_proxies):
# localhost:server_port was added to the `bad_hostnames_ports` cache by
# previous tests and this causes subsequent tests to fail. We clear it.
warcprox_.proxy.bad_hostnames_ports.clear()
url = 'http://localhost:%s/empty-response' % http_daemon.server_port
response = requests.get(url, proxies=archiving_proxies, verify=False)
assert response.status_code == 502
@ -1991,6 +2071,10 @@ def test_payload_digest(warcprox_, http_daemon):
Tests that digest is of RFC2616 "entity body"
(transfer-decoded but not content-decoded)
'''
# localhost:server_port was added to the `bad_hostnames_ports` cache by
# previous tests and this causes subsequent tests to fail. We clear it.
warcprox_.proxy.bad_hostnames_ports.clear()
class HalfMockedMitm(warcprox.mitmproxy.MitmProxyHandler):
def __init__(self, url):
self.path = url
@ -2224,6 +2308,23 @@ def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies):
with pytest.raises(StopIteration):
next(rec_iter)
def test_incomplete_read(http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
# see https://github.com/internetarchive/warcprox/pull/123
url = 'http://localhost:%s/incomplete-read' % http_daemon.server_port
with pytest.raises(requests.exceptions.ChunkedEncodingError):
response = requests.get(
url, proxies=archiving_proxies, verify=False, timeout=10)
# although `requests.get` raises an exception here, other clients like
# browsers put up with the server misbehavior; warcprox does too, and will
# record the response verbatim in the warc; this `wait()` call tests
# that a warc record is written
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
if __name__ == '__main__':
pytest.main()

View File

@ -78,6 +78,15 @@ class RequestBlockedByRule(Exception):
def __str__(self):
return "%s: %s" % (self.__class__.__name__, self.msg)
class BadRequest(Exception):
'''
Raised in case of a request deemed unacceptable by warcprox.
'''
def __init__(self, msg):
self.msg = msg
def __str__(self):
return "%s: %s" % (self.__class__.__name__, self.msg)
class BasePostfetchProcessor(threading.Thread):
logger = logging.getLogger("warcprox.BasePostfetchProcessor")

View File

@ -71,7 +71,7 @@ class RethinkCaptures:
"unexpected result saving batch of %s: %s "
"entries" % (len(self._batch), result))
if result["replaced"] > 0 or result["unchanged"] > 0:
self.logger.warn(
self.logger.warning(
"inserted=%s replaced=%s unchanged=%s in big "
"captures table (normally replaced=0 and "
"unchanged=0)", result["inserted"],
@ -148,7 +148,7 @@ class RethinkCaptures:
recorded_url.payload_digest.digest()
).decode("utf-8")
else:
self.logger.warn(
self.logger.warning(
"digest type is %r but big captures table is indexed "
"by sha1",
recorded_url.payload_digest.name)
@ -157,8 +157,11 @@ class RethinkCaptures:
sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
if (recorded_url.warcprox_meta
and "dedup-bucket" in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta["dedup-bucket"]
and "dedup-buckets" in recorded_url.warcprox_meta):
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
# maybe this is the right thing to do here? or should we return an entry for each? or ?
break
else:
bucket = "__unspecified__"

View File

@ -441,7 +441,12 @@ class WarcproxController(object):
exc_info=True)
pass
finally:
self.shutdown()
try:
self.shutdown()
except:
self.logger.critical("graceful shutdown failed", exc_info=True)
self.logger.critical("killing myself -9")
os.kill(os.getpid(), 9)
def _dump_profiling(self):
import pstats, tempfile, os, io

View File

@ -34,6 +34,7 @@ import urllib3
from urllib3.exceptions import HTTPError
import collections
from concurrent import futures
from functools import lru_cache
urllib3.disable_warnings()
@ -46,11 +47,11 @@ class DedupableMixin(object):
def should_dedup(self, recorded_url):
"""Check if we should try to run dedup on resource based on payload
size compared with min text/binary dedup size options.
When we use option --dedup-only-with-bucket, `dedup-bucket` is required
When we use option --dedup-only-with-bucket, `dedup-buckets` is required
in Warcprox-Meta to perform dedup.
Return Boolean.
"""
if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta:
if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta:
return False
if recorded_url.is_text():
return recorded_url.response_recorder.payload_size() > self.min_text_size
@ -68,10 +69,13 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
and recorded_url.payload_digest
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, recorded_url.warcprox_meta["dedup-bucket"],
recorded_url.url)
if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, bucket, recorded_url.url)
if recorded_url.dedup_info:
# we found an existing capture
break
else:
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, url=recorded_url.url)
@ -147,10 +151,12 @@ class DedupDb(DedupableMixin):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta["dedup-bucket"])
if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == "ro":
self.save(
digest_key, records[0],
bucket=bucket)
else:
self.save(digest_key, records[0])
@ -212,8 +218,10 @@ class RethinkDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"])
if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
self.save(digest_key, records[0], bucket=bucket)
else:
self.save(digest_key, records[0])
@ -236,6 +244,7 @@ class CdxServerDedup(DedupDb):
headers['Cookie'] = options.cdxserver_dedup_cookies
self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
timeout=2.0, headers=headers)
self.cached_lookup = lru_cache(maxsize=1024)(self.lookup)
def loader(self, *args, **kwargs):
return CdxServerDedupLoader(self, self.options)
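The ``lru_cache(maxsize=1024)(self.lookup)`` line above wraps the bound ``lookup`` method at construction time, so each CdxServerDedup instance gets its own cache instead of the whole class sharing one decorator. A standalone sketch of the same pattern; the class and method names here are illustrative, not warcprox's:

from functools import lru_cache

class Lookup:
    def __init__(self):
        # per-instance cache: wrap the bound method rather than decorating the class
        self.cached_get = lru_cache(maxsize=1024)(self.get)

    def get(self, digest_key, url=None):
        # stand-in for an expensive remote lookup
        return {'digest_key': digest_key, 'url': url}

client = Lookup()
client.cached_get('sha1:abc', 'http://example.com/')  # miss, calls get()
client.cached_get('sha1:abc', 'http://example.com/')  # hit, served from the cache
print(client.cached_get.cache_info())                 # CacheInfo(hits=1, misses=1, ...)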
@ -296,7 +305,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin)
def __init__(self, cdx_dedup, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
DedupableMixin.__init__(self, options)
self.pool = futures.ThreadPoolExecutor(max_workers=400)
self.pool = futures.ThreadPoolExecutor(max_workers=options.cdxserver_dedup_max_threads)
self.batch = set()
self.cdx_dedup = cdx_dedup
@ -315,7 +324,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin)
try:
digest_key = warcprox.digest_str(recorded_url.payload_digest,
self.options.base32)
dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url)
dedup_info = self.cdx_dedup.cached_lookup(digest_key, recorded_url.url)
cache_info = self.cdx_dedup.cached_lookup.cache_info()
if (cache_info.hits + cache_info.misses) % 1000 == 0:
self.logger.info(self.cdx_dedup.cached_lookup.cache_info())
if dedup_info:
recorded_url.dedup_info = dedup_info
except ValueError as exc:
@ -342,11 +354,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
and recorded_url.warc_records[0].type == b'response'
and self.trough_dedup_db.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket']
and 'dedup-buckets' in recorded_url.warcprox_meta):
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
buckets[bucket].append(recorded_url)
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
buckets['__unspecified__'].append(recorded_url)
return buckets
def _process_batch(self, batch):
@ -369,7 +382,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
except futures.TimeoutError as e:
# the remaining threads actually keep running in this case,
# there's no way to stop them, but that should be harmless
logging.warn(
logging.warning(
'timed out saving dedup info to trough', exc_info=True)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
@ -394,11 +407,11 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
and recorded_url.payload_digest
and self.trough_dedup_db.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket']
and 'dedup-buckets' in recorded_url.warcprox_meta):
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
buckets[bucket].append(recorded_url)
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
buckets['__unspecified__'].append(recorded_url)
else:
discards.append(
warcprox.digest_str(
@ -453,7 +466,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
recorded_url.dedup_info = entry
except Exception as e:
# batch_lookup raised exception or something
logging.warn(
logging.warning(
'problem looking up dedup info for %s urls '
'in bucket %s', len(buckets[bucket]), bucket,
exc_info=True)
@ -469,7 +482,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
except futures.TimeoutError as e:
# the remaining threads actually keep running in this case,
# there's no way to stop them, but that should be harmless
self.logger.warn(
self.logger.warning(
'timed out loading dedup info from trough', exc_info=True)
class TroughDedupDb(DedupDb, DedupableMixin):
@ -571,9 +584,11 @@ class TroughDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta['dedup-bucket'])
if recorded_url.warcprox_meta and 'dedup-buckets' in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
self.save(
digest_key, records[0],
bucket=bucket)
else:
self.save(digest_key, records[0])

View File

@ -30,6 +30,7 @@ except ImportError:
import Queue as queue
import logging
import logging.config
import sys
import hashlib
import argparse
@ -39,6 +40,7 @@ import traceback
import signal
import threading
import certauth.certauth
import yaml
import warcprox
import doublethink
import cryptography.hazmat.backends.openssl
@ -168,6 +170,10 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
help=suppress(
'value of Cookie header to include in requests to the cdx '
'server, when using --cdxserver-dedup'))
hidden.add_argument(
'--cdxserver-dedup-max-threads', dest='cdxserver_dedup_max_threads',
type=int, default=50, help=suppress(
'maximum number of cdx server dedup threads'))
arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size',
type=int, default=0,
help=('try to dedup text resources with payload size over this limit in bytes'))
@ -235,6 +241,9 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
arg_parser.add_argument(
'--trace', dest='trace', action='store_true',
help='very verbose logging')
arg_parser.add_argument(
'--logging-conf-file', dest='logging_conf_file', default=None,
help=('reads logging configuration from a YAML file'))
arg_parser.add_argument(
'--version', action='version',
version="warcprox {}".format(warcprox.__version__))
@ -255,7 +264,7 @@ def dump_state(signum=None, frame=None):
except Exception as e:
state_strs.append('<n/a:%r>' % e)
logging.warn(
logging.warning(
'dumping state (caught signal %s)\n%s',
signum, '\n'.join(state_strs))
@ -298,6 +307,11 @@ def main(argv=None):
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
if args.logging_conf_file:
with open(args.logging_conf_file, 'r') as fd:
conf = yaml.safe_load(fd)
logging.config.dictConfig(conf)
# see https://github.com/pyca/cryptography/issues/2911
cryptography.hazmat.backends.openssl.backend.activate_builtin_random()
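For the ``--logging-conf-file`` option added above: the YAML file is read with ``yaml.safe_load`` and handed straight to ``logging.config.dictConfig``, so it has to contain a standard dictConfig mapping. A minimal illustrative configuration, shown as the equivalent Python dict (the YAML file would carry the same keys; the handler and format values are placeholders, not warcprox defaults):

import logging.config

conf = {
    'version': 1,
    'formatters': {
        'verbose': {
            'format': '%(asctime)s %(process)d %(levelname)s %(name)s %(message)s'},
    },
    'handlers': {
        'console': {'class': 'logging.StreamHandler', 'formatter': 'verbose'},
    },
    'root': {'level': 'INFO', 'handlers': ['console']},
}
logging.config.dictConfig(conf)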
@ -312,7 +326,11 @@ def main(argv=None):
# SIGQUIT does not exist on some platforms (windows)
pass
controller.run_until_shutdown()
try:
controller.run_until_shutdown()
except:
logging.fatal('unhandled exception in controller', exc_info=True)
sys.exit(1)
def ensure_rethinkdb_tables(argv=None):
'''
@ -384,7 +402,7 @@ def ensure_rethinkdb_tables(argv=None):
did_something = True
if args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
logging.warn(
logging.warning(
'trough is responsible for creating most of the rethinkdb '
'tables that it uses')
did_something = True

View File

@ -35,6 +35,13 @@ try:
import urllib.parse as urllib_parse
except ImportError:
import urlparse as urllib_parse
# In python2/3, urllib parse caches in memory URL parsing results to avoid
# repeating the process for the same URL. The problem is that the default
# in memory cache size is just 20.
# https://github.com/python/cpython/blob/3.7/Lib/urllib/parse.py#L80
# since we do a lot of URL parsing, it makes sense to increase cache size.
urllib_parse.MAX_CACHE_SIZE = 2000
try:
import http.client as http_client
# In python3 http.client.parse_headers() enforces http_client._MAXLINE
@ -45,6 +52,11 @@ try:
http_client._MAXLINE = 4194304 # 4 MiB
except ImportError:
import httplib as http_client
# http_client has an arbitrary limit of 100 HTTP Headers which is too low and
# it raises an HTTPException if the target URL has more.
# https://github.com/python/cpython/blob/3.7/Lib/http/client.py#L113
http_client._MAXHEADERS = 7000
import json
import socket
import logging
@ -64,8 +76,13 @@ import urlcanon
import time
import collections
import cProfile
from urllib3 import PoolManager
from urllib3.util import is_connection_dropped
from urllib3.exceptions import TimeoutError, HTTPError
import doublethink
from cachetools import TTLCache
from threading import RLock
from certauth.certauth import CertificateAuthority
class ProxyingRecorder(object):
"""
@ -100,7 +117,7 @@ class ProxyingRecorder(object):
self.proxy_client.sendall(hunk)
except BaseException as e:
self._proxy_client_conn_open = False
self.logger.warn(
self.logger.warning(
'%s sending data to proxy client for url %s',
e, self.url)
self.logger.info(
@ -210,9 +227,12 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
and records the bytes in transit as it proxies them.
'''
logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler")
_socket_timeout = 60
_max_resource_size = None
_tmp_file_max_memory_size = 512 * 1024
onion_tor_socks_proxy_host = None
onion_tor_socks_proxy_port = None
def __init__(self, request, client_address, server):
threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1])
@ -228,7 +248,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
else:
self.url = self.path
u = urllib_parse.urlparse(self.url)
if u.scheme != 'http':
if u.scheme != 'http' or u.netloc == '':
raise Exception(
'unable to parse request %r as a proxy request' % (
self.requestline))
@ -240,6 +260,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
query=u.query, fragment=u.fragment))
self.hostname = urlcanon.normalize_host(host).decode('ascii')
def _hostname_port_cache_key(self):
return '%s:%s' % (self.hostname, self.port)
def _connect_to_remote_server(self):
'''
Connect to destination.
@ -251,7 +274,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
'''
self._conn_pool = self.server.remote_connection_pool.connection_from_host(
host=self.hostname, port=int(self.port), scheme='http',
pool_kwargs={'maxsize': 6, 'timeout': self._socket_timeout})
pool_kwargs={'maxsize': 12, 'timeout': self._socket_timeout})
self._remote_server_conn = self._conn_pool._get_conn()
if is_connection_dropped(self._remote_server_conn):
@ -283,7 +306,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self._remote_server_conn.sock = ssl.wrap_socket(
self._remote_server_conn.sock)
except ssl.SSLError:
self.logger.warn(
self.logger.warning(
"failed to establish ssl connection to %s; "
"python ssl library does not support SNI, "
"consider upgrading to python 2.7.9+ or 3.4+",
@ -332,7 +355,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
else:
self.send_error(500, str(e))
except Exception as f:
self.logger.warn("failed to send error response ({}) to proxy client: {}".format(e, f))
self.logger.warning("failed to send error response ({}) to proxy client: {}".format(e, f))
return
# Reload!
@ -368,25 +391,55 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
else:
self._determine_host_port()
assert self.url
# Check if target hostname:port is in `bad_hostnames_ports` cache
# to avoid retrying to connect. Cached value is http status code.
cached = None
hostname_port = self._hostname_port_cache_key()
with self.server.bad_hostnames_ports_lock:
cached = self.server.bad_hostnames_ports.get(hostname_port)
if cached:
self.logger.info('Cannot connect to %s (cache)', hostname_port)
self.send_error(cached)
return
# Connect to destination
self._connect_to_remote_server()
except warcprox.RequestBlockedByRule as e:
# limit enforcers have already sent the appropriate response
self.logger.info("%r: %r", self.requestline, e)
return
except warcprox.BadRequest as e:
self.send_error(400, e.msg)
return
except Exception as e:
# If connection fails, add hostname:port to cache to avoid slow
# subsequent reconnection attempts. `NewConnectionError` can be
# caused by many types of errors which are handled by urllib3.
response_code = 500
cache = False
if isinstance(e, (socket.timeout, TimeoutError,)):
response_code = 504
cache = True
elif isinstance(e, HTTPError):
response_code = 502
cache = True
if cache:
host_port = self._hostname_port_cache_key()
with self.server.bad_hostnames_ports_lock:
self.server.bad_hostnames_ports[host_port] = response_code
self.logger.info('bad_hostnames_ports cache size: %d',
len(self.server.bad_hostnames_ports))
self.logger.error(
"problem processing request %r: %r",
self.requestline, e, exc_info=True)
self.send_error(500, str(e))
self.send_error(response_code)
return
try:
return self._proxy_request()
except Exception as e:
if self.server.shutting_down:
self.logger.warn(
self.logger.warning(
'sending 503 warcprox shutting down %r: %r',
self.requestline, e)
self.send_error(503, 'warcprox shutting down')
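The ``bad_hostnames_ports`` handling above reduces to a TTL-bounded cache guarded by a lock: before connecting, look up the host:port and return the cached error status if present; on timeout or HTTP error, store a status code so the next request fails fast. A reduced standalone sketch of that pattern, with the handler and server plumbing omitted:

from threading import RLock
from cachetools import TTLCache

# entries expire after 60 seconds, mirroring the attributes added to the proxy servers
bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
bad_hostnames_ports_lock = RLock()

def cached_failure(hostname, port):
    key = '%s:%s' % (hostname, port)
    with bad_hostnames_ports_lock:
        return bad_hostnames_ports.get(key)  # cached http status code, or None

def record_failure(hostname, port, status_code):
    key = '%s:%s' % (hostname, port)
    with bad_hostnames_ports_lock:
        bad_hostnames_ports[key] = status_code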
@ -394,7 +447,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.error(
'error from remote server(?) %r: %r',
self.requestline, e, exc_info=True)
self.send_error(502, str(e))
self.send_error(502)
return
def send_error(self, code, message=None, explain=None):
@ -410,9 +463,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
try:
return http_server.BaseHTTPRequestHandler.send_error(
self, code, message, explain)
except:
self.logger.error(
'send_error(%r, %r, %r) raised exception', exc_info=True)
except Exception as e:
level = logging.ERROR
if isinstance(e, OSError) and e.errno == 9:
level = logging.TRACE
self.logger.log(
level, 'send_error(%r, %r, %r) raised exception',
exc_info=True)
return None
def _proxy_request(self, extra_response_headers={}):
@ -478,9 +535,14 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
tmp_file_max_memory_size=self._tmp_file_max_memory_size)
prox_rec_res.begin(extra_response_headers=extra_response_headers)
buf = prox_rec_res.read(65536)
buf = None
while buf != b'':
buf = prox_rec_res.read(65536)
try:
buf = prox_rec_res.read(65536)
except http_client.IncompleteRead as e:
self.logger.warn('%s from %s', e, self.url)
buf = e.partial
if (self._max_resource_size and
prox_rec_res.recorder.len > self._max_resource_size):
prox_rec_res.truncated = b'length'
@ -506,7 +568,19 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
# put it back in the pool to reuse it later.
if not is_connection_dropped(self._remote_server_conn):
self._conn_pool._put_conn(self._remote_server_conn)
except:
except Exception as e:
# A common error is to connect to the remote server successfully
# but raise a `RemoteDisconnected` exception when trying to begin
# downloading. It's caused by prox_rec_res.begin(...) which calls
# http_client._read_status(). In that case, the host is also bad
# and we must add it to `bad_hostnames_ports` cache.
if isinstance(e, http_client.RemoteDisconnected):
host_port = self._hostname_port_cache_key()
with self.server.bad_hostnames_ports_lock:
self.server.bad_hostnames_ports[host_port] = 502
self.logger.info('bad_hostnames_ports cache size: %d',
len(self.server.bad_hostnames_ports))
self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
self._remote_server_conn.sock.close()
raise
@ -521,7 +595,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
return self.do_COMMAND
def log_error(self, fmt, *args):
self.logger.warn(fmt, *args)
self.logger.warning(fmt, *args)
class PooledMixIn(socketserver.ThreadingMixIn):
logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn")
@ -670,3 +744,52 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
for sock in self.remote_server_socks:
self.shutdown_request(sock)
class SingleThreadedMitmProxy(http_server.HTTPServer):
logger = logging.getLogger('warcprox.warcproxy.SingleThreadedMitmProxy')
def __init__(
self, MitmProxyHandlerClass=MitmProxyHandler,
options=warcprox.Options()):
self.options = options
# TTLCache is not thread-safe. Access to the shared cache from multiple
# threads must be properly synchronized with an RLock according to ref:
# https://cachetools.readthedocs.io/en/latest/
self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
self.bad_hostnames_ports_lock = RLock()
self.remote_connection_pool = PoolManager(
num_pools=max((options.max_threads or 0) // 6, 400))
if options.onion_tor_socks_proxy:
try:
host, port = options.onion_tor_socks_proxy.split(':')
MitmProxyHandlerClass.onion_tor_socks_proxy_host = host
MitmProxyHandlerClass.onion_tor_socks_proxy_port = int(port)
except ValueError:
MitmProxyHandlerClass.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy
MitmProxyHandlerClass.onion_tor_socks_proxy_port = None
if options.socket_timeout:
MitmProxyHandlerClass._socket_timeout = options.socket_timeout
if options.max_resource_size:
MitmProxyHandlerClass._max_resource_size = options.max_resource_size
if options.tmp_file_max_memory_size:
MitmProxyHandlerClass._tmp_file_max_memory_size = options.tmp_file_max_memory_size
self.digest_algorithm = options.digest_algorithm or 'sha1'
ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64]
self.ca = CertificateAuthority(
ca_file=options.cacert or 'warcprox-ca.pem',
certs_dir=options.certs_dir or './warcprox-ca',
ca_name=ca_name)
server_address = (
options.address or 'localhost',
options.port if options.port is not None else 8000)
http_server.HTTPServer.__init__(
self, server_address, MitmProxyHandlerClass,
bind_and_activate=True)

View File

@ -42,6 +42,7 @@ from warcprox.mitmproxy import MitmProxyHandler
import warcprox
import sqlite3
import threading
from cachetools import TTLCache
class PlaybackProxyHandler(MitmProxyHandler):
logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler")
@ -219,6 +220,8 @@ class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
self.playback_index_db = playback_index_db
self.warcs_dir = options.directory
self.options = options
self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60)
self.bad_hostnames_ports_lock = threading.RLock()
def server_activate(self):
http_server.HTTPServer.server_activate(self)

View File

@ -81,7 +81,7 @@ def unravel_buckets(url, warcprox_meta):
for bucket in warcprox_meta["stats"]["buckets"]:
if isinstance(bucket, dict):
if not 'bucket' in bucket:
self.logger.warn(
self.logger.warning(
'ignoring invalid stats bucket in '
'warcprox-meta header %s', bucket)
continue

View File

@ -190,7 +190,7 @@ class TroughClient(object):
return
if response.status_code != 200:
self._write_url_cache.pop(segment_id, None)
self.logger.warn(
self.logger.warning(
'unexpected response %r %r %r from %r to sql=%r',
response.status_code, response.reason, response.text,
write_url, sql)

View File

@ -125,48 +125,59 @@ class WarcRecordBuilder:
headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to))
if content_type is not None:
headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type))
if payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest))
# truncated value may be 'length' or 'time'
if truncated is not None:
headers.append((b'WARC-Truncated', truncated))
if content_length is not None:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(content_length).encode('latin1')))
if recorder is not None:
if content_length is not None:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(content_length).encode('latin1')))
else:
if payload_digest is not None:
headers.append(
(warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest))
if content_length is None:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(len(recorder)).encode('latin1')))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
warcprox.digest_str(recorder.block_digest, self.base32)))
recorder.tempfile.seek(0)
record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
record = warctools.WarcRecord(
headers=headers, content_file=recorder.tempfile)
else:
if content_length is not None:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(content_length).encode('latin1')))
else:
if content_length is None:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(len(data)).encode('latin1')))
# no http headers so block digest == payload digest
if not payload_digest:
payload_digest = warcprox.digest_str(
block_digest = None
if not hasattr(data, 'read'):
block_digest = warcprox.digest_str(
hashlib.new(self.digest_algorithm, data), self.base32)
headers.append((
warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest))
headers.append((warctools.WarcRecord.BLOCK_DIGEST, payload_digest))
if not content_type.lower().startswith(b'application/http'):
# no http headers, so block digest == payload digest
if payload_digest and not block_digest:
block_digest = payload_digest
elif block_digest and not payload_digest:
payload_digest = block_digest
if block_digest:
headers.append(
(warctools.WarcRecord.BLOCK_DIGEST, block_digest))
if payload_digest:
headers.append(
(warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest))
if hasattr(data, 'read'):
record = warctools.WarcRecord(
headers=headers, content_file=data)
else:
content_tuple = content_type, data
record = warctools.WarcRecord(
headers=headers, content=content_tuple)
headers=headers, content=(content_type, data))
return record

View File

@ -38,15 +38,14 @@ import logging
import json
import socket
from hanzo import warctools
from certauth.certauth import CertificateAuthority
import warcprox
import datetime
import urlcanon
import os
from urllib3 import PoolManager
import tempfile
import hashlib
import doublethink
import re
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
'''
@ -167,7 +166,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
if warcprox_meta and 'warc-prefix' in warcprox_meta and (
'/' in warcprox_meta['warc-prefix']
or '\\' in warcprox_meta['warc-prefix']):
raise Exception(
raise warcprox.BadRequest(
"request rejected by warcprox: slash and backslash are not "
"permitted in warc-prefix")
@ -349,6 +348,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
# logging better handled elsewhere?
pass
RE_MIMETYPE = re.compile(r'[;\s]')
class RecordedUrl:
logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
@ -377,8 +377,14 @@ class RecordedUrl:
if warcprox_meta:
if 'captures-bucket' in warcprox_meta:
# backward compatibility
warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket']
warcprox_meta['dedup-buckets'] = {}
warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw'
del warcprox_meta['captures-bucket']
if 'dedup-bucket' in warcprox_meta:
# more backwards compatibility
warcprox_meta['dedup-buckets'] = {}
warcprox_meta['dedup-buckets'][warcprox_meta['dedup-bucket']] = 'rw'
del warcprox_meta['dedup-bucket']
self.warcprox_meta = warcprox_meta
else:
self.warcprox_meta = {}
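The two backward-compatibility branches above rewrite the legacy ``captures-bucket`` and ``dedup-bucket`` fields into the new ``dedup-buckets`` form, each becoming a single read-write bucket. A hypothetical helper showing the same rewriting outside of RecordedUrl.__init__:

def normalize_warcprox_meta(meta):
    # sketch of the backward-compatibility rewriting; not a function in warcprox
    if 'captures-bucket' in meta:
        meta['dedup-buckets'] = {meta.pop('captures-bucket'): 'rw'}
    if 'dedup-bucket' in meta:
        meta['dedup-buckets'] = {meta.pop('dedup-bucket'): 'rw'}
    return meta

assert normalize_warcprox_meta({'dedup-bucket': 'my-bucket'}) == {
        'dedup-buckets': {'my-bucket': 'rw'}}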
@ -387,9 +393,8 @@ class RecordedUrl:
self.mimetype = content_type
if self.mimetype:
n = self.mimetype.find(";")
if n >= 0:
self.mimetype = self.mimetype[:n]
# chop off subtype, and ensure there's no whitespace
self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0]
self.custom_type = custom_type
self.status = status
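With ``RE_MIMETYPE = re.compile(r'[;\s]')``, the split above truncates the Content-Type value at the first semicolon or whitespace, dropping parameters such as ``charset``. For example:

import re

RE_MIMETYPE = re.compile(r'[;\s]')
print(RE_MIMETYPE.split('text/html; charset=UTF-8', 2)[0])  # text/html
print(RE_MIMETYPE.split('application/json', 2)[0])          # application/json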
@ -420,51 +425,20 @@ class RecordedUrl:
# inherit from object so that multiple inheritance from this class works
# properly in python 2
# http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639
class SingleThreadedWarcProxy(http_server.HTTPServer, object):
class SingleThreadedWarcProxy(warcprox.mitmproxy.SingleThreadedMitmProxy):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(
self, stats_db=None, status_callback=None,
options=warcprox.Options()):
self.start_time = doublethink.utcnow()
warcprox.mitmproxy.SingleThreadedMitmProxy.__init__(
self, WarcProxyHandler, options)
self.status_callback = status_callback
self.stats_db = stats_db
self.options = options
self.remote_connection_pool = PoolManager(
num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200)
server_address = (
options.address or 'localhost',
options.port if options.port is not None else 8000)
if options.onion_tor_socks_proxy:
try:
host, port = options.onion_tor_socks_proxy.split(':')
WarcProxyHandler.onion_tor_socks_proxy_host = host
WarcProxyHandler.onion_tor_socks_proxy_port = int(port)
except ValueError:
WarcProxyHandler.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy
WarcProxyHandler.onion_tor_socks_proxy_port = None
if options.socket_timeout:
WarcProxyHandler._socket_timeout = options.socket_timeout
if options.max_resource_size:
WarcProxyHandler._max_resource_size = options.max_resource_size
if options.tmp_file_max_memory_size:
WarcProxyHandler._tmp_file_max_memory_size = options.tmp_file_max_memory_size
http_server.HTTPServer.__init__(
self, server_address, WarcProxyHandler, bind_and_activate=True)
self.digest_algorithm = options.digest_algorithm or 'sha1'
ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64]
self.ca = CertificateAuthority(
ca_file=options.cacert or 'warcprox-ca.pem',
certs_dir=options.certs_dir or './warcprox-ca',
ca_name=ca_name)
self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000)
self.running_stats = warcprox.stats.RunningStats()
def status(self):
@ -530,6 +504,6 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
self.remote_connection_pool.clear()
def handle_error(self, request, client_address):
self.logger.warn(
self.logger.warning(
"exception processing request %s from %s", request,
client_address, exc_info=True)

View File

@ -149,6 +149,7 @@ class WarcWriter:
record.get_header(b'WARC-Payload-Digest'), record.offset,
self.path, record.get_header(warctools.WarcRecord.URL))
self.f.flush()
self.last_activity = time.time()
return records