diff --git a/.travis.yml b/.travis.yml
index 0ad15d4..1a351d3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,20 +1,19 @@
 sudo: required
-
+dist: xenial
 language: python
 python:
+- 3.7
 - 3.6
 - 3.5
 - 3.4
 - 2.7
 - pypy
-- pypy3
-- 3.7-dev
+- pypy3.5
 - nightly
 
 matrix:
   allow_failures:
   - python: nightly
-  - python: 3.7-dev
   - python: 2.7
   - python: pypy
diff --git a/README.rst b/README.rst
index b7b5c17..77e7e58 100644
--- a/README.rst
+++ b/README.rst
@@ -89,12 +89,13 @@ for deduplication works similarly to deduplication by `Heritrix
 4. If not found,
 
    a. Write ``response`` record with full payload
-   b. Store new entry in deduplication database
+   b. Store new entry in deduplication database (can be disabled, see
+      `Warcprox-Meta HTTP request header `
 
 The deduplication database is partitioned into different "buckets". URLs are
 deduplicated only against other captures in the same bucket. If specified, the
-``dedup-bucket`` field of the `Warcprox-Meta HTTP request header
-`_ determines the bucket. Otherwise,
+``dedup-buckets`` field of the `Warcprox-Meta HTTP request header
+`_ determines the bucket(s). Otherwise,
 the default bucket is used.
 
 Deduplication can be disabled entirely by starting warcprox with the argument
diff --git a/api.rst b/api.rst
index 1da1898..eee3219 100644
--- a/api.rst
+++ b/api.rst
@@ -137,14 +137,16 @@ Example::
 
     Warcprox-Meta: {"warc-prefix": "special-warc"}
 
-``dedup-bucket`` (string)
+``dedup-buckets`` (string)
 ~~~~~~~~~~~~~~~~~~~~~~~~~
-Specifies the deduplication bucket. For more information about deduplication
+Specifies the deduplication bucket(s). For more information about deduplication
 see ``_.
 
-Example::
+Examples::
 
-    Warcprox-Meta: {"dedup-bucket":"my-dedup-bucket"}
+    Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw"}}
+
+    Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw", "my-read-only-dedup-bucket": "ro"}}
 
 ``blocks`` (list)
 ~~~~~~~~~~~~~~~~~
diff --git a/setup.py b/setup.py
index ceb9886..0b6d756 100755
--- a/setup.py
+++ b/setup.py
@@ -25,14 +25,16 @@ import setuptools
 deps = [
     'certauth==1.1.6',
-    'warctools>=4.10.0,<=4.10.0',
-    'urlcanon>=0.1.dev16',
+    'warctools>=4.10.0',
+    'urlcanon>=0.3.0',
     'doublethink>=0.2.0.dev87',
-    'urllib3>=1.23',
+    'urllib3>=1.14,<1.25',
     'requests>=2.0.1',
     'PySocks>=1.6.8',
     'cryptography>=2.3',
     'idna>=2.5',
+    'PyYAML>=5.1',
+    'cachetools',
 ]
 try:
     import concurrent.futures
@@ -41,7 +43,7 @@ except:
 
 setuptools.setup(
     name='warcprox',
-    version='2.4b4.dev195',
+    version='2.4.14',
     description='WARC writing MITM HTTP/S proxy',
     url='https://github.com/internetarchive/warcprox',
     author='Noah Levitt',
diff --git a/tests/Dockerfile b/tests/Dockerfile
index 6a97ac0..df9a688 100644
--- a/tests/Dockerfile
+++ b/tests/Dockerfile
@@ -80,7 +80,7 @@ RUN apt-get install -y libsqlite3-dev
 # trough itself
 RUN virtualenv -p python3 /opt/trough-ve3 \
    && . 
/opt/trough-ve3/bin/activate \ - && pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string \ + && pip install git+https://github.com/nlevitt/snakebite.git@py3 \ && pip install git+https://github.com/internetarchive/trough.git RUN mkdir -vp /etc/service/trough-sync-local \ diff --git a/tests/run-trough.sh b/tests/run-trough.sh index ce80488..64939a6 100644 --- a/tests/run-trough.sh +++ b/tests/run-trough.sh @@ -5,7 +5,7 @@ set -x -pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string +pip install git+https://github.com/nlevitt/snakebite.git@py3 pip install git+https://github.com/internetarchive/trough.git mkdir /etc/trough diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 0602d39..b06043c 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -93,9 +93,11 @@ logging.basicConfig( stream=sys.stdout, level=logging.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +logging.getLogger("urllib3").setLevel(logging.WARN) logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) -warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning) -warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning) +import urllib3 ; urllib3.disable_warnings() +import requests.packages.urllib3 ; requests.packages.urllib3.disable_warnings() def wait(callback, timeout=10): start = time.time() @@ -144,7 +146,7 @@ def dump_state(signum=None, frame=None): stack = traceback.format_stack(sys._current_frames()[th.ident]) state_strs.append("".join(stack)) - logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) + logging.warning("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) signal.signal(signal.SIGQUIT, dump_state) @@ -279,6 +281,15 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): payload = b'Test.' 
actual_headers = (b'Content-Type: text/plain\r\n' + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') + elif self.path == '/incomplete-read': + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'Transfer-Encoding: chunked\r\n' + + b'\r\n') + # payload = b'''1\r\na''' + payload = chunkify( + b'Server closes connection when client expects next chunk') + payload = payload[:-7] else: payload = b'404 Not Found\n' headers = (b'HTTP/1.1 404 Not Found\r\n' @@ -292,7 +303,9 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): headers, payload = self.build_response() self.connection.sendall(headers) self.connection.sendall(payload) - if self.path in ('/missing-content-length', '/empty-response'): + if self.path in ( + '/missing-content-length', '/empty-response', + '/incomplete-read'): # server must close the connection, else client has no idea if # there is more data coming self.connection.shutdown(socket.SHUT_RDWR) @@ -446,7 +459,7 @@ def warcprox_(request, http_daemon, https_daemon): logging.info('dropping rethinkdb database %r', parsed.database) rr.db_drop(parsed.database).run() except Exception as e: - logging.warn( + logging.warning( 'problem deleting rethinkdb database %r: %s', parsed.database, e) logging.info('deleting working directory %r', work_dir) @@ -777,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) # archive url1 bucket_a - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_a":"rw"}})} response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' @@ -803,7 +816,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, assert dedup_lookup is None # archive url2 bucket_b - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})} response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' @@ -903,6 +916,71 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, finally: fh.close() +def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + + url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) + + # archive url1 + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_readonly", + "dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}}) + } + response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'k!' + assert response.content == b'I am the warcprox test payload! 
llllllllll!\n' + + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + + # check url1 in dedup db bucket_1 (rw) + # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_1') + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1") + assert dedup_lookup + assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) + assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] + dedup_date = dedup_lookup['date'] + + # check url1 not in dedup db bucket_2 (ro) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2") + assert dedup_lookup is None + + # close the warc + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"] + warc_path = os.path.join(writer.directory, writer.finalname) + assert not os.path.exists(warc_path) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"].close() + assert os.path.exists(warc_path) + + # read the warc + fh = warctools.ArchiveRecord.open_archive(warc_path) + record_iter = fh.read_records(limit=None, offsets=True) + try: + (offset, record, errors) = next(record_iter) + assert record.type == b'warcinfo' + + # url1 bucket_1 + (offset, record, errors) = next(record_iter) + assert record.type == b'response' + assert record.url == url1.encode('ascii') + # check for duplicate warc record headers + assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1 + assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! 
llllllllll!\n' + (offset, record, errors) = next(record_iter) + assert record.type == b'request' + + # that's all folks + assert next(record_iter)[1] == None + assert next(record_iter, None) == None + + finally: + fh.close() + def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies): urls_before = warcprox_.proxy.running_stats.urls revisits_before = warcprox_.proxy.stats_db.value( @@ -915,7 +993,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin http_daemon.server_port, i) headers = {"Warcprox-Meta": json.dumps({ "warc-prefix":"test_dedup_buckets", - "dedup-bucket":"bucket_%s" % i})} + "dedup-buckets":{"bucket_%s" % i:"rw"}})} pool.submit( requests.get, url, proxies=archiving_proxies, verify=False, headers=headers) @@ -931,7 +1009,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin http_daemon.server_port, -i - 1) headers = {"Warcprox-Meta": json.dumps({ "warc-prefix":"test_dedup_buckets", - "dedup-bucket":"bucket_%s" % i})} + "dedup-buckets":{"bucket_%s" % i:"rw"}})} pool.submit( requests.get, url, proxies=archiving_proxies, verify=False, headers=headers) @@ -946,7 +1024,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin http_daemon.server_port, i) headers = {"Warcprox-Meta": json.dumps({ "warc-prefix":"test_dedup_buckets", - "dedup-bucket":"bucket_%s" % i})} + "dedup-buckets":{"bucket_%s" % i:"rw"}})} pool.submit( requests.get, url, proxies=archiving_proxies, verify=False, headers=headers) @@ -965,12 +1043,12 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies): }, { "url_match": "SURT_MATCH", - "value": "http://(localhost:%s,)/fuh/" % (http_daemon.server_port), + "value": "http://(localhost,:%s)/fuh/" % (http_daemon.server_port), }, { "url_match": "SURT_MATCH", # this rule won't match because of http scheme, https port - "value": "http://(localhost:%s,)/fuh/" % (https_daemon.server_port), + "value": "http://(localhost,:%s)/fuh/" % (https_daemon.server_port), }, { "domain": "bad.domain.com", @@ -1487,7 +1565,7 @@ def test_dedup_ok_flag( assert dedup_lookup is None # archive with dedup_ok:False - request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False} + request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''},'dedup-ok':False} headers = {'Warcprox-Meta': json.dumps(request_meta)} response = requests.get( url, proxies=archiving_proxies, headers=headers, verify=False) @@ -1505,7 +1583,7 @@ def test_dedup_ok_flag( assert dedup_lookup is None # archive without dedup_ok:False - request_meta = {'dedup-bucket':'test_dedup_ok_flag'} + request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''}} headers = {'Warcprox-Meta': json.dumps(request_meta)} response = requests.get( url, proxies=archiving_proxies, headers=headers, verify=False) @@ -1611,13 +1689,11 @@ def test_controller_with_defaults(): assert not wwp.writer_pool.default_warc_writer.record_builder.base32 assert wwp.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' - class EarlyPlugin(warcprox.BaseStandardPostfetchProcessor): CHAIN_POSITION = 'early' def _process_url(self): pass - def test_load_plugin(): options = warcprox.Options(port=0, plugins=[ 'warcprox.stats.RunningStats', @@ -1714,13 +1790,13 @@ def test_slash_in_warc_prefix(warcprox_, http_daemon, archiving_proxies): url = 'http://localhost:%s/b/b' % http_daemon.server_port headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"../../../../etc/a"})} response = requests.get(url, 
proxies=archiving_proxies, headers=headers) - assert response.status_code == 500 + assert response.status_code == 400 assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix' url = 'http://localhost:%s/b/c' % http_daemon.server_port headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"..\\..\\..\\derp\\monkey"})} response = requests.get(url, proxies=archiving_proxies, headers=headers) - assert response.status_code == 500 + assert response.status_code == 400 assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix' def test_crawl_log(warcprox_, http_daemon, archiving_proxies): @@ -1763,7 +1839,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): crawl_log = open(default_crawl_log_path, 'rb').read() # tests will fail in year 3000 :) - assert re.match(b'\A2[^\n]+\n\Z', crawl_log) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log) assert crawl_log[24:31] == b' 200 ' assert crawl_log[31:42] == b' 54 ' fields = crawl_log.split() @@ -1783,7 +1859,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert extra_info['contentSize'] == 145 crawl_log_1 = open(file, 'rb').read() - assert re.match(b'\A2[^\n]+\n\Z', crawl_log_1) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_1) assert crawl_log_1[24:31] == b' 200 ' assert crawl_log_1[31:42] == b' 54 ' fields = crawl_log_1.split() @@ -1821,7 +1897,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): crawl_log_2 = open(file, 'rb').read() - assert re.match(b'\A2[^\n]+\n\Z', crawl_log_2) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_2) assert crawl_log_2[24:31] == b' 200 ' assert crawl_log_2[31:42] == b' 54 ' fields = crawl_log_2.split() @@ -1854,7 +1930,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert os.path.exists(file) crawl_log_3 = open(file, 'rb').read() - assert re.match(b'\A2[^\n]+\n\Z', crawl_log_3) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_3) assert crawl_log_3[24:31] == b' 200 ' assert crawl_log_3[31:42] == b' 0 ' fields = crawl_log_3.split() @@ -1894,7 +1970,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): assert os.path.exists(file) crawl_log_4 = open(file, 'rb').read() - assert re.match(b'\A2[^\n]+\n\Z', crawl_log_4) + assert re.match(br'\A2[^\n]+\n\Z', crawl_log_4) assert crawl_log_4[24:31] == b' 204 ' assert crawl_log_4[31:42] == b' 38 ' fields = crawl_log_4.split() @@ -1976,6 +2052,10 @@ def test_socket_timeout_response( def test_empty_response( warcprox_, http_daemon, https_daemon, archiving_proxies, playback_proxies): + # localhost:server_port was added to the `bad_hostnames_ports` cache by + # previous tests and this causes subsequent tests to fail. We clear it. + warcprox_.proxy.bad_hostnames_ports.clear() + url = 'http://localhost:%s/empty-response' % http_daemon.server_port response = requests.get(url, proxies=archiving_proxies, verify=False) assert response.status_code == 502 @@ -1991,6 +2071,10 @@ def test_payload_digest(warcprox_, http_daemon): Tests that digest is of RFC2616 "entity body" (transfer-decoded but not content-decoded) ''' + # localhost:server_port was added to the `bad_hostnames_ports` cache by + # previous tests and this causes subsequent tests to fail. We clear it. 
+ warcprox_.proxy.bad_hostnames_ports.clear() + class HalfMockedMitm(warcprox.mitmproxy.MitmProxyHandler): def __init__(self, url): self.path = url @@ -2224,6 +2308,23 @@ def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies): with pytest.raises(StopIteration): next(rec_iter) +def test_incomplete_read(http_daemon, warcprox_, archiving_proxies): + urls_before = warcprox_.proxy.running_stats.urls + + # see https://github.com/internetarchive/warcprox/pull/123 + url = 'http://localhost:%s/incomplete-read' % http_daemon.server_port + with pytest.raises(requests.exceptions.ChunkedEncodingError): + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + + # although `requests.get` raises exception here, other clients like + # browsers put up with the server misbehavior; warcprox does too, and will + # record the response verbatim in the warc; this `wait()` call tests + # that a warc record is written + + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + if __name__ == '__main__': pytest.main() diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 852d3fc..9cd09a8 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -78,6 +78,15 @@ class RequestBlockedByRule(Exception): def __str__(self): return "%s: %s" % (self.__class__.__name__, self.msg) +class BadRequest(Exception): + ''' + Raised in case of a request deemed unacceptable by warcprox. + ''' + def __init__(self, msg): + self.msg = msg + def __str__(self): + return "%s: %s" % (self.__class__.__name__, self.msg) + class BasePostfetchProcessor(threading.Thread): logger = logging.getLogger("warcprox.BasePostfetchProcessor") diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 0d98270..ff2ad0a 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -71,7 +71,7 @@ class RethinkCaptures: "unexpected result saving batch of %s: %s " "entries" % (len(self._batch), result)) if result["replaced"] > 0 or result["unchanged"] > 0: - self.logger.warn( + self.logger.warning( "inserted=%s replaced=%s unchanged=%s in big " "captures table (normally replaced=0 and " "unchanged=0)", result["inserted"], @@ -148,7 +148,7 @@ class RethinkCaptures: recorded_url.payload_digest.digest() ).decode("utf-8") else: - self.logger.warn( + self.logger.warning( "digest type is %r but big captures table is indexed " "by sha1", recorded_url.payload_digest.name) @@ -157,8 +157,11 @@ class RethinkCaptures: sha1base32 = base64.b32encode(digest.digest()).decode("utf-8") if (recorded_url.warcprox_meta - and "dedup-bucket" in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta["dedup-bucket"] + and "dedup-buckets" in recorded_url.warcprox_meta): + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + # maybe this is the right thing to do here? or should we return an entry for each? or ? 
+ break else: bucket = "__unspecified__" diff --git a/warcprox/controller.py b/warcprox/controller.py index fcdaa58..9a2880e 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -441,7 +441,12 @@ class WarcproxController(object): exc_info=True) pass finally: - self.shutdown() + try: + self.shutdown() + except: + self.logger.critical("graceful shutdown failed", exc_info=True) + self.logger.critical("killing myself -9") + os.kill(os.getpid(), 9) def _dump_profiling(self): import pstats, tempfile, os, io diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 5e26062..9562fa5 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -34,6 +34,7 @@ import urllib3 from urllib3.exceptions import HTTPError import collections from concurrent import futures +from functools import lru_cache urllib3.disable_warnings() @@ -46,11 +47,11 @@ class DedupableMixin(object): def should_dedup(self, recorded_url): """Check if we should try to run dedup on resource based on payload size compared with min text/binary dedup size options. - When we use option --dedup-only-with-bucket, `dedup-bucket` is required + When we use option --dedup-only-with-bucket, `dedup-buckets` is required in Warcprox-Meta to perform dedup. Return Boolean. """ - if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta: + if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta: return False if recorded_url.is_text(): return recorded_url.response_recorder.payload_size() > self.min_text_size @@ -68,10 +69,13 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): and recorded_url.payload_digest and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = self.dedup_db.lookup( - digest_key, recorded_url.warcprox_meta["dedup-bucket"], - recorded_url.url) + if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta: + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, bucket, recorded_url.url) + if recorded_url.dedup_info: + # we found an existing capture + break else: recorded_url.dedup_info = self.dedup_db.lookup( digest_key, url=recorded_url.url) @@ -147,10 +151,12 @@ class DedupDb(DedupableMixin): and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: - self.save( - digest_key, records[0], - bucket=recorded_url.warcprox_meta["dedup-bucket"]) + if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta: + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == "ro": + self.save( + digest_key, records[0], + bucket=bucket) else: self.save(digest_key, records[0]) @@ -212,8 +218,10 @@ class RethinkDedupDb(DedupDb, DedupableMixin): and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: - self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"]) + if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta: + for bucket, bucket_mode in 
recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + self.save(digest_key, records[0], bucket=bucket) else: self.save(digest_key, records[0]) @@ -236,6 +244,7 @@ class CdxServerDedup(DedupDb): headers['Cookie'] = options.cdxserver_dedup_cookies self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, timeout=2.0, headers=headers) + self.cached_lookup = lru_cache(maxsize=1024)(self.lookup) def loader(self, *args, **kwargs): return CdxServerDedupLoader(self, self.options) @@ -296,7 +305,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) DedupableMixin.__init__(self, options) - self.pool = futures.ThreadPoolExecutor(max_workers=400) + self.pool = futures.ThreadPoolExecutor(max_workers=options.cdxserver_dedup_max_threads) self.batch = set() self.cdx_dedup = cdx_dedup @@ -315,7 +324,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) try: digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) - dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url) + dedup_info = self.cdx_dedup.cached_lookup(digest_key, recorded_url.url) + cache_info = self.cdx_dedup.cached_lookup.cache_info() + if (cache_info.hits + cache_info.misses) % 1000 == 0: + self.logger.info(self.cdx_dedup.cached_lookup.cache_info()) if dedup_info: recorded_url.dedup_info = dedup_info except ValueError as exc: @@ -342,11 +354,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): and recorded_url.warc_records[0].type == b'response' and self.trough_dedup_db.should_dedup(recorded_url)): if (recorded_url.warcprox_meta - and 'dedup-bucket' in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta['dedup-bucket'] + and 'dedup-buckets' in recorded_url.warcprox_meta): + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + buckets[bucket].append(recorded_url) else: - bucket = '__unspecified__' - buckets[bucket].append(recorded_url) + buckets['__unspecified__'].append(recorded_url) return buckets def _process_batch(self, batch): @@ -369,7 +382,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): except futures.TimeoutError as e: # the remaining threads actually keep running in this case, # there's no way to stop them, but that should be harmless - logging.warn( + logging.warning( 'timed out saving dedup info to trough', exc_info=True) class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): @@ -394,11 +407,11 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): and recorded_url.payload_digest and self.trough_dedup_db.should_dedup(recorded_url)): if (recorded_url.warcprox_meta - and 'dedup-bucket' in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta['dedup-bucket'] + and 'dedup-buckets' in recorded_url.warcprox_meta): + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + buckets[bucket].append(recorded_url) else: - bucket = '__unspecified__' - buckets[bucket].append(recorded_url) + buckets['__unspecified__'].append(recorded_url) else: discards.append( warcprox.digest_str( @@ -453,7 +466,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): recorded_url.dedup_info = entry except Exception as e: # batch_lookup raised exception or something - logging.warn( + logging.warning( 'problem looking up dedup info for 
%s urls ' 'in bucket %s', len(buckets[bucket]), bucket, exc_info=True) @@ -469,7 +482,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): except futures.TimeoutError as e: # the remaining threads actually keep running in this case, # there's no way to stop them, but that should be harmless - self.logger.warn( + self.logger.warning( 'timed out loading dedup info from trough', exc_info=True) class TroughDedupDb(DedupDb, DedupableMixin): @@ -571,9 +584,11 @@ class TroughDedupDb(DedupDb, DedupableMixin): and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta: - self.save( - digest_key, records[0], - bucket=recorded_url.warcprox_meta['dedup-bucket']) + if recorded_url.warcprox_meta and 'dedup-buckets' in recorded_url.warcprox_meta: + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + self.save( + digest_key, records[0], + bucket=bucket) else: self.save(digest_key, records[0]) diff --git a/warcprox/main.py b/warcprox/main.py index 8dab727..d61e6b1 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -30,6 +30,7 @@ except ImportError: import Queue as queue import logging +import logging.config import sys import hashlib import argparse @@ -39,6 +40,7 @@ import traceback import signal import threading import certauth.certauth +import yaml import warcprox import doublethink import cryptography.hazmat.backends.openssl @@ -168,6 +170,10 @@ def _build_arg_parser(prog='warcprox', show_hidden=False): help=suppress( 'value of Cookie header to include in requests to the cdx ' 'server, when using --cdxserver-dedup')) + hidden.add_argument( + '--cdxserver-dedup-max-threads', dest='cdxserver_dedup_max_threads', + type=int, default=50, help=suppress( + 'maximum number of cdx server dedup threads')) arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', type=int, default=0, help=('try to dedup text resources with payload size over this limit in bytes')) @@ -235,6 +241,9 @@ def _build_arg_parser(prog='warcprox', show_hidden=False): arg_parser.add_argument( '--trace', dest='trace', action='store_true', help='very verbose logging') + arg_parser.add_argument( + '--logging-conf-file', dest='logging_conf_file', default=None, + help=('reads logging configuration from a YAML file')) arg_parser.add_argument( '--version', action='version', version="warcprox {}".format(warcprox.__version__)) @@ -255,7 +264,7 @@ def dump_state(signum=None, frame=None): except Exception as e: state_strs.append('' % e) - logging.warn( + logging.warning( 'dumping state (caught signal %s)\n%s', signum, '\n'.join(state_strs)) @@ -298,6 +307,11 @@ def main(argv=None): '%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) + if args.logging_conf_file: + with open(args.logging_conf_file, 'r') as fd: + conf = yaml.safe_load(fd) + logging.config.dictConfig(conf) + # see https://github.com/pyca/cryptography/issues/2911 cryptography.hazmat.backends.openssl.backend.activate_builtin_random() @@ -312,7 +326,11 @@ def main(argv=None): # SIGQUIT does not exist on some platforms (windows) pass - controller.run_until_shutdown() + try: + controller.run_until_shutdown() + except: + logging.fatal('unhandled exception in controller', exc_info=True) + sys.exit(1) def ensure_rethinkdb_tables(argv=None): ''' @@ -384,7 +402,7 @@ def 
ensure_rethinkdb_tables(argv=None): did_something = True if args.rethinkdb_trough_db_url: dedup_db = warcprox.dedup.TroughDedupDb(options) - logging.warn( + logging.warning( 'trough is responsible for creating most of the rethinkdb ' 'tables that it uses') did_something = True diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 1fc0c72..d6a0593 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -35,6 +35,13 @@ try: import urllib.parse as urllib_parse except ImportError: import urlparse as urllib_parse +# In python2/3, urllib parse caches in memory URL parsing results to avoid +# repeating the process for the same URL. The problem is that the default +# in memory cache size is just 20. +# https://github.com/python/cpython/blob/3.7/Lib/urllib/parse.py#L80 +# since we do a lot of URL parsing, it makes sense to increase cache size. +urllib_parse.MAX_CACHE_SIZE = 2000 + try: import http.client as http_client # In python3 http.client.parse_headers() enforces http_client._MAXLINE @@ -45,6 +52,11 @@ try: http_client._MAXLINE = 4194304 # 4 MiB except ImportError: import httplib as http_client +# http_client has an arbitrary limit of 100 HTTP Headers which is too low and +# it raises an HTTPException if the target URL has more. +# https://github.com/python/cpython/blob/3.7/Lib/http/client.py#L113 +http_client._MAXHEADERS = 7000 + import json import socket import logging @@ -64,8 +76,13 @@ import urlcanon import time import collections import cProfile +from urllib3 import PoolManager from urllib3.util import is_connection_dropped +from urllib3.exceptions import TimeoutError, HTTPError import doublethink +from cachetools import TTLCache +from threading import RLock +from certauth.certauth import CertificateAuthority class ProxyingRecorder(object): """ @@ -100,7 +117,7 @@ class ProxyingRecorder(object): self.proxy_client.sendall(hunk) except BaseException as e: self._proxy_client_conn_open = False - self.logger.warn( + self.logger.warning( '%s sending data to proxy client for url %s', e, self.url) self.logger.info( @@ -210,9 +227,12 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): and records the bytes in transit as it proxies them. ''' logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler") + _socket_timeout = 60 _max_resource_size = None _tmp_file_max_memory_size = 512 * 1024 + onion_tor_socks_proxy_host = None + onion_tor_socks_proxy_port = None def __init__(self, request, client_address, server): threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1]) @@ -228,7 +248,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self.url = self.path u = urllib_parse.urlparse(self.url) - if u.scheme != 'http': + if u.scheme != 'http' or u.netloc == '': raise Exception( 'unable to parse request %r as a proxy request' % ( self.requestline)) @@ -240,6 +260,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): query=u.query, fragment=u.fragment)) self.hostname = urlcanon.normalize_host(host).decode('ascii') + def _hostname_port_cache_key(self): + return '%s:%s' % (self.hostname, self.port) + def _connect_to_remote_server(self): ''' Connect to destination. 
@@ -251,7 +274,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): ''' self._conn_pool = self.server.remote_connection_pool.connection_from_host( host=self.hostname, port=int(self.port), scheme='http', - pool_kwargs={'maxsize': 6, 'timeout': self._socket_timeout}) + pool_kwargs={'maxsize': 12, 'timeout': self._socket_timeout}) self._remote_server_conn = self._conn_pool._get_conn() if is_connection_dropped(self._remote_server_conn): @@ -283,7 +306,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_conn.sock = ssl.wrap_socket( self._remote_server_conn.sock) except ssl.SSLError: - self.logger.warn( + self.logger.warning( "failed to establish ssl connection to %s; " "python ssl library does not support SNI, " "consider upgrading to python 2.7.9+ or 3.4+", @@ -332,7 +355,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self.send_error(500, str(e)) except Exception as f: - self.logger.warn("failed to send error response ({}) to proxy client: {}".format(e, f)) + self.logger.warning("failed to send error response ({}) to proxy client: {}".format(e, f)) return # Reload! @@ -368,25 +391,55 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self._determine_host_port() assert self.url - + # Check if target hostname:port is in `bad_hostnames_ports` cache + # to avoid retrying to connect. Cached value is http status code. + cached = None + hostname_port = self._hostname_port_cache_key() + with self.server.bad_hostnames_ports_lock: + cached = self.server.bad_hostnames_ports.get(hostname_port) + if cached: + self.logger.info('Cannot connect to %s (cache)', hostname_port) + self.send_error(cached) + return # Connect to destination self._connect_to_remote_server() except warcprox.RequestBlockedByRule as e: # limit enforcers have already sent the appropriate response self.logger.info("%r: %r", self.requestline, e) return + except warcprox.BadRequest as e: + self.send_error(400, e.msg) + return except Exception as e: + # If connection fails, add hostname:port to cache to avoid slow + # subsequent reconnection attempts. `NewConnectionError` can be + # caused by many types of errors which are handled by urllib3. + response_code = 500 + cache = False + if isinstance(e, (socket.timeout, TimeoutError,)): + response_code = 504 + cache = True + elif isinstance(e, HTTPError): + response_code = 502 + cache = True + + if cache: + host_port = self._hostname_port_cache_key() + with self.server.bad_hostnames_ports_lock: + self.server.bad_hostnames_ports[host_port] = response_code + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) self.logger.error( "problem processing request %r: %r", self.requestline, e, exc_info=True) - self.send_error(500, str(e)) + self.send_error(response_code) return try: return self._proxy_request() except Exception as e: if self.server.shutting_down: - self.logger.warn( + self.logger.warning( 'sending 503 warcprox shutting down %r: %r', self.requestline, e) self.send_error(503, 'warcprox shutting down') @@ -394,7 +447,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error( 'error from remote server(?) 
%r: %r', self.requestline, e, exc_info=True) - self.send_error(502, str(e)) + self.send_error(502) return def send_error(self, code, message=None, explain=None): @@ -410,9 +463,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): try: return http_server.BaseHTTPRequestHandler.send_error( self, code, message, explain) - except: - self.logger.error( - 'send_error(%r, %r, %r) raised exception', exc_info=True) + except Exception as e: + level = logging.ERROR + if isinstance(e, OSError) and e.errno == 9: + level = logging.TRACE + self.logger.log( + level, 'send_error(%r, %r, %r) raised exception', + exc_info=True) return None def _proxy_request(self, extra_response_headers={}): @@ -478,9 +535,14 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): tmp_file_max_memory_size=self._tmp_file_max_memory_size) prox_rec_res.begin(extra_response_headers=extra_response_headers) - buf = prox_rec_res.read(65536) + buf = None while buf != b'': - buf = prox_rec_res.read(65536) + try: + buf = prox_rec_res.read(65536) + except http_client.IncompleteRead as e: + self.logger.warn('%s from %s', e, self.url) + buf = e.partial + if (self._max_resource_size and prox_rec_res.recorder.len > self._max_resource_size): prox_rec_res.truncated = b'length' @@ -506,7 +568,19 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): # put it back in the pool to reuse it later. if not is_connection_dropped(self._remote_server_conn): self._conn_pool._put_conn(self._remote_server_conn) - except: + except Exception as e: + # A common error is to connect to the remote server successfully + # but raise a `RemoteDisconnected` exception when trying to begin + # downloading. Its caused by prox_rec_res.begin(...) which calls + # http_client._read_status(). In that case, the host is also bad + # and we must add it to `bad_hostnames_ports` cache. + if isinstance(e, http_client.RemoteDisconnected): + host_port = self._hostname_port_cache_key() + with self.server.bad_hostnames_ports_lock: + self.server.bad_hostnames_ports[host_port] = 502 + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) + self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() raise @@ -521,7 +595,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): return self.do_COMMAND def log_error(self, fmt, *args): - self.logger.warn(fmt, *args) + self.logger.warning(fmt, *args) class PooledMixIn(socketserver.ThreadingMixIn): logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn") @@ -670,3 +744,52 @@ class PooledMitmProxy(PooledMixIn, MitmProxy): for sock in self.remote_server_socks: self.shutdown_request(sock) +class SingleThreadedMitmProxy(http_server.HTTPServer): + logger = logging.getLogger('warcprox.warcproxy.SingleThreadedMitmProxy') + + def __init__( + self, MitmProxyHandlerClass=MitmProxyHandler, + options=warcprox.Options()): + self.options = options + + # TTLCache is not thread-safe. 
Access to the shared cache from multiple + # threads must be properly synchronized with an RLock according to ref: + # https://cachetools.readthedocs.io/en/latest/ + self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60) + self.bad_hostnames_ports_lock = RLock() + + self.remote_connection_pool = PoolManager( + num_pools=max((options.max_threads or 0) // 6, 400)) + + if options.onion_tor_socks_proxy: + try: + host, port = options.onion_tor_socks_proxy.split(':') + MitmProxyHandlerClass.onion_tor_socks_proxy_host = host + MitmProxyHandlerClass.onion_tor_socks_proxy_port = int(port) + except ValueError: + MitmProxyHandlerClass.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy + MitmProxyHandlerClass.onion_tor_socks_proxy_port = None + + if options.socket_timeout: + MitmProxyHandlerClass._socket_timeout = options.socket_timeout + if options.max_resource_size: + MitmProxyHandlerClass._max_resource_size = options.max_resource_size + if options.tmp_file_max_memory_size: + MitmProxyHandlerClass._tmp_file_max_memory_size = options.tmp_file_max_memory_size + + self.digest_algorithm = options.digest_algorithm or 'sha1' + + ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64] + self.ca = CertificateAuthority( + ca_file=options.cacert or 'warcprox-ca.pem', + certs_dir=options.certs_dir or './warcprox-ca', + ca_name=ca_name) + + server_address = ( + options.address or 'localhost', + options.port if options.port is not None else 8000) + + http_server.HTTPServer.__init__( + self, server_address, MitmProxyHandlerClass, + bind_and_activate=True) + diff --git a/warcprox/playback.py b/warcprox/playback.py index 91f86aa..8bfa42f 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -42,6 +42,7 @@ from warcprox.mitmproxy import MitmProxyHandler import warcprox import sqlite3 import threading +from cachetools import TTLCache class PlaybackProxyHandler(MitmProxyHandler): logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler") @@ -219,6 +220,8 @@ class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): self.playback_index_db = playback_index_db self.warcs_dir = options.directory self.options = options + self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60) + self.bad_hostnames_ports_lock = threading.RLock() def server_activate(self): http_server.HTTPServer.server_activate(self) diff --git a/warcprox/stats.py b/warcprox/stats.py index 64ff2d7..1a71cad 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -81,7 +81,7 @@ def unravel_buckets(url, warcprox_meta): for bucket in warcprox_meta["stats"]["buckets"]: if isinstance(bucket, dict): if not 'bucket' in bucket: - self.logger.warn( + self.logger.warning( 'ignoring invalid stats bucket in ' 'warcprox-meta header %s', bucket) continue diff --git a/warcprox/trough.py b/warcprox/trough.py index b7db127..d0839d1 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -190,7 +190,7 @@ class TroughClient(object): return if response.status_code != 200: self._write_url_cache.pop(segment_id, None) - self.logger.warn( + self.logger.warning( 'unexpected response %r %r %r from %r to sql=%r', response.status_code, response.reason, response.text, write_url, sql) diff --git a/warcprox/warc.py b/warcprox/warc.py index 94fe137..1eceee2 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -125,48 +125,59 @@ class WarcRecordBuilder: headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) if content_type is not None: headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) - if payload_digest 
is not None: - headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) # truncated value may be 'length' or 'time' if truncated is not None: headers.append((b'WARC-Truncated', truncated)) + if content_length is not None: + headers.append(( + warctools.WarcRecord.CONTENT_LENGTH, + str(content_length).encode('latin1'))) if recorder is not None: - if content_length is not None: - headers.append(( - warctools.WarcRecord.CONTENT_LENGTH, - str(content_length).encode('latin1'))) - else: + if payload_digest is not None: + headers.append( + (warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) + if content_length is None: headers.append(( warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1'))) headers.append((warctools.WarcRecord.BLOCK_DIGEST, warcprox.digest_str(recorder.block_digest, self.base32))) recorder.tempfile.seek(0) - record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) + record = warctools.WarcRecord( + headers=headers, content_file=recorder.tempfile) else: - if content_length is not None: - headers.append(( - warctools.WarcRecord.CONTENT_LENGTH, - str(content_length).encode('latin1'))) - else: + if content_length is None: headers.append(( warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1'))) - # no http headers so block digest == payload digest - if not payload_digest: - payload_digest = warcprox.digest_str( + + block_digest = None + if not hasattr(data, 'read'): + block_digest = warcprox.digest_str( hashlib.new(self.digest_algorithm, data), self.base32) - headers.append(( - warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) - headers.append((warctools.WarcRecord.BLOCK_DIGEST, payload_digest)) + + if not content_type.lower().startswith(b'application/http'): + # no http headers, so block digest == payload digest + if payload_digest and not block_digest: + block_digest = payload_digest + elif block_digest and not payload_digest: + payload_digest = block_digest + + if block_digest: + headers.append( + (warctools.WarcRecord.BLOCK_DIGEST, block_digest)) + if payload_digest: + headers.append( + (warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) + if hasattr(data, 'read'): record = warctools.WarcRecord( headers=headers, content_file=data) else: content_tuple = content_type, data record = warctools.WarcRecord( - headers=headers, content=content_tuple) + headers=headers, content=(content_type, data)) return record diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index c0cdcb1..9d23244 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -38,15 +38,14 @@ import logging import json import socket from hanzo import warctools -from certauth.certauth import CertificateAuthority import warcprox import datetime import urlcanon import os -from urllib3 import PoolManager import tempfile import hashlib import doublethink +import re class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): ''' @@ -167,7 +166,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): if warcprox_meta and 'warc-prefix' in warcprox_meta and ( '/' in warcprox_meta['warc-prefix'] or '\\' in warcprox_meta['warc-prefix']): - raise Exception( + raise warcprox.BadRequest( "request rejected by warcprox: slash and backslash are not " "permitted in warc-prefix") @@ -349,6 +348,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): # logging better handled elsewhere? 
pass +RE_MIMETYPE = re.compile(r'[;\s]') class RecordedUrl: logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") @@ -377,8 +377,14 @@ class RecordedUrl: if warcprox_meta: if 'captures-bucket' in warcprox_meta: # backward compatibility - warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket'] + warcprox_meta['dedup-buckets'] = {} + warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw' del warcprox_meta['captures-bucket'] + if 'dedup-bucket' in warcprox_meta: + # more backwards compatibility + warcprox_meta['dedup-buckets'] = {} + warcprox_meta['dedup-buckets'][warcprox_meta['dedup-bucket']] = 'rw' + del warcprox_meta['dedup-bucket'] self.warcprox_meta = warcprox_meta else: self.warcprox_meta = {} @@ -387,9 +393,8 @@ class RecordedUrl: self.mimetype = content_type if self.mimetype: - n = self.mimetype.find(";") - if n >= 0: - self.mimetype = self.mimetype[:n] + # chop off subtype, and ensure there's no whitespace + self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0] self.custom_type = custom_type self.status = status @@ -420,51 +425,20 @@ class RecordedUrl: # inherit from object so that multiple inheritance from this class works # properly in python 2 # http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639 -class SingleThreadedWarcProxy(http_server.HTTPServer, object): +class SingleThreadedWarcProxy(warcprox.mitmproxy.SingleThreadedMitmProxy): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") def __init__( self, stats_db=None, status_callback=None, options=warcprox.Options()): self.start_time = doublethink.utcnow() + + warcprox.mitmproxy.SingleThreadedMitmProxy.__init__( + self, WarcProxyHandler, options) + self.status_callback = status_callback self.stats_db = stats_db - self.options = options - self.remote_connection_pool = PoolManager( - num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200) - server_address = ( - options.address or 'localhost', - options.port if options.port is not None else 8000) - - if options.onion_tor_socks_proxy: - try: - host, port = options.onion_tor_socks_proxy.split(':') - WarcProxyHandler.onion_tor_socks_proxy_host = host - WarcProxyHandler.onion_tor_socks_proxy_port = int(port) - except ValueError: - WarcProxyHandler.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy - WarcProxyHandler.onion_tor_socks_proxy_port = None - - if options.socket_timeout: - WarcProxyHandler._socket_timeout = options.socket_timeout - if options.max_resource_size: - WarcProxyHandler._max_resource_size = options.max_resource_size - if options.tmp_file_max_memory_size: - WarcProxyHandler._tmp_file_max_memory_size = options.tmp_file_max_memory_size - - http_server.HTTPServer.__init__( - self, server_address, WarcProxyHandler, bind_and_activate=True) - - self.digest_algorithm = options.digest_algorithm or 'sha1' - - ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64] - self.ca = CertificateAuthority( - ca_file=options.cacert or 'warcprox-ca.pem', - certs_dir=options.certs_dir or './warcprox-ca', - ca_name=ca_name) - self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000) - self.running_stats = warcprox.stats.RunningStats() def status(self): @@ -530,6 +504,6 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): self.remote_connection_pool.clear() def handle_error(self, request, client_address): - self.logger.warn( + self.logger.warning( "exception processing request %s from %s", request, 
client_address, exc_info=True) diff --git a/warcprox/writer.py b/warcprox/writer.py index 96293db..fecb533 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -149,6 +149,7 @@ class WarcWriter: record.get_header(b'WARC-Payload-Digest'), record.offset, self.path, record.get_header(warctools.WarcRecord.URL)) self.f.flush() + self.last_activity = time.time() return records
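
Note on the new ``--logging-conf-file`` option added in warcprox/main.py: warcprox reads the named file with ``yaml.safe_load()`` and hands the result to ``logging.config.dictConfig()``, so any mapping that follows the standard dictConfig schema works. A minimal sketch follows (the handler and formatter names, and the urllib3 level override, are illustrative and not part of this patch; the format string is the one warcprox already uses)::

    # Hypothetical logging configuration equivalent to a --logging-conf-file YAML file.
    # Requires PyYAML, which this patch adds to setup.py deps.
    import logging
    import logging.config

    import yaml

    LOGGING_YAML = '''
    version: 1
    disable_existing_loggers: false
    formatters:
      verbose:
        format: "%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s"
    handlers:
      console:
        class: logging.StreamHandler
        stream: ext://sys.stdout
        formatter: verbose
    root:
      level: INFO
      handlers: [console]
    loggers:
      urllib3:
        level: WARNING
    '''

    # Equivalent to what main() does after parsing --logging-conf-file:
    # load the YAML into a dict, then apply it with dictConfig().
    logging.config.dictConfig(yaml.safe_load(LOGGING_YAML))
    logging.getLogger('warcprox').info('logging configured from YAML')

Saving the YAML portion to a file and starting warcprox with ``--logging-conf-file path/to/logging.yaml`` should reproduce the console format warcprox already emits, while letting operators quiet noisy loggers such as urllib3.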