diff --git a/README.rst b/README.rst index b9c1c5f..8adcafa 100644 --- a/README.rst +++ b/README.rst @@ -47,6 +47,7 @@ Usage [--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT] [--playback-index-db-file PLAYBACK_INDEX_DB_FILE] [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS] + [--cdxserver-dedup CDX_SERVER_URL] [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table] [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY] [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q] @@ -100,6 +101,9 @@ Usage persistent deduplication database file; empty string or /dev/null disables deduplication (default: ./warcprox.sqlite) + --cdxserver-dedup CDX_SERVER_URL + use a CDX server for deduplication + (default: None) --rethinkdb-servers RETHINKDB_SERVERS rethinkdb servers, used for dedup and stats if specified; e.g. diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py index f595f8b..a05db59 100755 --- a/benchmarks/run-benchmarks.py +++ b/benchmarks/run-benchmarks.py @@ -163,78 +163,87 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later. arg_parser = argparse.ArgumentParser( prog=prog, description=desc, formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter) - arg_parser.add_argument( - '-z', '--gzip', dest='gzip', action='store_true', + + ### these warcprox options are not configurable for the benchmarks + # arg_parser.add_argument('-p', '--port', dest='port', default='8000', + # type=int, help='port to listen on') + # arg_parser.add_argument('-b', '--address', dest='address', + # default='localhost', help='address to listen on') + # arg_parser.add_argument('-c', '--cacert', dest='cacert', + # default='./{0}-warcprox-ca.pem'.format(socket.gethostname()), + # help='CA certificate file; if file does not exist, it will be created') + # arg_parser.add_argument('--certs-dir', dest='certs_dir', + # default='./{0}-warcprox-ca'.format(socket.gethostname()), + # help='where to store and load generated certificates') + # arg_parser.add_argument('-d', '--dir', dest='directory', + # default='./warcs', help='where to write warcs') + + arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') + arg_parser.add_argument('-n', '--prefix', dest='prefix', + default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument( - '-s', '--size', dest='size', default=1000*1000*1000, type=int, - help='WARC file rollover size threshold in bytes') - arg_parser.add_argument( - '--rollover-idle-time', dest='rollover_idle_time', default=None, - type=int, help=( - 'WARC file rollover idle time threshold in seconds (so that ' - "Friday's last open WARC doesn't sit there all weekend " - 'waiting for more data)')) + '-s', '--size', dest='rollover_size', default=1000*1000*1000, + type=int, help='WARC file rollover size threshold in bytes') + arg_parser.add_argument('--rollover-idle-time', + dest='rollover_idle_time', default=None, type=int, + help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") try: hash_algos = hashlib.algorithms_guaranteed except AttributeError: hash_algos = hashlib.algorithms - arg_parser.add_argument( - '-g', '--digest-algorithm', dest='digest_algorithm', - default='sha1', help='digest algorithm, one of %s' % hash_algos) + arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', + default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos))) arg_parser.add_argument('--base32', dest='base32', action='store_true', default=False, help='write digests in Base32 instead of hex') - arg_parser.add_argument( - '--method-filter', metavar='HTTP_METHOD', - action='append', help=( - 'only record requests with the given http method(s) (can be ' - 'used more than once)')) - arg_parser.add_argument( - '--stats-db-file', dest='stats_db_file', - default=os.path.join(tmpdir, 'stats.db'), help=( - 'persistent statistics database file; empty string or ' - '/dev/null disables statistics tracking')) + arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD', + action='append', help='only record requests with the given http method(s) (can be used more than once)') + arg_parser.add_argument('--stats-db-file', dest='stats_db_file', + default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') + arg_parser.add_argument('-P', '--playback-port', dest='playback_port', + type=int, default=None, help='port to listen on for instant playback') + arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', + default='./warcprox-playback-index.db', + help='playback index database file (only used if --playback-port is specified)') group = arg_parser.add_mutually_exclusive_group() - group.add_argument( - '-j', '--dedup-db-file', dest='dedup_db_file', - default=os.path.join(tmpdir, 'dedup.db'), help=( - 'persistent deduplication database file; empty string or ' - '/dev/null disables deduplication')) - group.add_argument( - '--rethinkdb-servers', dest='rethinkdb_servers', help=( - 'rethinkdb servers, used for dedup and stats if specified; ' - 'e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')) - # arg_parser.add_argument( - # '--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help=( - # 'rethinkdb database name (ignored unless --rethinkdb-servers ' - # 'is specified)')) + group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', + default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', + help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') + arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', + help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') + arg_parser.add_argument('--rethinkdb-big-table', + dest='rethinkdb_big_table', action='store_true', default=False, + help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') arg_parser.add_argument( - '--rethinkdb-big-table', dest='rethinkdb_big_table', - action='store_true', default=False, help=( - 'use a big rethinkdb table called "captures", instead of a ' - 'small table called "dedup"; table is suitable for use as ' - 'index for playback (ignored unless --rethinkdb-servers is ' - 'specified)')) + '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name', + default='captures', help=argparse.SUPPRESS) + arg_parser.add_argument('--queue-size', dest='queue_size', type=int, + default=500, help=argparse.SUPPRESS) + arg_parser.add_argument('--max-threads', dest='max_threads', type=int, + help=argparse.SUPPRESS) + arg_parser.add_argument('--profile', action='store_true', default=False, + help=argparse.SUPPRESS) arg_parser.add_argument( - '--queue-size', dest='queue_size', type=int, default=1, help=( - 'max size of the queue of urls waiting to be processed by ' - 'the warc writer thread')) + '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', + default=None, help=( + 'host:port of tor socks proxy, used only to connect to ' + '.onion sites')) arg_parser.add_argument( - '--max-threads', dest='max_threads', type=int, help=( - 'number of proxy server threads (if not specified, chosen based ' - 'on system resource limits')) - arg_parser.add_argument( - '--version', action='version', - version='warcprox %s' % warcprox.__version__) - arg_parser.add_argument( - '-v', '--verbose', dest='verbose', action='store_true', - help='verbose logging') - arg_parser.add_argument( - '--trace', dest='trace', action='store_true', - help='trace-level logging') - arg_parser.add_argument( - '--profile', dest='profile', action='store_true', default=False, - help='profile the warc writer thread') + '--plugin', metavar='PLUGIN_CLASS', dest='plugins', + action='append', help=( + 'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". ' + 'May be used multiple times to register multiple plugins. ' + 'Plugin classes are loaded from the regular python module ' + 'search path. They will be instantiated with no arguments and ' + 'must have a method `notify(self, recorded_url, records)` ' + 'which will be called for each url, after warc records have ' + 'been written.')) + arg_parser.add_argument('--version', action='version', + version="warcprox {}".format(warcprox.__version__)) + arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') + arg_parser.add_argument('--trace', dest='trace', action='store_true') + arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') arg_parser.add_argument( '--requests', dest='requests', type=int, default=200, help='number of urls to fetch') diff --git a/setup.py b/setup.py index 2e19b8e..3de51f7 100755 --- a/setup.py +++ b/setup.py @@ -39,8 +39,10 @@ deps = [ 'certauth==1.1.6', 'warctools', 'urlcanon>=0.1.dev16', + 'urllib3', 'doublethink>=0.2.0.dev81', 'PySocks', + 'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu ] try: import concurrent.futures @@ -49,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev98', + version='2.2.1b2.dev107', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', @@ -58,7 +60,7 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, - tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 + tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 cmdclass = {'test': PyTest}, test_suite='warcprox.tests', entry_points={ diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 334cfc2..80db2f8 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -40,7 +40,7 @@ do && (cd /warcprox && git diff HEAD) | patch -p1 \ && virtualenv -p $python /tmp/venv \ && source /tmp/venv/bin/activate \ - && pip --log-file /tmp/pip.log install . pytest requests warcio \ + && pip --log-file /tmp/pip.log install . pytest mock requests warcio \ && py.test -v tests \ && py.test -v --rethinkdb-servers=localhost tests \ && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests" diff --git a/tests/test_dedup.py b/tests/test_dedup.py new file mode 100644 index 0000000..124efb5 --- /dev/null +++ b/tests/test_dedup.py @@ -0,0 +1,46 @@ +import mock +from warcprox.dedup import CdxServerDedup + + +def test_cdx_dedup(): + # Mock CDX Server responses to simulate found, not found and errors. + with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request: + url = "http://example.com" + # not found case + result = mock.Mock() + result.status = 200 + result.data = b'20170101020405 test' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None + + # found case + result = mock.Mock() + result.status = 200 + result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res["date"] == b"2017-02-03T04:05:03Z" + + # invalid CDX result status code + result = mock.Mock() + result.status = 400 + result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None + # invalid CDX result content + result = mock.Mock() + result.status = 200 + result.data = b'InvalidExceptionResult' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 9e5fa6a..cf43949 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -60,6 +60,7 @@ except ImportError: import certauth.certauth import warcprox +import warcprox.main try: import http.client as http_client @@ -241,173 +242,44 @@ def https_daemon(request, cert): return https_daemon @pytest.fixture(scope="module") -def captures_db(request, rethinkdb_servers, rethinkdb_big_table): - captures_db = None +def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): + orig_dir = os.getcwd() + work_dir = tempfile.mkdtemp() + logging.info('changing to working directory %r', work_dir) + os.chdir(work_dir) + + argv = ['warcprox', + '--method-filter=GET', + '--method-filter=POST', + '--port=0', + '--playback-port=0', + '--onion-tor-socks-proxy=localhost:9050'] if rethinkdb_servers: - servers = rethinkdb_servers.split(",") - if rethinkdb_big_table: - db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - captures_db = warcprox.bigtable.RethinkCaptures(rr) - captures_db.start() + rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) + argv.append('--rethinkdb-servers=%s' % rethinkdb_servers) + argv.append('--rethinkdb-db=%s' % rethinkdb_db) + if rethinkdb_big_table: + argv.append('--rethinkdb-big-table') - def fin(): - if captures_db: - captures_db.close() - # logging.info('dropping rethinkdb database {}'.format(db)) - # result = captures_db.rr.db_drop(db).run() - # logging.info("result=%s", result) - request.addfinalizer(fin) + args = warcprox.main.parse_args(argv) + warcprox_ = warcprox.main.init_controller(args) - return captures_db - -@pytest.fixture(scope="module") -def rethink_dedup_db(request, rethinkdb_servers, captures_db): - ddb = None - if rethinkdb_servers: - if captures_db: - ddb = warcprox.bigtable.RethinkCapturesDedup(captures_db) - else: - servers = rethinkdb_servers.split(",") - db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - ddb = warcprox.dedup.RethinkDedupDb(rr) - - def fin(): - if rethinkdb_servers: - ddb.close() - if not captures_db: - logging.info('dropping rethinkdb database {}'.format(db)) - result = ddb.rr.db_drop(db).run() - logging.info("result=%s", result) - request.addfinalizer(fin) - - return ddb - -@pytest.fixture(scope="module") -def dedup_db(request, rethink_dedup_db): - dedup_db_file = None - ddb = rethink_dedup_db - if not ddb: - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False) - f.close() - dedup_db_file = f.name - ddb = warcprox.dedup.DedupDb(dedup_db_file) - - def fin(): - if dedup_db_file: - logging.info('deleting file {}'.format(dedup_db_file)) - os.unlink(dedup_db_file) - request.addfinalizer(fin) - - return ddb - -@pytest.fixture(scope="module") -def stats_db(request, rethinkdb_servers): - if rethinkdb_servers: - servers = rethinkdb_servers.split(",") - db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - sdb = warcprox.stats.RethinkStatsDb(rr) - sdb.start() - else: - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False) - f.close() - stats_db_file = f.name - sdb = warcprox.stats.StatsDb(stats_db_file) - - def fin(): - sdb.close() - if rethinkdb_servers: - logging.info('dropping rethinkdb database {}'.format(db)) - result = sdb.rr.db_drop(db).run() - logging.info("result=%s", result) - # else: - # logging.info('deleting file {}'.format(stats_db_file)) - # os.unlink(stats_db_file) - request.addfinalizer(fin) - - return sdb - -@pytest.fixture(scope="module") -def service_registry(request, rethinkdb_servers): - if rethinkdb_servers: - servers = rethinkdb_servers.split(",") - db = 'warcprox_test_services_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - - def fin(): - logging.info('dropping rethinkdb database {}'.format(db)) - result = rr.db_drop(db).run() - logging.info("result=%s", result) - request.addfinalizer(fin) - - return doublethink.ServiceRegistry(rr) - else: - return None - -@pytest.fixture(scope="module") -def warcprox_(request, captures_db, dedup_db, stats_db, service_registry): - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True) - f.close() # delete it, or CertificateAuthority will try to read it - ca_file = f.name - ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca') - ca = certauth.certauth.CertificateAuthority(ca_file, ca_dir, 'warcprox-test') - - recorded_url_q = warcprox.TimestampedQueue() - - options = warcprox.Options(port=0, playback_port=0, - onion_tor_socks_proxy='localhost:9050') - proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q, - stats_db=stats_db, options=options) - options.port = proxy.server_port - - options.directory = tempfile.mkdtemp(prefix='warcprox-test-warcs-') - - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False) - f.close() - playback_index_db_file = f.name - playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file) - playback_proxy = warcprox.playback.PlaybackProxy(ca=ca, - playback_index_db=playback_index_db, options=options) - options.playback_proxy = playback_proxy.server_port - - options.method_filter = ['GET','POST'] - - options.crawl_log_dir = tempfile.mkdtemp( - prefix='warcprox-test-', suffix='-crawl-log') - crawl_logger = warcprox.crawl_log.CrawlLogger(options.crawl_log_dir) - - writer_pool = warcprox.writer.WarcWriterPool(options) - warc_writer_threads = [ - warcprox.writerthread.WarcWriterThread( - recorded_url_q=recorded_url_q, writer_pool=writer_pool, - dedup_db=dedup_db, listeners=[ - captures_db or dedup_db, playback_index_db, stats_db, - crawl_logger], options=options) - for i in range(int(proxy.max_threads ** 0.5))] - - warcprox_ = warcprox.controller.WarcproxController( - proxy=proxy, warc_writer_threads=warc_writer_threads, - playback_proxy=playback_proxy, service_registry=service_registry, - options=options) logging.info('starting warcprox') - warcprox_thread = threading.Thread(name='WarcproxThread', - target=warcprox_.run_until_shutdown) + warcprox_thread = threading.Thread( + name='WarcproxThread', target=warcprox_.run_until_shutdown) warcprox_thread.start() def fin(): - logging.info('stopping warcprox') warcprox_.stop.set() warcprox_thread.join() - for f in (ca_file, ca_dir, options.directory, playback_index_db_file, - options.crawl_log_dir): - if os.path.isdir(f): - logging.info('deleting directory {}'.format(f)) - shutil.rmtree(f) - else: - logging.info('deleting file {}'.format(f)) - os.unlink(f) + if rethinkdb_servers: + logging.info('dropping rethinkdb database %r', rethinkdb_db) + rr = doublethink.Rethinker(rethinkdb_servers) + result = rr.db_drop(rethinkdb_db).run() + logging.info('deleting working directory %r', work_dir) + os.chdir(orig_dir) + shutil.rmtree(work_dir) + request.addfinalizer(fin) return warcprox_ @@ -683,6 +555,22 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n" +def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies): + url = 'http://localhost:{}/i/j'.format(http_daemon.server_port) + request_meta = {"accept": ["capture-metadata"]} + headers = {"Warcprox-Meta": json.dumps(request_meta)} + response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) + assert response.status_code == 200 + assert response.headers['Warcprox-Meta'] + data = json.loads(response.headers['Warcprox-Meta']) + assert data['capture-metadata'] + try: + dt = datetime.datetime.strptime(data['capture-metadata']['timestamp'], + '%Y-%m-%dT%H:%M:%SZ') + assert dt + except ValueError: + pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp']) + def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) @@ -1233,9 +1121,8 @@ def test_method_filter( assert response.content == payload def test_dedup_ok_flag( - https_daemon, http_daemon, warcprox_, archiving_proxies, - rethinkdb_big_table): - if not rethinkdb_big_table: + https_daemon, http_daemon, warcprox_, archiving_proxies): + if not warcprox_.options.rethinkdb_big_table: # this feature is n/a unless using rethinkdb big table return @@ -1319,11 +1206,11 @@ def test_status_api(warcprox_): assert status['pid'] == os.getpid() assert status['threads'] == warcprox_.proxy.pool._max_workers -def test_svcreg_status(warcprox_, service_registry): - if service_registry: +def test_svcreg_status(warcprox_): + if warcprox_.service_registry: start = time.time() while time.time() - start < 15: - status = service_registry.available_service('warcprox') + status = warcprox_.service_registry.available_service('warcprox') if status: break time.sleep(0.5) @@ -1380,11 +1267,11 @@ def test_controller_with_defaults(): assert not wwt.writer_pool.default_warc_writer.record_builder.base32 assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' -def test_choose_a_port_for_me(service_registry): +def test_choose_a_port_for_me(warcprox_): options = warcprox.Options() options.port = 0 controller = warcprox.controller.WarcproxController( - service_registry=service_registry, options=options) + service_registry=warcprox_.service_registry, options=options) assert controller.proxy.server_port != 0 assert controller.proxy.server_port != 8000 assert controller.proxy.server_address == ( @@ -1400,12 +1287,12 @@ def test_choose_a_port_for_me(service_registry): status = json.loads(response.content.decode('ascii')) assert status['port'] == controller.proxy.server_port - if service_registry: + if warcprox_.service_registry: # check that service registry entry lists the correct port start = time.time() ports = [] while time.time() - start < 30: - svcs = service_registry.available_services('warcprox') + svcs = warcprox_.service_registry.available_services('warcprox') ports = [svc['port'] for svc in svcs] if controller.proxy.server_port in ports: break @@ -1552,6 +1439,44 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies): 'contentSize', 'warcFilename', 'warcFileOffset'} assert extra_info['contentSize'] == 145 +def test_long_warcprox_meta( + warcprox_, http_daemon, archiving_proxies, playback_proxies): + url = 'http://localhost:%s/b/g' % http_daemon.server_port + + # create a very long warcprox-meta header + headers = {'Warcprox-Meta': json.dumps({ + 'x':'y'*1000000, 'warc-prefix': 'test_long_warcprox_meta'})} + response = requests.get( + url, proxies=archiving_proxies, headers=headers, verify=False) + assert response.status_code == 200 + + # wait for writer thread to process + time.sleep(0.5) + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + time.sleep(0.5) + time.sleep(0.5) + + # check that warcprox-meta was parsed and honored ("warc-prefix" param) + assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] + writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] + warc_path = os.path.join(writer.directory, writer._f_finalname) + warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + assert os.path.exists(warc_path) + + # read the warc + with open(warc_path, 'rb') as f: + rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f)) + record = next(rec_iter) + assert record.rec_type == 'warcinfo' + record = next(rec_iter) + assert record.rec_type == 'response' + assert record.rec_headers.get_header('warc-target-uri') == url + record = next(rec_iter) + assert record.rec_type == 'request' + assert record.rec_headers.get_header('warc-target-uri') == url + with pytest.raises(StopIteration): + next(rec_iter) + if __name__ == '__main__': pytest.main() diff --git a/tests/test_writer.py b/tests/test_writer.py new file mode 100644 index 0000000..9ce0e13 --- /dev/null +++ b/tests/test_writer.py @@ -0,0 +1,57 @@ +import os +import fcntl +from multiprocessing import Process, Queue +from datetime import datetime +import pytest +from warcprox.mitmproxy import ProxyingRecorder +from warcprox.warcproxy import RecordedUrl +from warcprox.writer import WarcWriter +from warcprox import Options + +recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com') + +recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain', + status=200, client_ip='127.0.0.2', + request_data=b'abc', + response_recorder=recorder, + remote_ip='127.0.0.3', + timestamp=datetime.utcnow()) + + +def lock_file(queue, filename): + """Try to lock file and return 1 if successful, else return 0. + It is necessary to run this method in a different process to test locking. + """ + try: + fi = open(filename, 'ab') + fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) + fi.close() + queue.put('OBTAINED LOCK') + except IOError: + queue.put('FAILED TO OBTAIN LOCK') + + +def test_warc_writer_locking(tmpdir): + """Test if WarcWriter is locking WARC files. + When we don't have the .open suffix, WarcWriter locks the file and the + external process trying to ``lock_file`` fails (result=0). + """ + dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) + wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True)) + wwriter.write_records(recorded_url) + warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')] + assert warcs + target_warc = os.path.join(dirname, warcs[0]) + # launch another process and try to lock WARC file + queue = Queue() + p = Process(target=lock_file, args=(queue, target_warc)) + p.start() + p.join() + assert queue.get() == 'FAILED TO OBTAIN LOCK' + wwriter.close_writer() + + # locking must succeed after writer has closed the WARC file. + p = Process(target=lock_file, args=(queue, target_warc)) + p.start() + p.join() + assert queue.get() == 'OBTAINED LOCK' diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 94cf9c9..f3d897d 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -25,8 +25,6 @@ USA. from __future__ import absolute_import import logging -from hanzo import warctools -import random import warcprox import base64 import urlcanon @@ -115,6 +113,7 @@ class RethinkCaptures: [r.row["abbr_canon_surt"], r.row["timestamp"]]).run() self.rr.table(self.table).index_create("sha1_warc_type", [ r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run() + self.rr.table(self.table).index_wait().run() def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"): if algo != "sha1": @@ -221,7 +220,7 @@ class RethinkCapturesDedup: self.captures_db = captures_db self.options = options - def lookup(self, digest_key, bucket="__unspecified__"): + def lookup(self, digest_key, bucket="__unspecified__", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key algo, value_str = k.split(":") if self.options.base32: diff --git a/warcprox/controller.py b/warcprox/controller.py index 00f5eb7..42f71de 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -176,7 +176,7 @@ class WarcproxController(object): assert(all( wwt.dedup_db is self.warc_writer_threads[0].dedup_db for wwt in self.warc_writer_threads)) - if self.warc_writer_threads[0].dedup_db: + if any((t.dedup_db for t in self.warc_writer_threads)): self.warc_writer_threads[0].dedup_db.start() for wwt in self.warc_writer_threads: @@ -211,8 +211,6 @@ class WarcproxController(object): if self.proxy.stats_db: self.proxy.stats_db.stop() - if self.warc_writer_threads[0].dedup_db: - self.warc_writer_threads[0].dedup_db.close() self.proxy_thread.join() if self.playback_proxy is not None: diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 78c5434..e70f5f9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -21,13 +21,17 @@ USA. from __future__ import absolute_import +from datetime import datetime import logging import os import json from hanzo import warctools import warcprox -import random import sqlite3 +import urllib3 +from urllib3.exceptions import HTTPError + +urllib3.disable_warnings() class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -55,15 +59,6 @@ class DedupDb(object): conn.commit() conn.close() - def stop(self): - pass - - def close(self): - pass - - def sync(self): - pass - def save(self, digest_key, response_record, bucket=""): record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') @@ -82,7 +77,7 @@ class DedupDb(object): conn.close() self.logger.debug('dedup db saved %s:%s', key, json_value) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", url=None): result = None key = digest_key.decode('utf-8') + '|' + bucket conn = sqlite3.connect(self.file) @@ -117,9 +112,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url.url) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key) + recorded_url.dedup_info = dedup_db.lookup(digest_key, + url=recorded_url.url) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -151,15 +148,6 @@ class RethinkDedupDb: def start(self): pass - def stop(self): - pass - - def close(self): - pass - - def sync(self): - pass - def save(self, digest_key, response_record, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) @@ -173,7 +161,7 @@ class RethinkDedupDb: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) result = self.rr.table(self.table).get(k).run() @@ -193,3 +181,66 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) + +class CdxServerDedup(object): + """Query a CDX server to perform deduplication. + """ + logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + http_pool = urllib3.PoolManager() + + def __init__(self, cdx_url="https://web.archive.org/cdx/search", + options=warcprox.Options()): + self.cdx_url = cdx_url + self.options = options + + def start(self): + pass + + def save(self, digest_key, response_record, bucket=""): + """Does not apply to CDX server, as it is obviously read-only. + """ + pass + + def lookup(self, digest_key, url): + """Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be + computed on the original content, after decoding Content-Encoding and + Transfer-Encoding, if any), if they match, write a revisit record. + + Get only the last item (limit=-1) because Wayback Machine has special + performance optimisation to handle that. limit < 0 is very inefficient + in general. Maybe it could be configurable in the future. + + :param digest_key: b'sha1:' (prefix is optional). + Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + :param url: Target URL string + Result must contain: + {"url": , "date": "%Y-%m-%dT%H:%M:%SZ"} + """ + u = url.decode("utf-8") if isinstance(url, bytes) else url + try: + result = self.http_pool.request('GET', self.cdx_url, fields=dict( + url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", + limit=-1)) + assert result.status == 200 + if isinstance(digest_key, bytes): + dkey = digest_key + else: + dkey = digest_key.encode('utf-8') + dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey + line = result.data.strip() + if line: + (cdx_ts, cdx_digest) = line.split(b' ') + if cdx_digest == dkey: + dt = datetime.strptime(cdx_ts.decode('ascii'), + '%Y%m%d%H%M%S') + date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') + return dict(url=url, date=date) + except (HTTPError, AssertionError, ValueError) as exc: + self.logger.error('CdxServerDedup request failed for url=%s %s', + url, exc) + return None + + def notify(self, recorded_url, records): + """Since we don't save anything to CDX server, this does not apply. + """ + pass diff --git a/warcprox/main.py b/warcprox/main.py index 70d4079..e21ff6a 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -39,7 +39,6 @@ import signal import threading import certauth.certauth import warcprox -import re import doublethink import cryptography.hazmat.backends.openssl import importlib @@ -79,6 +78,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default='./warcs', help='where to write warcs') arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') + arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix', + default=False, action='store_true', help=argparse.SUPPRESS) arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument( @@ -107,6 +108,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup', + help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', @@ -195,6 +198,9 @@ def init_controller(args): else: dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options) listeners.append(dedup_db) + elif args.cdxserver_dedup: + dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup) + listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 6297dcc..b14cddf 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -37,8 +37,15 @@ except ImportError: import urlparse as urllib_parse try: import http.client as http_client + # In python3 http.client.parse_headers() enforces http_client._MAXLINE + # as max length of an http header line, but we want to support very + # long warcprox-meta headers, so we tweak it here. Python2 doesn't seem + # to enforce any limit. Multiline headers could be an option but it + # turns out those are illegal as of RFC 7230. Plus, this is easier. + http_client._MAXLINE = 4194304 # 4 MiB except ImportError: import httplib as http_client +import json import socket import logging import ssl @@ -157,13 +164,17 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.fp, proxy_client, digest_algorithm, url=url) self.fp = self.recorder - def begin(self): + def begin(self, extra_response_headers={}): http_client.HTTPResponse.begin(self) # reads status line, headers status_and_headers = 'HTTP/1.1 {} {}\r\n'.format( self.status, self.reason) self.msg['Via'] = via_header_value( self.msg.get('Via'), '%0.1f' % (self.version / 10.0)) + if extra_response_headers: + for header, value in extra_response_headers.items(): + self.msg[header] = value + for k,v in self.msg.items(): if k.lower() not in ( 'connection', 'proxy-connection', 'keep-alive', @@ -355,12 +366,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error("exception proxying request", exc_info=True) raise - def _proxy_request(self): + def _proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to read the response and send it to the proxy client, while recording the bytes in transit. Returns a tuple (request, response) where request is the raw request bytes, and response is a ProxyingRecorder. + + :param extra_response_headers: generated on warcprox._proxy_request. + It may contain extra HTTP headers such as ``Warcprox-Meta`` which + are written in the WARC record for this request. ''' # Build request req_str = '{} {} {}\r\n'.format( @@ -401,7 +416,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_sock, proxy_client=self.connection, digest_algorithm=self.server.digest_algorithm, url=self.url, method=self.command) - prox_rec_res.begin() + prox_rec_res.begin(extra_response_headers=extra_response_headers) buf = prox_rec_res.read(8192) while buf != b'': diff --git a/warcprox/playback.py b/warcprox/playback.py index 663e10a..a9aa47d 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler): def _send_headers_and_refd_payload( - self, headers, refers_to, refers_to_target_uri, refers_to_date): + self, headers, refers_to_target_uri, refers_to_date, payload_digest): + """Parameters: + + """ location = self.server.playback_index_db.lookup_exact( - refers_to_target_uri, refers_to_date, record_id=refers_to) + refers_to_target_uri, refers_to_date, payload_digest) self.logger.debug('loading http payload from {}'.format(location)) fh = self._open_warc_at_offset(location['f'], location['o']) @@ -131,7 +134,7 @@ class PlaybackProxyHandler(MitmProxyHandler): pass if errors: - raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) + raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors)) warc_type = record.get_header(warctools.WarcRecord.TYPE) if warc_type != warctools.WarcRecord.RESPONSE: @@ -177,20 +180,19 @@ class PlaybackProxyHandler(MitmProxyHandler): if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST: raise Exception('unknown revisit record profile {}'.format(warc_profile)) - refers_to = record.get_header( - warctools.WarcRecord.REFERS_TO).decode('latin1') refers_to_target_uri = record.get_header( warctools.WarcRecord.REFERS_TO_TARGET_URI).decode( 'latin1') refers_to_date = record.get_header( warctools.WarcRecord.REFERS_TO_DATE).decode('latin1') - + payload_digest = record.get_header( + warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1') self.logger.debug( 'revisit record references %s:%s capture of %s', - refers_to_date, refers_to, refers_to_target_uri) + refers_to_date, payload_digest, refers_to_target_uri) return self._send_headers_and_refd_payload( - record.content[1], refers_to, refers_to_target_uri, - refers_to_date) + record.content[1], refers_to_target_uri, refers_to_date, + payload_digest) else: # send it back raw, whatever it is @@ -264,12 +266,12 @@ class PlaybackIndexDb(object): # XXX canonicalize url? url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') + payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1') # there could be two visits of same url in the same second, and WARC-Date is # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\ - # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} + # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...} with self._lock: conn = sqlite3.connect(self.file) @@ -283,10 +285,10 @@ class PlaybackIndexDb(object): if date_str in py_value: py_value[date_str].append( - {'f':warcfile, 'o':offset, 'i':record_id_str}) + {'f': warcfile, 'o': offset, 'd': payload_digest_str}) else: py_value[date_str] = [ - {'f':warcfile, 'o':offset, 'i':record_id_str}] + {'f': warcfile, 'o': offset, 'd': payload_digest_str}] json_value = json.dumps(py_value, separators=(',',':')) @@ -314,11 +316,11 @@ class PlaybackIndexDb(object): latest_date = max(py_value) result = py_value[latest_date][0] - result['i'] = result['i'].encode('ascii') + result['d'] = result['d'].encode('ascii') return latest_date, result # in python3 params are bytes - def lookup_exact(self, url, warc_date, record_id): + def lookup_exact(self, url, warc_date, payload_digest): conn = sqlite3.connect(self.file) cursor = conn.execute( 'select value from playback where url = ?', (url,)) @@ -334,14 +336,13 @@ class PlaybackIndexDb(object): if warc_date in py_value: for record in py_value[warc_date]: - if record['i'] == record_id: + if record['d'] == payload_digest: self.logger.debug( "found exact match for (%r,%r,%r)", - warc_date, record_id, url) - record['i'] = record['i'] + warc_date, payload_digest, url) + record['d'] = record['d'] return record else: self.logger.info( - "match not found for (%r,%r,%r)", warc_date, record_id, url) + "match not found for (%r,%r,%r)", warc_date, payload_digest, url) return None - diff --git a/warcprox/stats.py b/warcprox/stats.py index 52a5b47..55693a2 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -25,7 +25,6 @@ import logging import os import json from hanzo import warctools -import random import warcprox import threading import rethinkdb as r diff --git a/warcprox/warc.py b/warcprox/warc.py index fbc2a33..de0ec06 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -27,7 +27,6 @@ import hashlib import socket import hanzo.httptools from hanzo import warctools -import warcprox import datetime class WarcRecordBuilder: @@ -51,7 +50,7 @@ class WarcRecordBuilder: url=recorded_url.url, warc_date=warc_date, data=response_header_block, warc_type=warctools.WarcRecord.REVISIT, - refers_to=recorded_url.dedup_info['id'], + refers_to=recorded_url.dedup_info.get('id'), refers_to_target_uri=recorded_url.dedup_info['url'], refers_to_date=recorded_url.dedup_info['date'], payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32), diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 95ca81f..544dc61 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -35,15 +35,12 @@ try: except ImportError: import Queue as queue import logging -import re -import traceback import json import socket from hanzo import warctools from certauth.certauth import CertificateAuthority import warcprox import datetime -import ipaddress import urlcanon import os @@ -195,9 +192,22 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): remote_ip = self._remote_server_sock.getpeername()[0] timestamp = datetime.datetime.utcnow() + extra_response_headers = {} + if warcprox_meta and 'accept' in warcprox_meta and \ + 'capture-metadata' in warcprox_meta['accept']: + rmeta = {'capture-metadata': {'timestamp': timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')}} + extra_response_headers['Warcprox-Meta'] = json.dumps(rmeta, separators=',:') req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request( - self) + self, extra_response_headers=extra_response_headers) + + content_type = None + try: + content_type = prox_rec_res.headers.get('content-type') + except AttributeError: # py2 + raw = prox_rec_res.msg.getrawheader('content-type') + if raw: + content_type = raw.strip() recorded_url = RecordedUrl( url=self.url, request_data=req, @@ -205,8 +215,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): warcprox_meta=warcprox_meta, status=prox_rec_res.status, size=prox_rec_res.recorder.len, client_ip=self.client_address[0], - content_type=prox_rec_res.getheader("Content-Type"), - method=self.command, timestamp=timestamp, host=self.hostname, + content_type=content_type, method=self.command, + timestamp=timestamp, host=self.hostname, duration=datetime.datetime.utcnow()-timestamp, referer=self.headers.get('referer')) self.server.recorded_url_q.put(recorded_url) diff --git a/warcprox/writer.py b/warcprox/writer.py index 0c503bf..7a1032a 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -24,10 +24,10 @@ from __future__ import absolute_import import logging from datetime import datetime from hanzo import warctools +import fcntl import time import warcprox import os -import socket import string import random import threading @@ -54,6 +54,7 @@ class WarcWriter: self._f = None self._fpath = None self._f_finalname = None + self._f_open_suffix = '' if options.no_warc_open_suffix else '.open' self._serial = 0 self._lock = threading.RLock() @@ -71,6 +72,12 @@ class WarcWriter: with self._lock: if self._fpath: self.logger.info('closing %s', self._f_finalname) + if self._f_open_suffix == '': + try: + fcntl.lockf(self._f, fcntl.LOCK_UN) + except IOError as exc: + self.logger.error('could not unlock file %s (%s)', + self._fpath, exc) self._f.close() finalpath = os.path.sep.join( [self.directory, self._f_finalname]) @@ -92,9 +99,17 @@ class WarcWriter: self.prefix, self.timestamp17(), self._serial, self._randomtoken, '.gz' if self.gzip else '') self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + '.open']) + self.directory, self._f_finalname + self._f_open_suffix]) self._f = open(self._fpath, 'wb') + # if no '.open' suffix is used for WARC, acquire an exclusive + # file lock. + if self._f_open_suffix == '': + try: + fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.logger.error('could not lock file %s (%s)', + self._fpath, exc) warcinfo_record = self.record_builder.build_warcinfo_record( self._f_finalname) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 0e73b97..e422a65 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -29,13 +29,8 @@ except ImportError: import logging import threading -import os -import hashlib import time -import socket -import base64 from datetime import datetime -import hanzo.httptools from hanzo import warctools import warcprox import cProfile @@ -46,7 +41,7 @@ class WarcWriterThread(threading.Thread): def __init__( self, name='WarcWriterThread', recorded_url_q=None, - writer_pool=None, dedup_db=None, listeners=None, + writer_pool=None, dedup_db=None, listeners=[], options=warcprox.Options()): """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" threading.Thread.__init__(self, name=name)