diff --git a/README.rst b/README.rst
index b9c1c5f..8adcafa 100644
--- a/README.rst
+++ b/README.rst
@@ -47,6 +47,7 @@ Usage
                     [--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
                     [--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
                     [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
+                    [--cdxserver-dedup CDX_SERVER_URL]
                     [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
                     [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
                     [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
@@ -100,6 +101,9 @@ Usage
                         persistent deduplication database file; empty string
                         or /dev/null disables deduplication (default:
                         ./warcprox.sqlite)
+  --cdxserver-dedup CDX_SERVER_URL
+                        use a CDX server for deduplication
+                        (default: None)
   --rethinkdb-servers RETHINKDB_SERVERS
                         rethinkdb servers, used for dedup and stats if
                         specified; e.g.
diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py
index f595f8b..a05db59 100755
--- a/benchmarks/run-benchmarks.py
+++ b/benchmarks/run-benchmarks.py
@@ -163,78 +163,87 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
     arg_parser = argparse.ArgumentParser(
             prog=prog, description=desc,
             formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter)
-    arg_parser.add_argument(
-            '-z', '--gzip', dest='gzip', action='store_true',
+
+    ### these warcprox options are not configurable for the benchmarks
+    # arg_parser.add_argument('-p', '--port', dest='port', default='8000',
+    #         type=int, help='port to listen on')
+    # arg_parser.add_argument('-b', '--address', dest='address',
+    #         default='localhost', help='address to listen on')
+    # arg_parser.add_argument('-c', '--cacert', dest='cacert',
+    #         default='./{0}-warcprox-ca.pem'.format(socket.gethostname()),
+    #         help='CA certificate file; if file does not exist, it will be created')
+    # arg_parser.add_argument('--certs-dir', dest='certs_dir',
+    #         default='./{0}-warcprox-ca'.format(socket.gethostname()),
+    #         help='where to store and load generated certificates')
+    # arg_parser.add_argument('-d', '--dir', dest='directory',
+    #         default='./warcs', help='where to write warcs')
+
+    arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
+    arg_parser.add_argument('-n', '--prefix', dest='prefix',
+            default='WARCPROX', help='WARC filename prefix')
     arg_parser.add_argument(
-            '-s', '--size', dest='size', default=1000*1000*1000, type=int,
-            help='WARC file rollover size threshold in bytes')
-    arg_parser.add_argument(
-            '--rollover-idle-time', dest='rollover_idle_time', default=None,
-            type=int, help=(
-                'WARC file rollover idle time threshold in seconds (so that '
-                "Friday's last open WARC doesn't sit there all weekend "
-                'waiting for more data)'))
+            '-s', '--size', dest='rollover_size', default=1000*1000*1000,
+            type=int, help='WARC file rollover size threshold in bytes')
+    arg_parser.add_argument('--rollover-idle-time',
+            dest='rollover_idle_time', default=None, type=int,
+            help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
     try:
         hash_algos = hashlib.algorithms_guaranteed
     except AttributeError:
         hash_algos = hashlib.algorithms
-    arg_parser.add_argument(
-            '-g', '--digest-algorithm', dest='digest_algorithm',
-            default='sha1', help='digest algorithm, one of %s' % hash_algos)
+    arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
+            default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
     arg_parser.add_argument('--base32', dest='base32', action='store_true',
             default=False, help='write digests in Base32 instead of hex')
-    arg_parser.add_argument(
-            '--method-filter', metavar='HTTP_METHOD',
-            action='append', help=(
-                'only record requests with the given http method(s) (can be '
-                'used more than once)'))
-    arg_parser.add_argument(
-            '--stats-db-file', dest='stats_db_file',
-            default=os.path.join(tmpdir, 'stats.db'), help=(
-                'persistent statistics database file; empty string or '
-                '/dev/null disables statistics tracking'))
+    arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
+            action='append', help='only record requests with the given http method(s) (can be used more than once)')
+    arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
+            default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
+    arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
+            type=int, default=None, help='port to listen on for instant playback')
+    arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
+            default='./warcprox-playback-index.db',
+            help='playback index database file (only used if --playback-port is specified)')
     group = arg_parser.add_mutually_exclusive_group()
-    group.add_argument(
-            '-j', '--dedup-db-file', dest='dedup_db_file',
-            default=os.path.join(tmpdir, 'dedup.db'), help=(
-                'persistent deduplication database file; empty string or '
-                '/dev/null disables deduplication'))
-    group.add_argument(
-            '--rethinkdb-servers', dest='rethinkdb_servers', help=(
-                'rethinkdb servers, used for dedup and stats if specified; '
-                'e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org'))
-    # arg_parser.add_argument(
-    #         '--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help=(
-    #             'rethinkdb database name (ignored unless --rethinkdb-servers '
-    #             'is specified)'))
+    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
+            default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+    group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
+            help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
+    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
+            help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
+    arg_parser.add_argument('--rethinkdb-big-table',
+            dest='rethinkdb_big_table', action='store_true', default=False,
+            help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
     arg_parser.add_argument(
-            '--rethinkdb-big-table', dest='rethinkdb_big_table',
-            action='store_true', default=False, help=(
-                'use a big rethinkdb table called "captures", instead of a '
-                'small table called "dedup"; table is suitable for use as '
-                'index for playback (ignored unless --rethinkdb-servers is '
-                'specified)'))
+            '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
+            default='captures', help=argparse.SUPPRESS)
+    arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
+            default=500, help=argparse.SUPPRESS)
+    arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
+            help=argparse.SUPPRESS)
+    arg_parser.add_argument('--profile', action='store_true', default=False,
+            help=argparse.SUPPRESS)
     arg_parser.add_argument(
-            '--queue-size', dest='queue_size', type=int, default=1, help=(
-                'max size of the queue of urls waiting to be processed by '
-                'the warc writer thread'))
+            '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
+            default=None, help=(
+                'host:port of tor socks proxy, used only to connect to '
+                '.onion sites'))
     arg_parser.add_argument(
-            '--max-threads', dest='max_threads', type=int, help=(
-                'number of proxy server threads (if not specified, chosen based '
-                'on system resource limits'))
-    arg_parser.add_argument(
-            '--version', action='version',
-            version='warcprox %s' % warcprox.__version__)
-    arg_parser.add_argument(
-            '-v', '--verbose', dest='verbose', action='store_true',
-            help='verbose logging')
-    arg_parser.add_argument(
-            '--trace', dest='trace', action='store_true',
-            help='trace-level logging')
-    arg_parser.add_argument(
-            '--profile', dest='profile', action='store_true', default=False,
-            help='profile the warc writer thread')
+            '--plugin', metavar='PLUGIN_CLASS', dest='plugins',
+            action='append', help=(
+                'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
+                'May be used multiple times to register multiple plugins. '
+                'Plugin classes are loaded from the regular python module '
+                'search path. They will be instantiated with no arguments and '
+                'must have a method `notify(self, recorded_url, records)` '
+                'which will be called for each url, after warc records have '
+                'been written.'))
+    arg_parser.add_argument('--version', action='version',
+            version="warcprox {}".format(warcprox.__version__))
+    arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
+    arg_parser.add_argument('--trace', dest='trace', action='store_true')
+    arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
     arg_parser.add_argument(
             '--requests', dest='requests', type=int, default=200,
             help='number of urls to fetch')
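The ``--plugin`` help text above is the whole plugin contract: a class that is
instantiated with no arguments and exposes ``notify(self, recorded_url, records)``.
A minimal sketch of a conforming plugin (the module and class names here are
hypothetical)::

    class MyPlugin:
        # called once per proxied url, after its warc records have been written
        def notify(self, recorded_url, records):
            print('archived %s (%s warc record(s))' % (
                    recorded_url.url, len(records)))

It would then be registered with ``--plugin mypkg.mymod.MyPlugin``, loaded from
the regular python module search path.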
diff --git a/setup.py b/setup.py
index aeb9455..0e6b5f0 100755
--- a/setup.py
+++ b/setup.py
@@ -40,6 +40,7 @@ deps = [
     'warctools',
     'urlcanon>=0.1.dev16',
     'doublethink>=0.2.0.dev87',
+    'urllib3',
     'PySocks',
     'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
 ]
@@ -50,7 +51,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.2b1.dev105',
+        version='2.2.1b2.dev107',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
@@ -59,7 +60,7 @@ setuptools.setup(
         license='GPL',
         packages=['warcprox'],
        install_requires=deps,
-        tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
+        tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
         cmdclass = {'test': PyTest},
         test_suite='warcprox.tests',
         entry_points={
diff --git a/tests/run-tests.sh b/tests/run-tests.sh
index f962ca8..a7a819c 100755
--- a/tests/run-tests.sh
+++ b/tests/run-tests.sh
@@ -38,7 +38,7 @@ do
             && (cd /warcprox && git diff HEAD) | patch -p1 \
             && virtualenv -p $python /tmp/venv \
             && source /tmp/venv/bin/activate \
-            && pip --log-file /tmp/pip.log install . pytest requests warcio \
+            && pip --log-file /tmp/pip.log install . pytest mock requests warcio \
             && py.test -v tests \
             && py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
             && py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \
diff --git a/tests/test_dedup.py b/tests/test_dedup.py
new file mode 100644
index 0000000..124efb5
--- /dev/null
+++ b/tests/test_dedup.py
@@ -0,0 +1,46 @@
+import mock
+from warcprox.dedup import CdxServerDedup
+
+
+def test_cdx_dedup():
+    # Mock CDX Server responses to simulate found, not found and errors.
+    with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request:
+        url = "http://example.com"
+        # not found case
+        result = mock.Mock()
+        result.status = 200
+        result.data = b'20170101020405 test'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res is None
+
+        # found case
+        result = mock.Mock()
+        result.status = 200
+        result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res["date"] == b"2017-02-03T04:05:03Z"
+
+        # invalid CDX result status code
+        result = mock.Mock()
+        result.status = 400
+        result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res is None
+        # invalid CDX result content
+        result = mock.Mock()
+        result.status = 200
+        result.data = b'InvalidExceptionResult'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res is None
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 53b6f71..95c8aba 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -570,6 +570,22 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
     assert response.headers["content-type"] == "text/plain;charset=utf-8"
     assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n"
 
+def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
+    url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
+    request_meta = {"accept": ["capture-metadata"]}
+    headers = {"Warcprox-Meta": json.dumps(request_meta)}
+    response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
+    assert response.status_code == 200
+    assert response.headers['Warcprox-Meta']
+    data = json.loads(response.headers['Warcprox-Meta'])
+    assert data['capture-metadata']
+    try:
+        dt = datetime.datetime.strptime(data['capture-metadata']['timestamp'],
+                '%Y-%m-%dT%H:%M:%SZ')
+        assert dt
+    except ValueError:
+        pytest.fail('invalid capture-metadata timestamp format: %s' % data['capture-metadata']['timestamp'])
+
 def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
     url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
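Besides guarding the feature, test_return_capture_timestamp above doubles as a
usage example for capture metadata: a client opts in via the ``Warcprox-Meta``
request header and reads the capture timestamp back out of the ``Warcprox-Meta``
response header. A standalone sketch (the proxy address is assumed)::

    import json
    import requests

    proxies = {'http': 'http://localhost:8000'}  # wherever warcprox listens
    headers = {'Warcprox-Meta': json.dumps({'accept': ['capture-metadata']})}
    response = requests.get(
            'http://example.com/', proxies=proxies, headers=headers)
    meta = json.loads(response.headers['Warcprox-Meta'])
    print(meta['capture-metadata']['timestamp'])  # e.g. '2017-02-03T04:05:03Z'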
diff --git a/tests/test_writer.py b/tests/test_writer.py
new file mode 100644
index 0000000..9ce0e13
--- /dev/null
+++ b/tests/test_writer.py
@@ -0,0 +1,57 @@
+import os
+import fcntl
+from multiprocessing import Process, Queue
+from datetime import datetime
+import pytest
+from warcprox.mitmproxy import ProxyingRecorder
+from warcprox.warcproxy import RecordedUrl
+from warcprox.writer import WarcWriter
+from warcprox import Options
+
+recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
+
+recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain',
+                           status=200, client_ip='127.0.0.2',
+                           request_data=b'abc',
+                           response_recorder=recorder,
+                           remote_ip='127.0.0.3',
+                           timestamp=datetime.utcnow())
+
+
+def lock_file(queue, filename):
+    """Try to lock file, reporting 'OBTAINED LOCK' or 'FAILED TO OBTAIN LOCK'
+    on the queue. It is necessary to run this in a different process to test locking.
+    """
+    try:
+        fi = open(filename, 'ab')
+        fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        fi.close()
+        queue.put('OBTAINED LOCK')
+    except IOError:
+        queue.put('FAILED TO OBTAIN LOCK')
+
+
+def test_warc_writer_locking(tmpdir):
+    """Test that WarcWriter locks WARC files.
+    When the '.open' suffix is not used, WarcWriter locks the file, and the
+    external process running ``lock_file`` fails to obtain the lock.
+    """
+    dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
+    wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True))
+    wwriter.write_records(recorded_url)
+    warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
+    assert warcs
+    target_warc = os.path.join(dirname, warcs[0])
+    # launch another process and try to lock the WARC file
+    queue = Queue()
+    p = Process(target=lock_file, args=(queue, target_warc))
+    p.start()
+    p.join()
+    assert queue.get() == 'FAILED TO OBTAIN LOCK'
+    wwriter.close_writer()
+
+    # locking must succeed after the writer has closed the WARC file
+    p = Process(target=lock_file, args=(queue, target_warc))
+    p.start()
+    p.join()
+    assert queue.get() == 'OBTAINED LOCK'
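The locking behavior this test depends on: ``fcntl.lockf`` with
``LOCK_EX | LOCK_NB`` raises ``IOError`` immediately when another process
holds the lock, instead of blocking. A minimal sketch (file path invented)::

    import fcntl

    fh = open('/tmp/example.warc', 'ab')
    try:
        fcntl.lockf(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        print('lock obtained')
    except IOError:
        print('lock held by another process')

POSIX record locks belong to the process, which is why ``lock_file`` runs in a
child process: a second ``lockf`` call from the test process itself would
succeed even while the writer holds the lock.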
diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py
index d2147b8..a87d49c 100644
--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -221,7 +221,7 @@ class RethinkCapturesDedup:
         self.captures_db = RethinkCaptures(options=options)
         self.options = options
 
-    def lookup(self, digest_key, bucket="__unspecified__"):
+    def lookup(self, digest_key, bucket="__unspecified__", url=None):
         k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
         algo, value_str = k.split(":")
         if self.options.base32:
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 21c89f8..6c76ab0 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -31,6 +31,10 @@ import requests
 import doublethink
 import rethinkdb as r
 import datetime
+import urllib3
+from urllib3.exceptions import HTTPError
+
+urllib3.disable_warnings()
 
 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -76,7 +80,7 @@ class DedupDb(object):
         conn.close()
         self.logger.debug('dedup db saved %s:%s', key, json_value)
 
-    def lookup(self, digest_key, bucket=""):
+    def lookup(self, digest_key, bucket="", url=None):
         result = None
         key = digest_key.decode('utf-8') + '|' + bucket
         conn = sqlite3.connect(self.file)
@@ -111,9 +115,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
             and recorded_url.response_recorder.payload_size() > 0):
         digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
         if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
-            recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"])
+            recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
+                    recorded_url.url)
         else:
-            recorded_url.dedup_info = dedup_db.lookup(digest_key)
+            recorded_url.dedup_info = dedup_db.lookup(digest_key,
+                    url=recorded_url.url)
 
 class RethinkDedupDb:
     logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
@@ -157,7 +163,7 @@ class RethinkDedupDb:
             raise Exception("unexpected result %s saving %s", result, record)
         self.logger.debug('dedup db saved %s:%s', k, record)
 
-    def lookup(self, digest_key, bucket=""):
+    def lookup(self, digest_key, bucket="", url=None):
         k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
         k = "{}|{}".format(k, bucket)
         result = self.rr.table(self.table).get(k).run()
@@ -177,6 +183,69 @@ class RethinkDedupDb:
         else:
             self.save(digest_key, records[0])
 
+class CdxServerDedup(object):
+    """Query a CDX server to perform deduplication.
+    """
+    logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
+    http_pool = urllib3.PoolManager()
+
+    def __init__(self, cdx_url="https://web.archive.org/cdx/search",
+                 options=warcprox.Options()):
+        self.cdx_url = cdx_url
+        self.options = options
+
+    def start(self):
+        pass
+
+    def save(self, digest_key, response_record, bucket=""):
+        """Does not apply to CDX server, as it is obviously read-only.
+        """
+        pass
+
+    def lookup(self, digest_key, url):
+        """Compare `digest_key` with the SHA1 hash of the fetched content (SHA1
+        must be computed on the original content, after decoding Content-Encoding
+        and Transfer-Encoding, if any); if they match, write a revisit record.
+
+        Get only the last item (limit=-1) because the Wayback Machine has a
+        special performance optimisation to handle that. limit < 0 is very
+        inefficient in general. Maybe it could be configurable in the future.
+
+        :param digest_key: b'sha1:<KEY-VALUE>' (the prefix is optional).
+            Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
+        :param url: Target URL string
+        Result must contain:
+        {"url": <URL>, "date": "%Y-%m-%dT%H:%M:%SZ"}
+        """
+        u = url.decode("utf-8") if isinstance(url, bytes) else url
+        try:
+            result = self.http_pool.request('GET', self.cdx_url, fields=dict(
+                url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
+                limit=-1))
+            assert result.status == 200
+            if isinstance(digest_key, bytes):
+                dkey = digest_key
+            else:
+                dkey = digest_key.encode('utf-8')
+            dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey
+            line = result.data.strip()
+            if line:
+                (cdx_ts, cdx_digest) = line.split(b' ')
+                if cdx_digest == dkey:
+                    dt = datetime.datetime.strptime(
+                            cdx_ts.decode('ascii'), '%Y%m%d%H%M%S')
+                    date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
+                    return dict(url=url, date=date)
+        except (HTTPError, AssertionError, ValueError) as exc:
+            self.logger.error('CdxServerDedup request failed for url=%s %s',
+                    url, exc)
+        return None
+
+    def notify(self, recorded_url, records):
+        """Since we don't save anything to CDX server, this does not apply.
+        """
+        pass
+
 class TroughDedupDb(object):
     '''
     https://github.com/jkafader/trough
@@ -265,7 +334,7 @@ class TroughDedupDb(object):
                     'unexpected response %r %r %r to sql=%r',
                     response.status_code, response.reason, response.text, sql)
 
-    def lookup(self, digest_key, bucket='__unspecified__'):
+    def lookup(self, digest_key, bucket='__unspecified__', url=None):
         read_url = self._read_url(bucket)
         if not read_url:
             return None
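Sketched from the code above, this is how the new backend gets used (the CDX
server is expected to answer with a single "timestamp digest" line, as the
tests in tests/test_dedup.py mock it)::

    from warcprox.dedup import CdxServerDedup

    cdx = CdxServerDedup(cdx_url='https://web.archive.org/cdx/search')
    dedup_info = cdx.lookup(
            digest_key=b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A',
            url='http://example.com/')
    # None if no matching capture was found; otherwise something like
    # {'url': 'http://example.com/', 'date': b'2017-02-03T04:05:03Z'},
    # which decorate_with_dedup_info() attaches as recorded_url.dedup_info.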
+ """ + pass + class TroughDedupDb(object): ''' https://github.com/jkafader/trough @@ -265,7 +334,7 @@ class TroughDedupDb(object): 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) - def lookup(self, digest_key, bucket='__unspecified__'): + def lookup(self, digest_key, bucket='__unspecified__', url=None): read_url = self._read_url(bucket) if not read_url: return None diff --git a/warcprox/main.py b/warcprox/main.py index c8c0ae8..c915917 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -80,6 +80,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default='./warcs', help='where to write warcs') arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') + arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix', + default=False, action='store_true', help=argparse.SUPPRESS) arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument( @@ -133,6 +135,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): '🐷   url pointing to trough configuration rethinkdb database, ' 'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015' '/trough_configuration')) + group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup', + help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search') arg_parser.add_argument( '--rethinkdb-services-url', dest='rethinkdb_services_url', help=( 'rethinkdb service registry table url; if provided, warcprox ' @@ -205,6 +209,8 @@ def init_controller(args): dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options) elif args.rethinkdb_trough_db_url: dedup_db = warcprox.dedup.TroughDedupDb(options) + elif args.cdxserver_dedup: + dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 914fb52..b14cddf 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -45,6 +45,7 @@ try: http_client._MAXLINE = 4194304 # 4 MiB except ImportError: import httplib as http_client +import json import socket import logging import ssl @@ -163,13 +164,17 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.fp, proxy_client, digest_algorithm, url=url) self.fp = self.recorder - def begin(self): + def begin(self, extra_response_headers={}): http_client.HTTPResponse.begin(self) # reads status line, headers status_and_headers = 'HTTP/1.1 {} {}\r\n'.format( self.status, self.reason) self.msg['Via'] = via_header_value( self.msg.get('Via'), '%0.1f' % (self.version / 10.0)) + if extra_response_headers: + for header, value in extra_response_headers.items(): + self.msg[header] = value + for k,v in self.msg.items(): if k.lower() not in ( 'connection', 'proxy-connection', 'keep-alive', @@ -361,12 +366,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error("exception proxying request", exc_info=True) raise - def _proxy_request(self): + def _proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to read the response and send it to the proxy client, while recording the bytes in transit. Returns a tuple (request, response) where request is the raw request bytes, and response is a ProxyingRecorder. 
+
+        :param extra_response_headers: generated on warcprox._proxy_request.
+            It may contain extra HTTP headers such as ``Warcprox-Meta`` which
+            are written in the WARC record for this request.
         '''
         # Build request
         req_str = '{} {} {}\r\n'.format(
@@ -407,7 +416,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 self._remote_server_sock, proxy_client=self.connection,
                 digest_algorithm=self.server.digest_algorithm, url=self.url,
                 method=self.command)
-        prox_rec_res.begin()
+        prox_rec_res.begin(extra_response_headers=extra_response_headers)
 
         buf = prox_rec_res.read(8192)
         while buf != b'':
diff --git a/warcprox/playback.py b/warcprox/playback.py
index 663e10a..a9aa47d 100644
--- a/warcprox/playback.py
+++ b/warcprox/playback.py
@@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler):
 
     def _send_headers_and_refd_payload(
-            self, headers, refers_to, refers_to_target_uri, refers_to_date):
+            self, headers, refers_to_target_uri, refers_to_date, payload_digest):
+        """Send the stored headers and the payload of the capture identified
+        by (refers_to_target_uri, refers_to_date, payload_digest).
+        """
         location = self.server.playback_index_db.lookup_exact(
-                refers_to_target_uri, refers_to_date, record_id=refers_to)
+                refers_to_target_uri, refers_to_date, payload_digest)
         self.logger.debug('loading http payload from {}'.format(location))
 
         fh = self._open_warc_at_offset(location['f'], location['o'])
@@ -131,7 +134,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
             pass
 
         if errors:
-            raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
+            raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors))
 
         warc_type = record.get_header(warctools.WarcRecord.TYPE)
         if warc_type != warctools.WarcRecord.RESPONSE:
@@ -177,20 +180,19 @@ class PlaybackProxyHandler(MitmProxyHandler):
             if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
                 raise Exception('unknown revisit record profile {}'.format(warc_profile))
 
-            refers_to = record.get_header(
-                    warctools.WarcRecord.REFERS_TO).decode('latin1')
             refers_to_target_uri = record.get_header(
                     warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
                         'latin1')
             refers_to_date = record.get_header(
                     warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
-
+            payload_digest = record.get_header(
+                    warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
             self.logger.debug(
                     'revisit record references %s:%s capture of %s',
-                    refers_to_date, refers_to, refers_to_target_uri)
+                    refers_to_date, payload_digest, refers_to_target_uri)
             return self._send_headers_and_refd_payload(
-                    record.content[1], refers_to, refers_to_target_uri,
-                    refers_to_date)
+                    record.content[1], refers_to_target_uri, refers_to_date,
+                    payload_digest)
 
         else:
             # send it back raw, whatever it is
@@ -264,12 +266,12 @@ class PlaybackIndexDb(object):
         # XXX canonicalize url?
         url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
         date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
-        record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
+        payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
 
         # there could be two visits of same url in the same second, and WARC-Date is
         # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
-        # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
+        # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...}
 
         with self._lock:
             conn = sqlite3.connect(self.file)
@@ -283,10 +285,10 @@ class PlaybackIndexDb(object):
 
             if date_str in py_value:
                 py_value[date_str].append(
-                        {'f':warcfile, 'o':offset, 'i':record_id_str})
+                        {'f': warcfile, 'o': offset, 'd': payload_digest_str})
             else:
                 py_value[date_str] = [
-                        {'f':warcfile, 'o':offset, 'i':record_id_str}]
+                        {'f': warcfile, 'o': offset, 'd': payload_digest_str}]
 
             json_value = json.dumps(py_value, separators=(',',':'))
@@ -314,11 +316,11 @@ class PlaybackIndexDb(object):
 
         latest_date = max(py_value)
         result = py_value[latest_date][0]
-        result['i'] = result['i'].encode('ascii')
+        result['d'] = result['d'].encode('ascii')
         return latest_date, result
 
     # in python3 params are bytes
-    def lookup_exact(self, url, warc_date, record_id):
+    def lookup_exact(self, url, warc_date, payload_digest):
         conn = sqlite3.connect(self.file)
         cursor = conn.execute(
                 'select value from playback where url = ?', (url,))
@@ -334,14 +336,13 @@ class PlaybackIndexDb(object):
 
         if warc_date in py_value:
             for record in py_value[warc_date]:
-                if record['i'] == record_id:
+                if record['d'] == payload_digest:
                     self.logger.debug(
                             "found exact match for (%r,%r,%r)",
-                            warc_date, record_id, url)
-                    record['i'] = record['i'].encode('ascii')
+                            warc_date, payload_digest, url)
+                    record['d'] = record['d'].encode('ascii')
                     return record
         else:
             self.logger.info(
-                    "match not found for (%r,%r,%r)", warc_date, record_id, url)
+                    "match not found for (%r,%r,%r)", warc_date, payload_digest, url)
             return None
-
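For reference, after this change the playback index keys each capture by its
payload digest ('d') rather than by warc record id ('i'). A sketch of one
index value as stored in sqlite (filename and offset invented)::

    py_value = {
        '2017-02-03T04:05:03Z': [
            {'f': 'WARCPROX-20170203040503000-00000-1234.warc.gz',  # warc file
             'o': 4321,  # response record offset within the warc
             'd': 'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'},  # payload digest
        ],
    }

lookup_exact() then matches a revisit record's WARC-Payload-Digest against
'd', where it previously matched WARC-Refers-To against 'i'.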
diff --git a/warcprox/warc.py b/warcprox/warc.py
index 51b1c35..de0ec06 100644
--- a/warcprox/warc.py
+++ b/warcprox/warc.py
@@ -50,7 +50,7 @@ class WarcRecordBuilder:
                     url=recorded_url.url, warc_date=warc_date,
                     data=response_header_block,
                     warc_type=warctools.WarcRecord.REVISIT,
-                    refers_to=recorded_url.dedup_info['id'],
+                    refers_to=recorded_url.dedup_info.get('id'),
                     refers_to_target_uri=recorded_url.dedup_info['url'],
                     refers_to_date=recorded_url.dedup_info['date'],
                     payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
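The switch to dedup_info.get('id') accommodates CdxServerDedup, whose lookup()
results carry no record id. A sketch of the two dedup_info shapes the record
builder now has to handle (values illustrative)::

    dedup_info_sqlite = {'id': b'<urn:uuid:...>',  # from DedupDb and friends
                         'url': b'http://example.com/',
                         'date': b'2017-02-03T04:05:03Z'}
    dedup_info_cdx = {'url': 'http://example.com/',  # from CdxServerDedup
                      'date': b'2017-02-03T04:05:03Z'}
    dedup_info_cdx.get('id')  # None, so the revisit record gets no WARC-Refers-To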
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 06983ed..d37e588 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -179,9 +179,14 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
 
         remote_ip = self._remote_server_sock.getpeername()[0]
         timestamp = datetime.datetime.utcnow()
+        extra_response_headers = {}
+        if warcprox_meta and 'accept' in warcprox_meta and \
+                'capture-metadata' in warcprox_meta['accept']:
+            rmeta = {'capture-metadata': {'timestamp': timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')}}
+            extra_response_headers['Warcprox-Meta'] = json.dumps(rmeta, separators=',:')
 
         req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request(
-                self)
+                self, extra_response_headers=extra_response_headers)
 
         content_type = None
         try:
diff --git a/warcprox/writer.py b/warcprox/writer.py
index cf8d72d..7a1032a 100644
--- a/warcprox/writer.py
+++ b/warcprox/writer.py
@@ -24,6 +24,7 @@ from __future__ import absolute_import
 import logging
 from datetime import datetime
 from hanzo import warctools
+import fcntl
 import time
 import warcprox
 import os
@@ -53,6 +54,7 @@ class WarcWriter:
         self._f = None
         self._fpath = None
         self._f_finalname = None
+        self._f_open_suffix = '' if options.no_warc_open_suffix else '.open'
         self._serial = 0
         self._lock = threading.RLock()
 
@@ -70,6 +72,12 @@ class WarcWriter:
         with self._lock:
             if self._fpath:
                 self.logger.info('closing %s', self._f_finalname)
+                if self._f_open_suffix == '':
+                    try:
+                        fcntl.lockf(self._f, fcntl.LOCK_UN)
+                    except IOError as exc:
+                        self.logger.error('could not unlock file %s (%s)',
+                                self._fpath, exc)
                 self._f.close()
                 finalpath = os.path.sep.join(
                         [self.directory, self._f_finalname])
@@ -91,9 +99,17 @@ class WarcWriter:
                         self.prefix, self.timestamp17(), self._serial,
                         self._randomtoken, '.gz' if self.gzip else '')
                 self._fpath = os.path.sep.join([
-                        self.directory, self._f_finalname + '.open'])
+                        self.directory, self._f_finalname + self._f_open_suffix])
 
                 self._f = open(self._fpath, 'wb')
+                # if no '.open' suffix is used for WARC, acquire an exclusive
+                # file lock.
+                if self._f_open_suffix == '':
+                    try:
+                        fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    except IOError as exc:
+                        self.logger.error('could not lock file %s (%s)',
+                                self._fpath, exc)
 
                 warcinfo_record = self.record_builder.build_warcinfo_record(
                         self._f_finalname)
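Taken together with tests/test_writer.py above, a sketch of the writer
behavior behind the new hidden --no-warc-open-suffix option (directory path
invented)::

    from warcprox import Options
    from warcprox.writer import WarcWriter

    writer = WarcWriter(Options(directory='./warcs', no_warc_open_suffix=True))
    # writer.write_records(recorded_url) now creates WARCPROX-...warc directly,
    # without the customary '.open' suffix, and holds an exclusive fcntl lock
    # on it so other processes can tell the file is still being written.
    writer.close_writer()  # releases the lock and leaves the finished warc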