diff --git a/.travis.yml b/.travis.yml
index c427b37..0ad15d4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -50,10 +50,10 @@ before_script:
 - docker ps
 script:
-- py.test -v tests
-- py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
-- py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
-- py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
+- py.test -v --tb=native tests
+- py.test -v --tb=native --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
+- py.test -v --tb=native --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
+- py.test -v --tb=native --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
 after_script:
 - ps ww -fHe
diff --git a/README.rst b/README.rst
index dbb1440..b7b5c17 100644
--- a/README.rst
+++ b/README.rst
@@ -3,22 +3,19 @@ Warcprox - WARC writing MITM HTTP/S proxy
 .. image:: https://travis-ci.org/internetarchive/warcprox.svg?branch=master
     :target: https://travis-ci.org/internetarchive/warcprox

-Warcprox is a tool for archiving the web. It is an http proxy that stores its
-traffic to disk in `WARC
-`_
-format. Warcprox captures encrypted https traffic by using the
-`"man-in-the-middle" `_
-technique (see the `Man-in-the-middle`_ section for more info).
+Warcprox is an HTTP proxy designed for web archiving applications. When used in
+parallel with `brozzler `_, it
+supports a comprehensive, modern, and distributed archival web capture system.
+Warcprox stores its traffic to disk in the `Web ARChive (WARC) file format
+`_,
+which may then be accessed with web archival replay software like `OpenWayback
+`_ and `pywb
+`_. It captures encrypted HTTPS traffic by
+using the "man-in-the-middle" technique (see the `Man-in-the-middle`_ section
+for more info).

-The web pages that warcprox stores in WARC files can be played back using
-software like `OpenWayback `_ or `pywb
-`_. Warcprox has been developed in
-parallel with `brozzler `_ and
-together they make a comprehensive modern distributed archival web crawling
-system.
-
-Warcprox was originally based on the excellent and simple pymiproxy by Nadeem
-Douba. https://github.com/allfro/pymiproxy
+Warcprox was originally based on `pymiproxy
+`_ by Nadeem Douba.

 .. contents::

@@ -43,68 +40,72 @@ Try ``warcprox --help`` for documentation on command line options.
 Man-in-the-middle
 =================

-Normally, http proxies can't read https traffic, because it's encrypted. The
-browser uses the http ``CONNECT`` method to establish a tunnel through the
-proxy, and the proxy merely routes raw bytes between the client and server.
-Since the bytes are encrypted, the proxy can't make sense of the information
-it's proxying. This nonsensical encrypted data would not be very useful to
-archive.
+Normally, HTTP proxies can't read encrypted HTTPS traffic. The browser uses the
+HTTP ``CONNECT`` method to establish a tunnel through the proxy, and the proxy
+merely routes raw bytes between the client and server. Since the bytes are
+encrypted, the proxy can't make sense of the information that it proxies. This
+nonsensical encrypted data is not typically useful for web archiving purposes.

-In order to capture https traffic, warcprox acts as a "man-in-the-middle"
+In order to capture HTTPS traffic, warcprox acts as a "man-in-the-middle"
 (MITM). When it receives a ``CONNECT`` directive from a client, it generates a
 public key certificate for the requested site, presents it to the client, and
-proceeds to establish an encrypted connection with the client. Then it makes a
-separate, normal https connection to the remote site. It decrypts, archives,
+proceeds to establish an encrypted connection with the client. It then makes a
+separate, normal HTTPS connection to the remote site. It decrypts, archives,
 and re-encrypts traffic in both directions.
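The gist of the technique can be sketched in a few lines of standalone Python
(illustrative only; ``per_host_cert()`` is a hypothetical stand-in for the
per-site, CA-signed certificate generation that warcprox delegates to
certauth)::

    import socket
    import ssl

    def handle_connect(client_sock, host, port):
        # tell the client its tunnel is ready, so it starts a TLS handshake
        client_sock.sendall(b'HTTP/1.1 200 Connection established\r\n\r\n')
        # impersonate the remote site: serve a certificate for `host` signed
        # by our own CA (per_host_cert() is a placeholder for that step)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(per_host_cert(host))
        client_tls = ctx.wrap_socket(client_sock, server_side=True)
        # separately, make a normal TLS connection to the real remote site
        remote_tls = ssl.create_default_context().wrap_socket(
                socket.create_connection((host, port)), server_hostname=host)
        # both sockets now yield plaintext, so the proxy can record (archive)
        # the decrypted traffic while shuttling bytes between them
        return client_tls, remote_tls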
-Although "man-in-the-middle" is often paired with "attack", there is nothing
-malicious about what warcprox is doing. If you configure an instance of
-warcprox as your browser's http proxy, you will see lots of certificate
-warnings, since none of the certificates will be signed by trusted authorities.
-To use warcprox effectively the client needs to disable certificate
-verification, or add the CA cert generated by warcprox as a trusted authority.
-(If you do this in your browser, make sure you undo it when you're done using
-warcprox!)
+Configuring a warcprox instance as a browser's HTTP proxy will result in
+security certificate warnings because none of the certificates will be signed
+by trusted authorities. However, there is nothing malicious about how warcprox
+functions. To use warcprox effectively, the client needs to disable certificate
+verification or add the CA certificate generated by warcprox as a trusted
+authority. When using the latter, remember to undo this change when finished
+using warcprox.

 API
 ===

-For interacting with a running instance of warcprox.
+The warcprox API may be used to retrieve information from and interact with a
+running warcprox instance, including:

-* ``/status`` url
-* ``WARCPROX_WRITE_RECORD`` http method
-* ``Warcprox-Meta`` http request header and response header
+* Retrieving status information via the ``/status`` URL
+* Writing WARC records via the ``WARCPROX_WRITE_RECORD`` HTTP method
+* Controlling warcprox settings via the ``Warcprox-Meta`` HTTP header

-See ``_.
+For warcprox API documentation, see: ``_.

 Deduplication
 =============

 Warcprox avoids archiving redundant content by "deduplicating" it. The process
-for deduplication works similarly to heritrix and other web archiving tools.
+for deduplication works similarly to deduplication by `Heritrix
+`_ and other web archiving tools:

-1. while fetching url, calculate payload content digest (typically sha1)
-2. look up digest in deduplication database (warcprox supports a few different
-   ones)
-3. if found, write warc ``revisit`` record referencing the url and capture time
+1. While fetching the URL, calculate the payload content digest (typically a
+   SHA1 checksum)
+2. Look up the digest in the deduplication database (warcprox currently supports
+   `sqlite `_ by default, `rethinkdb
+   `_ with two different schemas, and
+   `trough `_)
+3. If found, write a WARC ``revisit`` record referencing the URL and capture time
    of the previous capture
-4. else (if not found),
+4. If not found,

-   a. write warc ``response`` record with full payload
-   b. store entry in deduplication database
+   a. Write a ``response`` record with the full payload
+   b. Store a new entry in the deduplication database

-The dedup database is partitioned into different "buckets". Urls are
+The deduplication database is partitioned into different "buckets". URLs are
 deduplicated only against other captures in the same bucket. If specified, the
-``dedup-bucket`` field of the ``Warcprox-Meta`` http request header determines
-the bucket, otherwise the default bucket is used.
+``dedup-bucket`` field of the `Warcprox-Meta HTTP request header
+`_ determines the bucket. Otherwise,
+the default bucket is used.

 Deduplication can be disabled entirely by starting warcprox with the argument
 ``--dedup-db-file=/dev/null``.
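A rough sketch of steps 1-4 in standalone Python may make the lookup concrete
(the table layout and the empty-string default bucket here are illustrative,
not warcprox's actual schema)::

    import hashlib
    import sqlite3
    from datetime import datetime

    db = sqlite3.connect('dedup-example.sqlite')
    db.execute('create table if not exists dedup ('
               ' digest varchar(100), bucket varchar(100),'
               ' url varchar(2100), date varchar(100))')

    def process_capture(url, payload, bucket=''):
        # 1. calculate the payload content digest (sha1 by default)
        digest = 'sha1:' + hashlib.sha1(payload).hexdigest()
        # 2. look up the digest, scoped to the dedup bucket
        row = db.execute(
                'select url, date from dedup where digest=? and bucket=?',
                (digest, bucket)).fetchone()
        if row:
            # 3. seen before: a compact `revisit` record referencing the
            #    earlier capture would be written instead of the payload
            return 'revisit', row[0], row[1]
        # 4. not found: write a full `response` record and remember it
        db.execute(
                'insert into dedup (digest, bucket, url, date)'
                ' values (?, ?, ?, ?)',
                (digest, bucket, url, datetime.utcnow().isoformat() + 'Z'))
        return 'response', None, None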
 Statistics
 ==========

-Warcprox keeps some crawl statistics and stores them in sqlite or rethinkdb.
-These are consulted for enforcing ``limits`` and ``soft-limits`` (see
-``_), and can also be consulted by other
-processes outside of warcprox, for reporting etc.
+Warcprox stores some crawl statistics to sqlite or rethinkdb. These are
+consulted for enforcing ``limits`` and ``soft-limits`` (see `Warcprox-Meta
+fields `_), and can also be consulted by other
+processes outside of warcprox, such as for crawl job reporting.

 Statistics are grouped by "bucket". Every capture is counted as part of the
 ``__all__`` bucket. Other buckets can be specified in the ``Warcprox-Meta``
@@ -113,21 +114,20 @@ request header. The fallback bucket in case none is specified is called

 Within each bucket are three sub-buckets:

-* ``new`` - tallies captures for which a complete record (usually a ``response``
-  record) was written to warc
+* ``new`` - tallies captures for which a complete record (usually a
+  ``response`` record) was written to a WARC file
 * ``revisit`` - tallies captures for which a ``revisit`` record was written to
-  warc
-* ``total`` - includes all urls processed, even those not written to warc (so the
-  numbers may be greater than new + revisit)
+  a WARC file
+* ``total`` - includes all URLs processed, even those not written to a WARC
+  file, and so may be greater than the sum of new and revisit records

-Within each of these sub-buckets we keep two statistics:
+Within each of these sub-buckets, warcprox generates two kinds of statistics:

-* ``urls`` - simple count of urls
-* ``wire_bytes`` - sum of bytes received over the wire, including http headers,
-  from the remote server for each url
+* ``urls`` - simple count of URLs
+* ``wire_bytes`` - sum of bytes received over the wire from the remote server
+  for each URL, including HTTP headers

-For historical reasons, in sqlite, the default store, statistics are kept as
-json blobs::
+For historical reasons, the default sqlite store keeps statistics as JSON blobs::

     sqlite> select * from buckets_of_stats;
     bucket stats
@@ -139,14 +139,37 @@ Plugins
 =======
 Warcprox supports a limited notion of plugins by way of the ``--plugin``
 command line argument. Plugin classes are loaded from the regular python module
-search path. They will be instantiated with one argument, a
-``warcprox.Options``, which holds the values of all the command line arguments.
-Legacy plugins with constructors that take no arguments are also supported.
-Plugins should either have a method ``notify(self, recorded_url, records)`` or
-should subclass ``warcprox.BasePostfetchProcessor``. More than one plugin can
-be configured by specifying ``--plugin`` multiples times.
+search path. They are instantiated with one argument, a ``warcprox.Options``
+instance, which holds the values of all command line arguments. Legacy plugins
+with constructors that take no arguments are also supported. Plugins should
+either have a method ``notify(self, recorded_url, records)`` or should subclass
+``warcprox.BasePostfetchProcessor``. More than one plugin can be configured by
+specifying ``--plugin`` multiple times.

-`A minimal example `__
+See a minimal example `here
+`__.
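For illustration, a bare-bones notify-style plugin might look like the
following (a sketch; the module path ``mypkg.mymod`` and the body of
``notify()`` are hypothetical)::

    import warcprox

    class UrlLoggerPlugin:
        '''Register with: warcprox --plugin mypkg.mymod.UrlLoggerPlugin'''
        def __init__(self, options=warcprox.Options()):
            self.options = options

        def notify(self, recorded_url, records):
            # called from the postfetch chain after each capture; `records`
            # holds whatever WARC records were written (possibly none)
            print('captured %s (%d records)' % (
                    recorded_url.url, len(records or [])))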
+
+Architecture
+============
+.. image:: arch.svg
+
+Warcprox is multithreaded. It has a pool of HTTP proxy threads (100 by default).
+When handling a request, a proxy thread records data from the remote server to
+an in-memory buffer that spills over to disk if necessary (after 512k by
+default), while it streams the data to the proxy client. Once the HTTP
+transaction is complete, it puts the recorded URL in a thread-safe queue, to be
+picked up by the first processor in the postfetch chain.
+
+The postfetch chain normally includes processors for loading deduplication
+information, writing records to the WARC, saving deduplication information, and
+updating statistics. The exact set of processors in the chain depends on
+command line arguments; for example, plugins specified with ``--plugin`` are
+processors in the postfetch chain. Each postfetch processor has its own thread
+or threads. Thus the processors are able to run in parallel, independent of one
+another. This design also enables them to process URLs in batches. For example,
+the statistics processor gathers statistics for up to 10 seconds or 500 URLs,
+whichever comes first, then updates the statistics database with just a few
+queries.

 License
 =======
diff --git a/arch.svg b/arch.svg
new file mode 100644
index 0000000..7684014
--- /dev/null
+++ b/arch.svg
@@ -0,0 +1,433 @@
+[433 lines of SVG markup for the architecture diagram, stripped in extraction]
diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py
index 4491a8b..f273e96 100755
--- a/benchmarks/run-benchmarks.py
+++ b/benchmarks/run-benchmarks.py
@@ -194,7 +194,7 @@ if __name__ == '__main__':
     args = arg_parser.parse_args(args=sys.argv[1:])
     if args.trace:
-        loglevel = warcprox.TRACE
+        loglevel = logging.TRACE
     elif args.verbose:
         loglevel = logging.DEBUG
     else:
diff --git a/setup.py b/setup.py
index 1707f1f..cd5da0c 100755
--- a/setup.py
+++ b/setup.py
@@ -25,13 +25,13 @@ import setuptools
 deps = [
     'certauth==1.1.6',
-    'warctools',
+    'warctools>=4.10.0',
     'urlcanon>=0.1.dev16',
     'doublethink>=0.2.0.dev87',
-    'urllib3',
+    'urllib3>=1.23',
     'requests>=2.0.1',
-    'PySocks',
-    'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
+    'PySocks>=1.6.8',
+    'cryptography>=2.3',
 ]
 try:
     import concurrent.futures
 except:
 setuptools.setup(
     name='warcprox',
-    version='2.4b3.dev180',
+    version='2.4b3.dev184',
     description='WARC writing MITM HTTP/S proxy',
     url='https://github.com/internetarchive/warcprox',
     author='Noah Levitt',
diff --git a/tests/test_ensure_rethinkdb_tables.py b/tests/test_ensure_rethinkdb_tables.py
index 030cddb..f0649f4 100644
--- a/tests/test_ensure_rethinkdb_tables.py
+++ b/tests/test_ensure_rethinkdb_tables.py
@@ -30,7 +30,7 @@
 import logging
 import sys
 logging.basicConfig(
-    stream=sys.stdout, level=warcprox.TRACE,
+    stream=sys.stdout,
level=logging.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 3511fe2..cdc68eb 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -90,8 +90,7 @@ def _send(self, data): # http_client.HTTPConnection.send = _send logging.basicConfig( - # stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE, - stream=sys.stdout, level=warcprox.TRACE, + stream=sys.stdout, level=logging.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) @@ -1718,8 +1717,14 @@ def test_slash_in_warc_prefix(warcprox_, http_daemon, archiving_proxies): def test_crawl_log(warcprox_, http_daemon, archiving_proxies): urls_before = warcprox_.proxy.running_stats.urls + hostname = socket.gethostname().split('.', 1)[0] + port = warcprox_.proxy.server_port + default_crawl_log_path = os.path.join( + warcprox_.options.crawl_log_dir, + 'crawl-%s-%s.log' % (hostname, port)) + try: - os.unlink(os.path.join(warcprox_.options.crawl_log_dir, 'crawl.log')) + os.unlink(default_crawl_log_path) except: pass @@ -1740,14 +1745,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) - file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log') + file = os.path.join( + warcprox_.options.crawl_log_dir, + 'test_crawl_log_1-%s-%s.log' % (hostname, port)) assert os.path.exists(file) assert os.stat(file).st_size > 0 - assert os.path.exists(os.path.join( - warcprox_.options.crawl_log_dir, 'crawl.log')) + assert os.path.exists(default_crawl_log_path) - crawl_log = open(os.path.join( - warcprox_.options.crawl_log_dir, 'crawl.log'), 'rb').read() + crawl_log = open(default_crawl_log_path, 'rb').read() # tests will fail in year 3000 :) assert re.match(b'\A2[^\n]+\n\Z', crawl_log) assert crawl_log[24:31] == b' 200 ' @@ -1768,8 +1773,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): 'contentSize', 'warcFilename', 'warcFileOffset'} assert extra_info['contentSize'] == 145 - crawl_log_1 = open(os.path.join( - warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log'), 'rb').read() + crawl_log_1 = open(file, 'rb').read() assert re.match(b'\A2[^\n]+\n\Z', crawl_log_1) assert crawl_log_1[24:31] == b' 200 ' assert crawl_log_1[31:42] == b' 54 ' @@ -1800,7 +1804,9 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) - file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log') + file = os.path.join( + warcprox_.options.crawl_log_dir, + 'test_crawl_log_2-%s-%s.log' % (hostname, port)) assert os.path.exists(file) assert os.stat(file).st_size > 0 @@ -1833,7 +1839,9 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) - file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log') + file = os.path.join( + warcprox_.options.crawl_log_dir, + 'test_crawl_log_3-%s-%s.log' % (hostname, port)) assert os.path.exists(file) crawl_log_3 = open(file, 'rb').read() @@ -1871,7 +1879,9 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): # wait for postfetch chain wait(lambda: 
warcprox_.proxy.running_stats.urls - urls_before == 5)
-    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')
+    file = os.path.join(
+        warcprox_.options.crawl_log_dir,
+        'test_crawl_log_4-%s-%s.log' % (hostname, port))
     assert os.path.exists(file)
     crawl_log_4 = open(file, 'rb').read()
diff --git a/tests/test_writer.py b/tests/test_writer.py
index 126932a..ed5c699 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -22,7 +22,7 @@ USA.
 import os
 import fcntl
 from multiprocessing import Process, Queue
-from datetime import datetime
+from datetime import datetime, timedelta
 import pytest
 import re
 from warcprox.mitmproxy import ProxyingRecorder
@@ -34,6 +34,7 @@ import warcprox
 import io
 import tempfile
 import logging
+import hashlib
 def lock_file(queue, filename):
     """Try to lock file and return 1 if successful, else return 0.
@@ -58,7 +59,7 @@ def test_warc_writer_locking(tmpdir):
         url='http://example.com', content_type='text/plain', status=200,
         client_ip='127.0.0.2', request_data=b'abc',
         response_recorder=recorder, remote_ip='127.0.0.3',
-        timestamp=datetime.utcnow())
+        timestamp=datetime.utcnow(), payload_digest=hashlib.sha1())
     dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
     wwriter = WarcWriter(Options(
@@ -129,7 +130,7 @@ def test_special_dont_write_prefix():
     wwt.join()
     wwt = warcprox.writerthread.WarcWriterProcessor(
-        Options(writer_threads=1))
+        Options(writer_threads=1, blackout_period=60, prefix='foo'))
     wwt.inq = warcprox.TimestampedQueue(maxsize=1)
     wwt.outq = warcprox.TimestampedQueue(maxsize=1)
     try:
@@ -158,6 +159,41 @@ def test_special_dont_write_prefix():
         recorded_url = wwt.outq.get(timeout=10)
         assert not recorded_url.warc_records
         assert wwt.outq.empty()
+
+        # test blackout_period option. Write the first revisit record because
+        # it's outside the blackout_period (60). Do not write the second
+        # because it's inside the blackout_period.
+        recorder = ProxyingRecorder(io.BytesIO(b'test1'), None)
+        recorder.read()
+        old = datetime.utcnow() - timedelta(0, 3600)
+        ru = RecordedUrl(
+            url='http://example.com/dup',
+            content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(id=b'1', url=b'http://example.com/dup',
+            date=old.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        recorded_url = wwt.outq.get(timeout=10)
+        recorder = ProxyingRecorder(io.BytesIO(b'test2'), None)
+        recorder.read()
+        recent = datetime.utcnow() - timedelta(0, 5)
+        ru = RecordedUrl(
+            url='http://example.com/dup', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(id=b'2', url=b'http://example.com/dup',
+            date=recent.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        assert recorded_url.warc_records
+        recorded_url = wwt.outq.get(timeout=10)
+        assert not recorded_url.warc_records
+        assert wwt.outq.empty()
+
     finally:
         wwt.stop.set()
         wwt.join()
@@ -212,7 +248,7 @@ def test_warc_writer_filename(tmpdir):
         url='http://example.com', content_type='text/plain', status=200,
         client_ip='127.0.0.2', request_data=b'abc',
         response_recorder=recorder, remote_ip='127.0.0.3',
-        timestamp=datetime.utcnow())
+        timestamp=datetime.utcnow(), payload_digest=hashlib.sha1())
     dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
     wwriter = WarcWriter(Options(directory=dirname, prefix='foo',
diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 2dcc838..67cf654 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -266,21 +266,21 @@ def timestamp14():
     return '{:%Y%m%d%H%M%S}'.format(now)
 # monkey-patch log levels TRACE and NOTICE
-TRACE = 5
+logging.TRACE = (logging.NOTSET + logging.DEBUG) // 2
 def _logger_trace(self, msg, *args, **kwargs):
-    if self.isEnabledFor(TRACE):
-        self._log(TRACE, msg, args, **kwargs)
+    if self.isEnabledFor(logging.TRACE):
+        self._log(logging.TRACE, msg, args, **kwargs)
 logging.Logger.trace = _logger_trace
 logging.trace = logging.root.trace
-logging.addLevelName(TRACE, 'TRACE')
+logging.addLevelName(logging.TRACE, 'TRACE')
-NOTICE = (logging.INFO + logging.WARN) // 2
+logging.NOTICE = (logging.INFO + logging.WARN) // 2
 def _logger_notice(self, msg, *args, **kwargs):
-    if self.isEnabledFor(NOTICE):
-        self._log(NOTICE, msg, args, **kwargs)
+    if self.isEnabledFor(logging.NOTICE):
+        self._log(logging.NOTICE, msg, args, **kwargs)
 logging.Logger.notice = _logger_notice
 logging.notice = logging.root.notice
-logging.addLevelName(NOTICE, 'NOTICE')
+logging.addLevelName(logging.NOTICE, 'NOTICE')
 import warcprox.controller as controller
 import warcprox.playback as playback
diff --git a/warcprox/controller.py b/warcprox/controller.py
index e89ecbb..9d20e71 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -299,9 +299,7 @@ class WarcproxController(object):
         status_info.update(self.proxy.status())
         self.status_info = self.service_registry.heartbeat(status_info)
-        self.logger.log(
-            warcprox.TRACE, "status in service registry: %s",
-            self.status_info)
+        self.logger.trace('status in service registry: %s', self.status_info)
     def start(self):
         with self._start_stop_lock:
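The monkey-patched levels above can be exercised with a tiny standalone sketch
(assuming only that ``import warcprox`` installs them, per the
``warcprox/__init__.py`` hunk)::

    import logging
    import sys

    import warcprox  # importing warcprox registers TRACE and NOTICE

    logging.basicConfig(stream=sys.stdout, level=logging.TRACE)
    logging.getLogger('example').trace('finer-grained than DEBUG (level 5)')
    logging.getLogger('example').notice('between INFO and WARNING (level 25)')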
diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py
index a953402..2f7ea5e 100644
--- a/warcprox/crawl_log.py
+++ b/warcprox/crawl_log.py
@@ -24,11 +24,15 @@
 import datetime
 import json
 import os
 import warcprox
+import socket
 class CrawlLogger(object):
     def __init__(self, dir_, options=warcprox.Options()):
         self.dir = dir_
         self.options = options
+        self.hostname = socket.gethostname().split('.', 1)[0]
+
+    def start(self):
         if not os.path.exists(self.dir):
             logging.info('creating directory %r', self.dir)
             os.mkdir(self.dir)
@@ -49,7 +53,7 @@ class CrawlLogger(object):
                 self.options.base32)
         else:
             # WARCPROX_WRITE_RECORD request
-            content_length = len(recorded_url.request_data)
+            content_length = int(records[0].get_header(b'Content-Length'))
             payload_digest = records[0].get_header(b'WARC-Payload-Digest')
         fields = [
             '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
@@ -77,12 +81,11 @@ class CrawlLogger(object):
             pass
         line = b' '.join(fields) + b'\n'
-        if 'warc-prefix' in recorded_url.warcprox_meta:
-            filename = '%s.log' % recorded_url.warcprox_meta['warc-prefix']
-        else:
-            filename = 'crawl.log'
-
+        prefix = recorded_url.warcprox_meta.get('warc-prefix', 'crawl')
+        filename = '%s-%s-%s.log' % (
+            prefix, self.hostname, self.options.server_port)
         crawl_log_path = os.path.join(self.dir, filename)
+
         with open(crawl_log_path, 'ab') as f:
             f.write(line)
diff --git a/warcprox/main.py b/warcprox/main.py
index 6fb46ef..bf8d11e 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -60,10 +60,23 @@ class BetterArgumentDefaultsHelpFormatter(
     else:
         return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action)
-def _build_arg_parser(prog='warcprox'):
+def _build_arg_parser(prog='warcprox', show_hidden=False):
+    if show_hidden:
+        def suppress(msg):
+            return msg
+    else:
+        def suppress(msg):
+            return argparse.SUPPRESS
+
     arg_parser = argparse.ArgumentParser(prog=prog,
             description='warcprox - WARC writing MITM HTTP/S proxy',
             formatter_class=BetterArgumentDefaultsHelpFormatter)
+
+    hidden = arg_parser.add_argument_group('hidden options')
+    arg_parser.add_argument(
+        '--help-hidden', action='help', default=argparse.SUPPRESS,
+        help='show help message, including help on hidden options, and exit')
+
     arg_parser.add_argument('-p', '--port', dest='port', default='8000',
             type=int, help='port to listen on')
     arg_parser.add_argument('-b', '--address', dest='address',
@@ -81,8 +94,12 @@ def _build_arg_parser(prog='warcprox'):
             help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}')
     arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
-    arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
-            default=False, action='store_true', help=argparse.SUPPRESS)
+    hidden.add_argument(
+        '--no-warc-open-suffix', dest='no_warc_open_suffix',
+        default=False, action='store_true',
+        help=suppress(
+            'do not name warc files with suffix ".open" while writing to '
+            'them, but lock them with lockf(3) instead'))
     # not mentioned in --help: special value for '-' for --prefix means don't
     # archive the capture, unless prefix set in warcprox-meta header
     arg_parser.add_argument(
@@ -146,40 +163,60 @@ def _build_arg_parser(prog='warcprox'):
             'rethinkdb service registry table url; if provided, warcprox '
             'will create and heartbeat entry for itself'))
     # optional cookie values to pass to CDX Server; e.g.
"cookie1=val1;cookie2=val2" - arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', - help=argparse.SUPPRESS) + hidden.add_argument( + '--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', + help=suppress( + 'value of Cookie header to include in requests to the cdx ' + 'server, when using --cdxserver-dedup')) arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', type=int, default=0, help=('try to dedup text resources with payload size over this limit in bytes')) arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size', type=int, default=0, help=( 'try to dedup binary resources with payload size over this limit in bytes')) - # optionally, dedup request only when `dedup-bucket` is available in - # Warcprox-Meta HTTP header. By default, we dedup all requests. - arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket', - action='store_true', default=False, help=argparse.SUPPRESS) - arg_parser.add_argument('--queue-size', dest='queue_size', type=int, - default=500, help=argparse.SUPPRESS) - arg_parser.add_argument('--max-threads', dest='max_threads', type=int, - help=argparse.SUPPRESS) - arg_parser.add_argument('--profile', action='store_true', default=False, - help=argparse.SUPPRESS) - arg_parser.add_argument( - '--writer-threads', dest='writer_threads', type=int, default=None, - help=argparse.SUPPRESS) + hidden.add_argument( + '--dedup-only-with-bucket', dest='dedup_only_with_bucket', + action='store_true', default=False, help=suppress( + 'only deduplicate captures if "dedup-bucket" is set in ' + 'the Warcprox-Meta request header')) + arg_parser.add_argument('--blackout-period', dest='blackout_period', + type=int, default=0, + help='skip writing a revisit record if its too close to the original capture') + hidden.add_argument( + '--queue-size', dest='queue_size', type=int, default=500, + help=suppress( + 'maximum number of urls that can be queued at each ' + 'step of the processing chain (see the section on warcprox ' + 'architecture in README.rst)')) + hidden.add_argument( + '--max-threads', dest='max_threads', type=int, default=100, + help=suppress('maximum number of http worker threads')) + hidden.add_argument( + '--profile', action='store_true', default=False, + help=suppress( + 'turn on performance profiling; summary statistics are dumped ' + 'every 10 minutes and at shutdown')) + hidden.add_argument( + '--writer-threads', dest='writer_threads', type=int, default=1, + help=suppress( + 'number of warc writer threads; caution, see ' + 'https://github.com/internetarchive/warcprox/issues/101')) arg_parser.add_argument( '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', default=None, help=( 'host:port of tor socks proxy, used only to connect to ' '.onion sites')) - # Configurable connection socket timeout, default is 60 sec. - arg_parser.add_argument( - '--socket-timeout', dest='socket_timeout', type=float, - default=None, help=argparse.SUPPRESS) + hidden.add_argument( + '--socket-timeout', dest='socket_timeout', type=float, default=60, + help=suppress( + 'socket timeout, used for proxy client connection and for ' + 'connection to remote server')) # Increasing this value increases memory usage but reduces /tmp disk I/O. 
- arg_parser.add_argument( + hidden.add_argument( '--tmp-file-max-memory-size', dest='tmp_file_max_memory_size', - type=int, default=512*1024, help=argparse.SUPPRESS) + type=int, default=512*1024, help=suppress( + 'size of in-memory buffer for each url being processed ' + '(spills over to temp space on disk if exceeded)')) arg_parser.add_argument( '--max-resource-size', dest='max_resource_size', type=int, default=None, help='maximum resource size limit in bytes') @@ -194,11 +231,18 @@ def _build_arg_parser(prog='warcprox'): 'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". ' 'May be used multiple times to register multiple plugins. ' 'See README.rst for more information.')) - arg_parser.add_argument('--version', action='version', + arg_parser.add_argument( + '-q', '--quiet', dest='quiet', action='store_true', + help='less verbose logging') + arg_parser.add_argument( + '-v', '--verbose', dest='verbose', action='store_true', + help='verbose logging') + arg_parser.add_argument( + '--trace', dest='trace', action='store_true', + help='very verbose logging') + arg_parser.add_argument( + '--version', action='version', version="warcprox {}".format(warcprox.__version__)) - arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') - arg_parser.add_argument('--trace', dest='trace', action='store_true') - arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') return arg_parser @@ -224,7 +268,11 @@ def parse_args(argv): ''' Parses command line arguments with argparse. ''' - arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) + show_hidden = False + if '--help-hidden' in argv: + show_hidden = True + argv = [argv[0], '--help-hidden'] + arg_parser = _build_arg_parser(os.path.basename(argv[0]), show_hidden) args = arg_parser.parse_args(args=argv[1:]) try: @@ -242,11 +290,11 @@ def main(argv=None): args = parse_args(argv or sys.argv) if args.trace: - loglevel = warcprox.TRACE + loglevel = logging.TRACE elif args.verbose: loglevel = logging.DEBUG elif args.quiet: - loglevel = logging.WARNING + loglevel = logging.NOTICE else: loglevel = logging.INFO diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index e01f15e..4153e54 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -250,7 +250,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): ''' self._conn_pool = self.server.remote_connection_pool.connection_from_host( host=self.hostname, port=int(self.port), scheme='http', - pool_kwargs={'maxsize': 6}) + pool_kwargs={'maxsize': 6, 'timeout': self._socket_timeout}) self._remote_server_conn = self._conn_pool._get_conn() if is_connection_dropped(self._remote_server_conn): @@ -263,10 +263,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_conn.sock.set_proxy( socks.SOCKS5, addr=self.onion_tor_socks_proxy_host, port=self.onion_tor_socks_proxy_port, rdns=True) - self._remote_server_conn.timeout = self._socket_timeout + self._remote_server_conn.sock.settimeout(self._socket_timeout) self._remote_server_conn.sock.connect((self.hostname, int(self.port))) else: - self._remote_server_conn.timeout = self._socket_timeout self._remote_server_conn.connect() # Wrap socket if SSL is required @@ -276,16 +275,17 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): context.check_hostname = False context.verify_mode = ssl.CERT_NONE self._remote_server_conn.sock = context.wrap_socket( - self._remote_server_conn.sock, server_hostname=self.hostname) + self._remote_server_conn.sock, + 
server_hostname=self.hostname) except AttributeError: try: self._remote_server_conn.sock = ssl.wrap_socket( self._remote_server_conn.sock) except ssl.SSLError: self.logger.warn( - "failed to establish ssl connection to %s; python " - "ssl library does not support SNI, considering " - "upgrading to python >= 2.7.9 or python 3.4", + "failed to establish ssl connection to %s; " + "python ssl library does not support SNI, " + "consider upgrading to python 2.7.9+ or 3.4+", self.hostname) raise return self._remote_server_conn.sock @@ -424,8 +424,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.command, self.path, self.request_version) # Swallow headers that don't make sense to forward on, i.e. most - # hop-by-hop headers, see - # http://tools.ietf.org/html/rfc2616#section-13.5. + # hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5. # self.headers is an email.message.Message, which is case-insensitive # and doesn't throw KeyError in __delitem__ for key in ( @@ -503,10 +502,7 @@ class PooledMixIn(socketserver.ThreadingMixIn): def __init__(self, max_threads=None): self.active_requests = set() self.unaccepted_requests = 0 - if max_threads: - self.max_threads = max_threads - else: - self.max_threads = 100 + self.max_threads = max_threads or 100 self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads) self.logger.info("%s proxy threads", self.max_threads) @@ -596,11 +592,6 @@ class PooledMitmProxy(PooledMixIn, MitmProxy): request_queue_size = 4096 def __init__(self, options=warcprox.Options()): - if options.max_threads: - self.logger.info( - 'max_threads=%s set by command line option', - options.max_threads) - PooledMixIn.__init__(self, options.max_threads) self.profilers = collections.defaultdict(cProfile.Profile) diff --git a/warcprox/warc.py b/warcprox/warc.py index 708366b..21d0f5d 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -19,8 +19,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
''' -from __future__ import absolute_import - import logging import warcprox import hashlib @@ -83,16 +81,21 @@ class WarcRecordBuilder: concurrent_to=principal_record.id) return principal_record, request_record else: - principal_record = self.build_warc_record(url=recorded_url.url, + principal_record = self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, data=recorded_url.request_data, warc_type=recorded_url.custom_type, - content_type=recorded_url.content_type.encode("latin1")) + content_type=recorded_url.content_type.encode("latin1"), + payload_digest=warcprox.digest_str( + recorded_url.payload_digest, self.base32), + content_length=recorded_url.size) return (principal_record,) def build_warc_record(self, url, warc_date=None, recorder=None, data=None, concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, profile=None, refers_to=None, refers_to_target_uri=None, - refers_to_date=None, payload_digest=None, truncated=None): + refers_to_date=None, payload_digest=None, truncated=None, + content_length=None): if warc_date is None: warc_date = warctools.warc.warc_datetime_str(datetime.datetime.utcnow()) @@ -126,21 +129,41 @@ class WarcRecordBuilder: headers.append((b'WARC-Truncated', truncated)) if recorder is not None: - headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1'))) + if content_length is not None: + headers.append(( + warctools.WarcRecord.CONTENT_LENGTH, + str(content_length).encode('latin1'))) + else: + headers.append(( + warctools.WarcRecord.CONTENT_LENGTH, + str(len(recorder)).encode('latin1'))) headers.append((warctools.WarcRecord.BLOCK_DIGEST, warcprox.digest_str(recorder.block_digest, self.base32))) recorder.tempfile.seek(0) record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) else: - headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1'))) - digest = hashlib.new(self.digest_algorithm, data) - headers.append((warctools.WarcRecord.BLOCK_DIGEST, - warcprox.digest_str(digest, self.base32))) + if content_length is not None: + headers.append(( + warctools.WarcRecord.CONTENT_LENGTH, + str(content_length).encode('latin1'))) + else: + headers.append(( + warctools.WarcRecord.CONTENT_LENGTH, + str(len(data)).encode('latin1'))) + # no http headers so block digest == payload digest if not payload_digest: - headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, - warcprox.digest_str(digest, self.base32))) - content_tuple = content_type, data - record = warctools.WarcRecord(headers=headers, content=content_tuple) + payload_digest = warcprox.digest_str( + hashlib.new(self.digest_algorithm, data), self.base32) + headers.append(( + warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, payload_digest)) + if hasattr(data, 'read'): + record = warctools.WarcRecord( + headers=headers, content_file=data) + else: + content_tuple = content_type, data + record = warctools.WarcRecord( + headers=headers, content=content_tuple) return record diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 417f450..cfe2314 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -44,6 +44,8 @@ import datetime import urlcanon import os from urllib3 import PoolManager +import tempfile +import hashlib class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): ''' @@ -285,8 +287,18 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): and (warc_type or 'WARC-Type' in self.headers)): timestamp = 
datetime.datetime.utcnow() - # stream this? - request_data = self.rfile.read(int(self.headers['Content-Length'])) + request_data = tempfile.SpooledTemporaryFile( + max_size=self._tmp_file_max_memory_size) + payload_digest = hashlib.new(self.server.digest_algorithm) + + # XXX we don't support chunked uploads for now + length = int(self.headers['Content-Length']) + buf = self.rfile.read(min(65536, length - request_data.tell())) + while buf != b'': + request_data.write(buf) + payload_digest.update(buf) + buf = self.rfile.read( + min(65536, length - request_data.tell())) warcprox_meta = None raw_warcprox_meta = self.headers.get('Warcprox-Meta') @@ -301,11 +313,14 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): warcprox_meta=warcprox_meta, content_type=self.headers['Content-Type'], custom_type=warc_type or self.headers['WARC-Type'].encode('utf-8'), - status=204, size=len(request_data), + status=204, + size=request_data.tell(), client_ip=self.client_address[0], method=self.command, timestamp=timestamp, - duration=datetime.datetime.utcnow()-timestamp) + duration=datetime.datetime.utcnow()-timestamp, + payload_digest=payload_digest) + request_data.seek(0) self.server.recorded_url_q.put(rec_custom) self.send_response(204, 'OK') @@ -492,12 +507,15 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): def server_activate(self): http_server.HTTPServer.server_activate(self) - self.logger.info( + self.logger.notice( 'listening on %s:%s', self.server_address[0], self.server_address[1]) + # take note of actual port in case running with --port=0 so that other + # parts of warcprox have easy access to it + self.options.server_port = self.server_address[1] def server_close(self): - self.logger.info('shutting down') + self.logger.notice('shutting down') http_server.HTTPServer.server_close(self) self.remote_connection_pool.clear() diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 9d85b6f..75aeb05 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -31,6 +31,7 @@ import logging import time import warcprox from concurrent import futures +from datetime import datetime import threading class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): @@ -52,6 +53,7 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): max_workers=options.writer_threads or 1, max_queued=10 * (options.writer_threads or 1)) self.batch = set() + self.blackout_period = options.blackout_period or 0 def _startup(self): self.logger.info('%s warc writer threads', self.pool._max_workers) @@ -114,7 +116,26 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): else self.options.prefix) # special warc name prefix '-' means "don't archive" return (prefix != '-' and not recorded_url.do_not_archive - and self._filter_accepts(recorded_url)) + and self._filter_accepts(recorded_url) + and not self._in_blackout(recorded_url)) + + def _in_blackout(self, recorded_url): + """If --blackout-period=N (sec) is set, check if duplicate record + datetime is close to the original. If yes, we don't write it to WARC. + The aim is to avoid having unnecessary `revisit` records. 
+ Return Boolean + """ + if self.blackout_period and hasattr(recorded_url, "dedup_info") and \ + recorded_url.dedup_info: + dedup_date = recorded_url.dedup_info.get('date') + if dedup_date and recorded_url.dedup_info.get('url') == recorded_url.url: + try: + dt = datetime.strptime(dedup_date.decode('utf-8'), + '%Y-%m-%dT%H:%M:%SZ') + return (datetime.utcnow() - dt).total_seconds() <= self.blackout_period + except ValueError: + return False + return False def _log(self, recorded_url, records): # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
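The ``warcproxy.py`` hunk above changes how ``WARCPROX_WRITE_RECORD`` request
bodies are read (spooled to a temp file and digested in 64 KiB chunks instead
of slurped into memory). For context, a client submits such a record roughly
like this (a sketch only; the URL and header values are illustrative, and per
the XXX comment above, a ``Content-Length`` header is required because chunked
uploads are unsupported)::

    import http.client

    conn = http.client.HTTPConnection('localhost', 8000)  # default warcprox port
    body = b'{"seed": "http://example.com/"}'
    conn.request(
        'WARCPROX_WRITE_RECORD', 'metadata://example.com/crawl-info',
        body=body,
        headers={
            'Content-Type': 'application/json',
            'WARC-Type': 'metadata',            # type of WARC record to write
            'Content-Length': str(len(body)),   # required; no chunked uploads
        })
    assert conn.getresponse().status == 204     # 204 means the record was queued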