Merge branch 'master' into crawl-log

* master:
  Update docstring
  Move Warcprox-Meta header construction to warcproxy
  Improve test_writer tests
  Replace timestamp parameter with more generic request/response syntax
  Return capture timestamp
  Swap fcntl.flock with fcntl.lockf
  Unit test fix for Python2 compatibility
  Test WarcWriter file locking when no_warc_open_suffix=True
  Rename writer var and add exception handling
  Acquire an exclusive file lock when not using .open WARC suffix
  Add hidden --no-warc-open-suffix CLI option
  Fix missing dummy url param in bigtable lookup method
  back to dev version number
  version 2.2 for pypi to address https://github.com/internetarchive/warcprox/issues/42
  Expand comment with limit=-1 explanation
  Drop unnecessary split for newline in CDX results
  fix benchmarks (update command line args)
  Update CdxServerDedup lookup algorithm
  Pass url instead of recorded_url obj to dedup lookup methods
  Filter out warc/revisit records in CdxServerDedup
  Improve CdxServerDedup implementation
  Fix minor CdxServerDedup unit test
  Fix bug with dedup_info date encoding
  Add mock pkg to run-tests.sh
  Add CdxServerDedup unit tests and improve its exception handling
  Add CDX Server based deduplication
  cryptography lib version 2.1.1 is causing problems
  Revert changes to test_warcprox.py
  Revert changes to bigtable and dedup
  Revert warc to previous behavior
  Update unit test
  Replace invalid warcfilename variable in playback
  Stop using WarcRecord.REFERS_TO header and use payload_digest instead
  greatly simplify automated test setup by reusing initialization code from the command line executable; this also has the benefit of testing that initialization code
  avoid TypeError: 'NoneType' object is not iterable exception at shutdown
  wait for rethinkdb indexes to be ready
  Remove deleted ``close`` method call from test.
  bump dev version number after merging pull requests
  Add missing "," in deps
  Remove tox.ini, move warcio to test_requires
  allow very long request header lines, to support large warcprox-meta header values
  Remove redundant stop() & sync() dedup methods
  Remove redundant close method from DedupDb and RethinkDedupDb
  Remove unused imports
  Add missing packages from setup.py, add tox config.
  fix python2 tests
  don't use http.client.HTTPResponse.getheader() to get the content-type header, because it can return a comma-delimited string
  fix zero-indexing of warc_writer_threads so they can be disabled via empty list
Commit 5c18054d37 by Noah Levitt, 2017-11-09 11:13:02 -08:00
18 changed files with 432 additions and 301 deletions

View File

@@ -47,6 +47,7 @@ Usage
[--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
[--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
[-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
[--cdxserver-dedup CDX_SERVER_URL]
[--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
[--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
[--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
@@ -100,6 +101,9 @@ Usage
persistent deduplication database file; empty
string or /dev/null disables deduplication
(default: ./warcprox.sqlite)
--cdxserver-dedup CDX_SERVER_URL
use a CDX server for deduplication
(default: None)
--rethinkdb-servers RETHINKDB_SERVERS
rethinkdb servers, used for dedup and stats if
specified; e.g.
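For example, a crawl that deduplicates against the Wayback Machine's CDX API
could be started like this (an illustrative invocation; only --cdxserver-dedup
is needed to enable the feature, the other flags are as documented above):

    warcprox --cdxserver-dedup https://web.archive.org/cdx/search --gzip -n my-crawl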

View File

@@ -163,78 +163,87 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
arg_parser = argparse.ArgumentParser(
prog=prog, description=desc,
formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter)
arg_parser.add_argument(
'-z', '--gzip', dest='gzip', action='store_true',
### these warcprox options are not configurable for the benchmarks
# arg_parser.add_argument('-p', '--port', dest='port', default='8000',
# type=int, help='port to listen on')
# arg_parser.add_argument('-b', '--address', dest='address',
# default='localhost', help='address to listen on')
# arg_parser.add_argument('-c', '--cacert', dest='cacert',
# default='./{0}-warcprox-ca.pem'.format(socket.gethostname()),
# help='CA certificate file; if file does not exist, it will be created')
# arg_parser.add_argument('--certs-dir', dest='certs_dir',
# default='./{0}-warcprox-ca'.format(socket.gethostname()),
# help='where to store and load generated certificates')
# arg_parser.add_argument('-d', '--dir', dest='directory',
# default='./warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument(
'-s', '--size', dest='size', default=1000*1000*1000, type=int,
help='WARC file rollover size threshold in bytes')
arg_parser.add_argument(
'--rollover-idle-time', dest='rollover_idle_time', default=None,
type=int, help=(
'WARC file rollover idle time threshold in seconds (so that '
"Friday's last open WARC doesn't sit there all weekend "
'waiting for more data)'))
'-s', '--size', dest='rollover_size', default=1000*1000*1000,
type=int, help='WARC file rollover size threshold in bytes')
arg_parser.add_argument('--rollover-idle-time',
dest='rollover_idle_time', default=None, type=int,
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
try:
hash_algos = hashlib.algorithms_guaranteed
except AttributeError:
hash_algos = hashlib.algorithms
arg_parser.add_argument(
'-g', '--digest-algorithm', dest='digest_algorithm',
default='sha1', help='digest algorithm, one of %s' % hash_algos)
arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
arg_parser.add_argument('--base32', dest='base32', action='store_true',
default=False, help='write digests in Base32 instead of hex')
arg_parser.add_argument(
'--method-filter', metavar='HTTP_METHOD',
action='append', help=(
'only record requests with the given http method(s) (can be '
'used more than once)'))
arg_parser.add_argument(
'--stats-db-file', dest='stats_db_file',
default=os.path.join(tmpdir, 'stats.db'), help=(
'persistent statistics database file; empty string or '
'/dev/null disables statistics tracking'))
arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
action='append', help='only record requests with the given http method(s) (can be used more than once)')
arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)')
group = arg_parser.add_mutually_exclusive_group()
group.add_argument(
'-j', '--dedup-db-file', dest='dedup_db_file',
default=os.path.join(tmpdir, 'dedup.db'), help=(
'persistent deduplication database file; empty string or '
'/dev/null disables deduplication'))
group.add_argument(
'--rethinkdb-servers', dest='rethinkdb_servers', help=(
'rethinkdb servers, used for dedup and stats if specified; '
'e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org'))
# arg_parser.add_argument(
# '--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help=(
# 'rethinkdb database name (ignored unless --rethinkdb-servers '
# 'is specified)'))
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument(
'--rethinkdb-big-table', dest='rethinkdb_big_table',
action='store_true', default=False, help=(
'use a big rethinkdb table called "captures", instead of a '
'small table called "dedup"; table is suitable for use as '
'index for playback (ignored unless --rethinkdb-servers is '
'specified)'))
'--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
default='captures', help=argparse.SUPPRESS)
arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
default=500, help=argparse.SUPPRESS)
arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
help=argparse.SUPPRESS)
arg_parser.add_argument('--profile', action='store_true', default=False,
help=argparse.SUPPRESS)
arg_parser.add_argument(
'--queue-size', dest='queue_size', type=int, default=1, help=(
'max size of the queue of urls waiting to be processed by '
'the warc writer thread'))
'--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
default=None, help=(
'host:port of tor socks proxy, used only to connect to '
'.onion sites'))
arg_parser.add_argument(
'--max-threads', dest='max_threads', type=int, help=(
'number of proxy server threads (if not specified, chosen based '
'on system resource limits)'))
arg_parser.add_argument(
'--version', action='version',
version='warcprox %s' % warcprox.__version__)
arg_parser.add_argument(
'-v', '--verbose', dest='verbose', action='store_true',
help='verbose logging')
arg_parser.add_argument(
'--trace', dest='trace', action='store_true',
help='trace-level logging')
arg_parser.add_argument(
'--profile', dest='profile', action='store_true', default=False,
help='profile the warc writer thread')
'--plugin', metavar='PLUGIN_CLASS', dest='plugins',
action='append', help=(
'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
'May be used multiple times to register multiple plugins. '
'Plugin classes are loaded from the regular python module '
'search path. They will be instantiated with no arguments and '
'must have a method `notify(self, recorded_url, records)` '
'which will be called for each url, after warc records have '
'been written.'))
arg_parser.add_argument('--version', action='version',
version="warcprox {}".format(warcprox.__version__))
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('--trace', dest='trace', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
arg_parser.add_argument(
'--requests', dest='requests', type=int, default=200,
help='number of urls to fetch')
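With the updated argument parser, a benchmark run might look like the following
(the script name is assumed; the options shown are all defined above):

    python run-benchmarks.py --requests 500 --gzip -s 1000000000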

View File

@@ -39,8 +39,10 @@ deps = [
'certauth==1.1.6',
'warctools',
'urlcanon>=0.1.dev16',
'urllib3',
'doublethink>=0.2.0.dev81',
'PySocks',
'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
]
try:
import concurrent.futures
@@ -49,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.2b1.dev98',
version='2.2.1b2.dev107',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',
@@ -58,7 +60,7 @@ setuptools.setup(
license='GPL',
packages=['warcprox'],
install_requires=deps,
tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
cmdclass = {'test': PyTest},
test_suite='warcprox.tests',
entry_points={

View File

@@ -40,7 +40,7 @@ do
&& (cd /warcprox && git diff HEAD) | patch -p1 \
&& virtualenv -p $python /tmp/venv \
&& source /tmp/venv/bin/activate \
&& pip --log-file /tmp/pip.log install . pytest requests warcio \
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio \
&& py.test -v tests \
&& py.test -v --rethinkdb-servers=localhost tests \
&& py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests"

tests/test_dedup.py (new file, 46 lines)
View File

@@ -0,0 +1,46 @@
import mock
from warcprox.dedup import CdxServerDedup
def test_cdx_dedup():
# Mock CDX Server responses to simulate found, not found and errors.
with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request:
url = "http://example.com"
# not found case
result = mock.Mock()
result.status = 200
result.data = b'20170101020405 test'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# found case
result = mock.Mock()
result.status = 200
result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res["date"] == b"2017-02-03T04:05:03Z"
# invalid CDX result status code
result = mock.Mock()
result.status = 400
result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# invalid CDX result content
result = mock.Mock()
result.status = 200
result.data = b'InvalidExceptionResult'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
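These tests can be run on their own with py.test -v tests/test_dedup.py; they
need the mock package, which is why it was added to run-tests.sh and to
tests_require in setup.py above.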

View File

@@ -60,6 +60,7 @@ except ImportError:
import certauth.certauth
import warcprox
import warcprox.main
try:
import http.client as http_client
@@ -241,173 +242,44 @@ def https_daemon(request, cert):
return https_daemon
@pytest.fixture(scope="module")
def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
captures_db = None
def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
orig_dir = os.getcwd()
work_dir = tempfile.mkdtemp()
logging.info('changing to working directory %r', work_dir)
os.chdir(work_dir)
argv = ['warcprox',
'--method-filter=GET',
'--method-filter=POST',
'--port=0',
'--playback-port=0',
'--onion-tor-socks-proxy=localhost:9050']
if rethinkdb_servers:
servers = rethinkdb_servers.split(",")
if rethinkdb_big_table:
db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
rr = doublethink.Rethinker(servers, db)
captures_db = warcprox.bigtable.RethinkCaptures(rr)
captures_db.start()
rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
argv.append('--rethinkdb-servers=%s' % rethinkdb_servers)
argv.append('--rethinkdb-db=%s' % rethinkdb_db)
if rethinkdb_big_table:
argv.append('--rethinkdb-big-table')
def fin():
if captures_db:
captures_db.close()
# logging.info('dropping rethinkdb database {}'.format(db))
# result = captures_db.rr.db_drop(db).run()
# logging.info("result=%s", result)
request.addfinalizer(fin)
args = warcprox.main.parse_args(argv)
warcprox_ = warcprox.main.init_controller(args)
return captures_db
@pytest.fixture(scope="module")
def rethink_dedup_db(request, rethinkdb_servers, captures_db):
ddb = None
if rethinkdb_servers:
if captures_db:
ddb = warcprox.bigtable.RethinkCapturesDedup(captures_db)
else:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
rr = doublethink.Rethinker(servers, db)
ddb = warcprox.dedup.RethinkDedupDb(rr)
def fin():
if rethinkdb_servers:
ddb.close()
if not captures_db:
logging.info('dropping rethinkdb database {}'.format(db))
result = ddb.rr.db_drop(db).run()
logging.info("result=%s", result)
request.addfinalizer(fin)
return ddb
@pytest.fixture(scope="module")
def dedup_db(request, rethink_dedup_db):
dedup_db_file = None
ddb = rethink_dedup_db
if not ddb:
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
f.close()
dedup_db_file = f.name
ddb = warcprox.dedup.DedupDb(dedup_db_file)
def fin():
if dedup_db_file:
logging.info('deleting file {}'.format(dedup_db_file))
os.unlink(dedup_db_file)
request.addfinalizer(fin)
return ddb
@pytest.fixture(scope="module")
def stats_db(request, rethinkdb_servers):
if rethinkdb_servers:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
rr = doublethink.Rethinker(servers, db)
sdb = warcprox.stats.RethinkStatsDb(rr)
sdb.start()
else:
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
f.close()
stats_db_file = f.name
sdb = warcprox.stats.StatsDb(stats_db_file)
def fin():
sdb.close()
if rethinkdb_servers:
logging.info('dropping rethinkdb database {}'.format(db))
result = sdb.rr.db_drop(db).run()
logging.info("result=%s", result)
# else:
# logging.info('deleting file {}'.format(stats_db_file))
# os.unlink(stats_db_file)
request.addfinalizer(fin)
return sdb
@pytest.fixture(scope="module")
def service_registry(request, rethinkdb_servers):
if rethinkdb_servers:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_services_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
rr = doublethink.Rethinker(servers, db)
def fin():
logging.info('dropping rethinkdb database {}'.format(db))
result = rr.db_drop(db).run()
logging.info("result=%s", result)
request.addfinalizer(fin)
return doublethink.ServiceRegistry(rr)
else:
return None
@pytest.fixture(scope="module")
def warcprox_(request, captures_db, dedup_db, stats_db, service_registry):
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True)
f.close() # delete it, or CertificateAuthority will try to read it
ca_file = f.name
ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca')
ca = certauth.certauth.CertificateAuthority(ca_file, ca_dir, 'warcprox-test')
recorded_url_q = warcprox.TimestampedQueue()
options = warcprox.Options(port=0, playback_port=0,
onion_tor_socks_proxy='localhost:9050')
proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q,
stats_db=stats_db, options=options)
options.port = proxy.server_port
options.directory = tempfile.mkdtemp(prefix='warcprox-test-warcs-')
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False)
f.close()
playback_index_db_file = f.name
playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file)
playback_proxy = warcprox.playback.PlaybackProxy(ca=ca,
playback_index_db=playback_index_db, options=options)
options.playback_proxy = playback_proxy.server_port
options.method_filter = ['GET','POST']
options.crawl_log_dir = tempfile.mkdtemp(
prefix='warcprox-test-', suffix='-crawl-log')
crawl_logger = warcprox.crawl_log.CrawlLogger(options.crawl_log_dir)
writer_pool = warcprox.writer.WarcWriterPool(options)
warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=[
captures_db or dedup_db, playback_index_db, stats_db,
crawl_logger], options=options)
for i in range(int(proxy.max_threads ** 0.5))]
warcprox_ = warcprox.controller.WarcproxController(
proxy=proxy, warc_writer_threads=warc_writer_threads,
playback_proxy=playback_proxy, service_registry=service_registry,
options=options)
logging.info('starting warcprox')
warcprox_thread = threading.Thread(name='WarcproxThread',
target=warcprox_.run_until_shutdown)
warcprox_thread = threading.Thread(
name='WarcproxThread', target=warcprox_.run_until_shutdown)
warcprox_thread.start()
def fin():
logging.info('stopping warcprox')
warcprox_.stop.set()
warcprox_thread.join()
for f in (ca_file, ca_dir, options.directory, playback_index_db_file,
options.crawl_log_dir):
if os.path.isdir(f):
logging.info('deleting directory {}'.format(f))
shutil.rmtree(f)
else:
logging.info('deleting file {}'.format(f))
os.unlink(f)
if rethinkdb_servers:
logging.info('dropping rethinkdb database %r', rethinkdb_db)
rr = doublethink.Rethinker(rethinkdb_servers)
result = rr.db_drop(rethinkdb_db).run()
logging.info('deleting working directory %r', work_dir)
os.chdir(orig_dir)
shutil.rmtree(work_dir)
request.addfinalizer(fin)
return warcprox_
@@ -683,6 +555,22 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
assert response.headers["content-type"] == "text/plain;charset=utf-8"
assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n"
def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
request_meta = {"accept": ["capture-metadata"]}
headers = {"Warcprox-Meta": json.dumps(request_meta)}
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
assert response.status_code == 200
assert response.headers['Warcprox-Meta']
data = json.loads(response.headers['Warcprox-Meta'])
assert data['capture-metadata']
try:
dt = datetime.datetime.strptime(data['capture-metadata']['timestamp'],
'%Y-%m-%dT%H:%M:%SZ')
assert dt
except ValueError:
pytest.fail('Invalid capture-metadata timestamp format %s' % data['capture-metadata']['timestamp'])
def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
@@ -1233,9 +1121,8 @@ def test_method_filter(
assert response.content == payload
def test_dedup_ok_flag(
https_daemon, http_daemon, warcprox_, archiving_proxies,
rethinkdb_big_table):
if not rethinkdb_big_table:
https_daemon, http_daemon, warcprox_, archiving_proxies):
if not warcprox_.options.rethinkdb_big_table:
# this feature is n/a unless using rethinkdb big table
return
@@ -1319,11 +1206,11 @@ def test_status_api(warcprox_):
assert status['pid'] == os.getpid()
assert status['threads'] == warcprox_.proxy.pool._max_workers
def test_svcreg_status(warcprox_, service_registry):
if service_registry:
def test_svcreg_status(warcprox_):
if warcprox_.service_registry:
start = time.time()
while time.time() - start < 15:
status = service_registry.available_service('warcprox')
status = warcprox_.service_registry.available_service('warcprox')
if status:
break
time.sleep(0.5)
@@ -1380,11 +1267,11 @@ def test_controller_with_defaults():
assert not wwt.writer_pool.default_warc_writer.record_builder.base32
assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
def test_choose_a_port_for_me(service_registry):
def test_choose_a_port_for_me(warcprox_):
options = warcprox.Options()
options.port = 0
controller = warcprox.controller.WarcproxController(
service_registry=service_registry, options=options)
service_registry=warcprox_.service_registry, options=options)
assert controller.proxy.server_port != 0
assert controller.proxy.server_port != 8000
assert controller.proxy.server_address == (
@@ -1400,12 +1287,12 @@ def test_choose_a_port_for_me(service_registry):
status = json.loads(response.content.decode('ascii'))
assert status['port'] == controller.proxy.server_port
if service_registry:
if warcprox_.service_registry:
# check that service registry entry lists the correct port
start = time.time()
ports = []
while time.time() - start < 30:
svcs = service_registry.available_services('warcprox')
svcs = warcprox_.service_registry.available_services('warcprox')
ports = [svc['port'] for svc in svcs]
if controller.proxy.server_port in ports:
break
@@ -1552,6 +1439,44 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
'contentSize', 'warcFilename', 'warcFileOffset'}
assert extra_info['contentSize'] == 145
def test_long_warcprox_meta(
warcprox_, http_daemon, archiving_proxies, playback_proxies):
url = 'http://localhost:%s/b/g' % http_daemon.server_port
# create a very long warcprox-meta header
headers = {'Warcprox-Meta': json.dumps({
'x':'y'*1000000, 'warc-prefix': 'test_long_warcprox_meta'})}
response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False)
assert response.status_code == 200
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check that warcprox-meta was parsed and honored ("warc-prefix" param)
assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
warc_path = os.path.join(writer.directory, writer._f_finalname)
warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
assert os.path.exists(warc_path)
# read the warc
with open(warc_path, 'rb') as f:
rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
record = next(rec_iter)
assert record.rec_type == 'warcinfo'
record = next(rec_iter)
assert record.rec_type == 'response'
assert record.rec_headers.get_header('warc-target-uri') == url
record = next(rec_iter)
assert record.rec_type == 'request'
assert record.rec_headers.get_header('warc-target-uri') == url
with pytest.raises(StopIteration):
next(rec_iter)
if __name__ == '__main__':
pytest.main()

tests/test_writer.py (new file, 57 lines)
View File

@@ -0,0 +1,57 @@
import os
import fcntl
from multiprocessing import Process, Queue
from datetime import datetime
import pytest
from warcprox.mitmproxy import ProxyingRecorder
from warcprox.warcproxy import RecordedUrl
from warcprox.writer import WarcWriter
from warcprox import Options
recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain',
status=200, client_ip='127.0.0.2',
request_data=b'abc',
response_recorder=recorder,
remote_ip='127.0.0.3',
timestamp=datetime.utcnow())
def lock_file(queue, filename):
"""Try to lock file and return 1 if successful, else return 0.
It is necessary to run this method in a different process to test locking.
"""
try:
fi = open(filename, 'ab')
fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB)
fi.close()
queue.put('OBTAINED LOCK')
except IOError:
queue.put('FAILED TO OBTAIN LOCK')
def test_warc_writer_locking(tmpdir):
"""Test if WarcWriter is locking WARC files.
When we don't have the .open suffix, WarcWriter locks the file and the
external process trying to ``lock_file`` fails (result=0).
"""
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True))
wwriter.write_records(recorded_url)
warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
assert warcs
target_warc = os.path.join(dirname, warcs[0])
# launch another process and try to lock WARC file
queue = Queue()
p = Process(target=lock_file, args=(queue, target_warc))
p.start()
p.join()
assert queue.get() == 'FAILED TO OBTAIN LOCK'
wwriter.close_writer()
# locking must succeed after writer has closed the WARC file.
p = Process(target=lock_file, args=(queue, target_warc))
p.start()
p.join()
assert queue.get() == 'OBTAINED LOCK'
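A note on the design: fcntl.lockf acquires POSIX record locks, which are held
per process rather than per file handle or thread. Re-locking from a thread
inside the same process as the WarcWriter would succeed even while the writer
holds the lock, so the helper above has to run in a separate
multiprocessing.Process.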

View File

@@ -25,8 +25,6 @@ USA.
from __future__ import absolute_import
import logging
from hanzo import warctools
import random
import warcprox
import base64
import urlcanon
@@ -115,6 +113,7 @@ class RethinkCaptures:
[r.row["abbr_canon_surt"], r.row["timestamp"]]).run()
self.rr.table(self.table).index_create("sha1_warc_type", [
r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run()
self.rr.table(self.table).index_wait().run()
def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
if algo != "sha1":
@@ -221,7 +220,7 @@ class RethinkCapturesDedup:
self.captures_db = captures_db
self.options = options
def lookup(self, digest_key, bucket="__unspecified__"):
def lookup(self, digest_key, bucket="__unspecified__", url=None):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
algo, value_str = k.split(":")
if self.options.base32:

View File

@@ -176,7 +176,7 @@ class WarcproxController(object):
assert(all(
wwt.dedup_db is self.warc_writer_threads[0].dedup_db
for wwt in self.warc_writer_threads))
if self.warc_writer_threads[0].dedup_db:
if any((t.dedup_db for t in self.warc_writer_threads)):
self.warc_writer_threads[0].dedup_db.start()
for wwt in self.warc_writer_threads:
@@ -211,8 +211,6 @@ class WarcproxController(object):
if self.proxy.stats_db:
self.proxy.stats_db.stop()
if self.warc_writer_threads[0].dedup_db:
self.warc_writer_threads[0].dedup_db.close()
self.proxy_thread.join()
if self.playback_proxy is not None:

View File

@@ -21,13 +21,17 @@ USA.
from __future__ import absolute_import
from datetime import datetime
import logging
import os
import json
from hanzo import warctools
import warcprox
import random
import sqlite3
import urllib3
from urllib3.exceptions import HTTPError
urllib3.disable_warnings()
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -55,15 +59,6 @@ class DedupDb(object):
conn.commit()
conn.close()
def stop(self):
pass
def close(self):
pass
def sync(self):
pass
def save(self, digest_key, response_record, bucket=""):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
@@ -82,7 +77,7 @@ class DedupDb(object):
conn.close()
self.logger.debug('dedup db saved %s:%s', key, json_value)
def lookup(self, digest_key, bucket=""):
def lookup(self, digest_key, bucket="", url=None):
result = None
key = digest_key.decode('utf-8') + '|' + bucket
conn = sqlite3.connect(self.file)
@@ -117,9 +112,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"])
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
else:
recorded_url.dedup_info = dedup_db.lookup(digest_key)
recorded_url.dedup_info = dedup_db.lookup(digest_key,
url=recorded_url.url)
class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
@@ -151,15 +148,6 @@ class RethinkDedupDb:
def start(self):
pass
def stop(self):
pass
def close(self):
pass
def sync(self):
pass
def save(self, digest_key, response_record, bucket=""):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
k = "{}|{}".format(k, bucket)
@@ -173,7 +161,7 @@ class RethinkDedupDb:
raise Exception("unexpected result %s saving %s", result, record)
self.logger.debug('dedup db saved %s:%s', k, record)
def lookup(self, digest_key, bucket=""):
def lookup(self, digest_key, bucket="", url=None):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
k = "{}|{}".format(k, bucket)
result = self.rr.table(self.table).get(k).run()
@@ -193,3 +181,66 @@
else:
self.save(digest_key, records[0])
class CdxServerDedup(object):
"""Query a CDX server to perform deduplication.
"""
logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
http_pool = urllib3.PoolManager()
def __init__(self, cdx_url="https://web.archive.org/cdx/search",
options=warcprox.Options()):
self.cdx_url = cdx_url
self.options = options
def start(self):
pass
def save(self, digest_key, response_record, bucket=""):
"""Does not apply to CDX server, as it is obviously read-only.
"""
pass
def lookup(self, digest_key, url):
"""Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be
computed on the original content, after decoding Content-Encoding and
Transfer-Encoding, if any), if they match, write a revisit record.
Get only the last item (limit=-1) because Wayback Machine has special
performance optimisation to handle that. limit < 0 is very inefficient
in general. Maybe it could be configurable in the future.
:param digest_key: b'sha1:<KEY-VALUE>' (prefix is optional).
Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
:param url: Target URL string
Result must contain:
{"url": <URL>, "date": "%Y-%m-%dT%H:%M:%SZ"}
"""
u = url.decode("utf-8") if isinstance(url, bytes) else url
try:
result = self.http_pool.request('GET', self.cdx_url, fields=dict(
url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
limit=-1))
assert result.status == 200
if isinstance(digest_key, bytes):
dkey = digest_key
else:
dkey = digest_key.encode('utf-8')
dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey
line = result.data.strip()
if line:
(cdx_ts, cdx_digest) = line.split(b' ')
if cdx_digest == dkey:
dt = datetime.strptime(cdx_ts.decode('ascii'),
'%Y%m%d%H%M%S')
date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
return dict(url=url, date=date)
except (HTTPError, AssertionError, ValueError) as exc:
self.logger.error('CdxServerDedup request failed for url=%s %s',
url, exc)
return None
def notify(self, recorded_url, records):
"""Since we don't save anything to CDX server, this does not apply.
"""
pass
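To make the lookup logic concrete, here is a minimal standalone sketch of the
digest comparison and timestamp conversion performed above (the CDX response
line and URL are made up):

    from datetime import datetime

    # illustrative CDX response body: "<14-digit timestamp> <base32 sha1>"
    line = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
    dkey = b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
    dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey  # prefix optional

    cdx_ts, cdx_digest = line.split(b' ')
    if cdx_digest == dkey:
        dt = datetime.strptime(cdx_ts.decode('ascii'), '%Y%m%d%H%M%S')
        # lookup() returns this dict, which is used for the revisit record
        print(dict(url='http://example.com',
                   date=dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')))
        # {'url': 'http://example.com', 'date': b'2017-02-03T04:05:03Z'}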

View File

@@ -39,7 +39,6 @@ import signal
import threading
import certauth.certauth
import warcprox
import re
import doublethink
import cryptography.hazmat.backends.openssl
import importlib
@@ -79,6 +78,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
default='./warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
default=False, action='store_true', help=argparse.SUPPRESS)
arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument(
@@ -107,6 +108,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
group = arg_parser.add_mutually_exclusive_group()
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup',
help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search')
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
@@ -195,6 +198,9 @@ def init_controller(args):
else:
dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
listeners.append(dedup_db)
elif args.cdxserver_dedup:
dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
listeners.append(dedup_db)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None

View File

@@ -37,8 +37,15 @@ except ImportError:
import urlparse as urllib_parse
try:
import http.client as http_client
# In python3 http.client.parse_headers() enforces http_client._MAXLINE
# as max length of an http header line, but we want to support very
# long warcprox-meta headers, so we tweak it here. Python2 doesn't seem
# to enforce any limit. Multiline headers could be an option but it
# turns out those are illegal as of RFC 7230. Plus, this is easier.
http_client._MAXLINE = 4194304 # 4 MiB
except ImportError:
import httplib as http_client
import json
import socket
import logging
import ssl
@@ -157,13 +164,17 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self.fp, proxy_client, digest_algorithm, url=url)
self.fp = self.recorder
def begin(self):
def begin(self, extra_response_headers={}):
http_client.HTTPResponse.begin(self) # reads status line, headers
status_and_headers = 'HTTP/1.1 {} {}\r\n'.format(
self.status, self.reason)
self.msg['Via'] = via_header_value(
self.msg.get('Via'), '%0.1f' % (self.version / 10.0))
if extra_response_headers:
for header, value in extra_response_headers.items():
self.msg[header] = value
for k,v in self.msg.items():
if k.lower() not in (
'connection', 'proxy-connection', 'keep-alive',
@@ -355,12 +366,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.error("exception proxying request", exc_info=True)
raise
def _proxy_request(self):
def _proxy_request(self, extra_response_headers={}):
'''
Sends the request to the remote server, then uses a ProxyingRecorder to
read the response and send it to the proxy client, while recording the
bytes in transit. Returns a tuple (request, response) where request is
the raw request bytes, and response is a ProxyingRecorder.
:param extra_response_headers: generated on warcprox._proxy_request.
It may contain extra HTTP headers such as ``Warcprox-Meta`` which
are written in the WARC record for this request.
'''
# Build request
req_str = '{} {} {}\r\n'.format(
@@ -401,7 +416,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self._remote_server_sock, proxy_client=self.connection,
digest_algorithm=self.server.digest_algorithm,
url=self.url, method=self.command)
prox_rec_res.begin()
prox_rec_res.begin(extra_response_headers=extra_response_headers)
buf = prox_rec_res.read(8192)
while buf != b'':

View File

@@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler):
def _send_headers_and_refd_payload(
self, headers, refers_to, refers_to_target_uri, refers_to_date):
self, headers, refers_to_target_uri, refers_to_date, payload_digest):
"""Parameters:
"""
location = self.server.playback_index_db.lookup_exact(
refers_to_target_uri, refers_to_date, record_id=refers_to)
refers_to_target_uri, refers_to_date, payload_digest)
self.logger.debug('loading http payload from {}'.format(location))
fh = self._open_warc_at_offset(location['f'], location['o'])
@@ -131,7 +134,7 @@
pass
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type != warctools.WarcRecord.RESPONSE:
@@ -177,20 +180,19 @@
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
raise Exception('unknown revisit record profile {}'.format(warc_profile))
refers_to = record.get_header(
warctools.WarcRecord.REFERS_TO).decode('latin1')
refers_to_target_uri = record.get_header(
warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
'latin1')
refers_to_date = record.get_header(
warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
payload_digest = record.get_header(
warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
self.logger.debug(
'revisit record references %s:%s capture of %s',
refers_to_date, refers_to, refers_to_target_uri)
refers_to_date, payload_digest, refers_to_target_uri)
return self._send_headers_and_refd_payload(
record.content[1], refers_to, refers_to_target_uri,
refers_to_date)
record.content[1], refers_to_target_uri, refers_to_date,
payload_digest)
else:
# send it back raw, whatever it is
@@ -264,12 +266,12 @@ class PlaybackIndexDb(object):
# XXX canonicalize url?
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
# there could be two visits of same url in the same second, and WARC-Date is
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...}
with self._lock:
conn = sqlite3.connect(self.file)
@@ -283,10 +285,10 @@
if date_str in py_value:
py_value[date_str].append(
{'f':warcfile, 'o':offset, 'i':record_id_str})
{'f': warcfile, 'o': offset, 'd': payload_digest_str})
else:
py_value[date_str] = [
{'f':warcfile, 'o':offset, 'i':record_id_str}]
{'f': warcfile, 'o': offset, 'd': payload_digest_str}]
json_value = json.dumps(py_value, separators=(',',':'))
@@ -314,11 +316,11 @@
latest_date = max(py_value)
result = py_value[latest_date][0]
result['i'] = result['i'].encode('ascii')
result['d'] = result['d'].encode('ascii')
return latest_date, result
# in python3 params are bytes
def lookup_exact(self, url, warc_date, record_id):
def lookup_exact(self, url, warc_date, payload_digest):
conn = sqlite3.connect(self.file)
cursor = conn.execute(
'select value from playback where url = ?', (url,))
@@ -334,14 +336,13 @@
if warc_date in py_value:
for record in py_value[warc_date]:
if record['i'] == record_id:
if record['d'] == payload_digest:
self.logger.debug(
"found exact match for (%r,%r,%r)",
warc_date, record_id, url)
record['i'] = record['i']
warc_date, payload_digest, url)
record['d'] = record['d']
return record
else:
self.logger.info(
"match not found for (%r,%r,%r)", warc_date, record_id, url)
"match not found for (%r,%r,%r)", warc_date, payload_digest, url)
return None

View File

@@ -25,7 +25,6 @@ import logging
import os
import json
from hanzo import warctools
import random
import warcprox
import threading
import rethinkdb as r

View File

@@ -27,7 +27,6 @@ import hashlib
import socket
import hanzo.httptools
from hanzo import warctools
import warcprox
import datetime
class WarcRecordBuilder:
@@ -51,7 +50,7 @@ class WarcRecordBuilder:
url=recorded_url.url, warc_date=warc_date,
data=response_header_block,
warc_type=warctools.WarcRecord.REVISIT,
refers_to=recorded_url.dedup_info['id'],
refers_to=recorded_url.dedup_info.get('id'),
refers_to_target_uri=recorded_url.dedup_info['url'],
refers_to_date=recorded_url.dedup_info['date'],
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),

View File

@@ -35,15 +35,12 @@ try:
except ImportError:
import Queue as queue
import logging
import re
import traceback
import json
import socket
from hanzo import warctools
from certauth.certauth import CertificateAuthority
import warcprox
import datetime
import ipaddress
import urlcanon
import os
@@ -195,9 +192,22 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
remote_ip = self._remote_server_sock.getpeername()[0]
timestamp = datetime.datetime.utcnow()
extra_response_headers = {}
if warcprox_meta and 'accept' in warcprox_meta and \
'capture-metadata' in warcprox_meta['accept']:
rmeta = {'capture-metadata': {'timestamp': timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')}}
extra_response_headers['Warcprox-Meta'] = json.dumps(rmeta, separators=',:')
req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request(
self)
self, extra_response_headers=extra_response_headers)
content_type = None
try:
content_type = prox_rec_res.headers.get('content-type')
except AttributeError: # py2
raw = prox_rec_res.msg.getrawheader('content-type')
if raw:
content_type = raw.strip()
recorded_url = RecordedUrl(
url=self.url, request_data=req,
@@ -205,8 +215,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
warcprox_meta=warcprox_meta, status=prox_rec_res.status,
size=prox_rec_res.recorder.len,
client_ip=self.client_address[0],
content_type=prox_rec_res.getheader("Content-Type"),
method=self.command, timestamp=timestamp, host=self.hostname,
content_type=content_type, method=self.command,
timestamp=timestamp, host=self.hostname,
duration=datetime.datetime.utcnow()-timestamp,
referer=self.headers.get('referer'))
self.server.recorded_url_q.put(recorded_url)
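For illustration, a client opting in to capture metadata might look like this
sketch (it mirrors test_return_capture_timestamp above; the proxy address is
assumed):

    import json
    import requests

    proxies = {'http': 'http://localhost:8000'}  # assumed warcprox address
    headers = {'Warcprox-Meta': json.dumps({'accept': ['capture-metadata']})}
    response = requests.get(
        'http://example.com/', proxies=proxies, headers=headers)
    # warcprox echoes a Warcprox-Meta response header with the capture timestamp
    meta = json.loads(response.headers['Warcprox-Meta'])
    print(meta['capture-metadata']['timestamp'])  # e.g. 2017-11-09T19:13:02Z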

View File

@@ -24,10 +24,10 @@ from __future__ import absolute_import
import logging
from datetime import datetime
from hanzo import warctools
import fcntl
import time
import warcprox
import os
import socket
import string
import random
import threading
@@ -54,6 +54,7 @@ class WarcWriter:
self._f = None
self._fpath = None
self._f_finalname = None
self._f_open_suffix = '' if options.no_warc_open_suffix else '.open'
self._serial = 0
self._lock = threading.RLock()
@@ -71,6 +72,12 @@
with self._lock:
if self._fpath:
self.logger.info('closing %s', self._f_finalname)
if self._f_open_suffix == '':
try:
fcntl.lockf(self._f, fcntl.LOCK_UN)
except IOError as exc:
self.logger.error('could not unlock file %s (%s)',
self._fpath, exc)
self._f.close()
finalpath = os.path.sep.join(
[self.directory, self._f_finalname])
@@ -92,9 +99,17 @@
self.prefix, self.timestamp17(), self._serial,
self._randomtoken, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([
self.directory, self._f_finalname + '.open'])
self.directory, self._f_finalname + self._f_open_suffix])
self._f = open(self._fpath, 'wb')
# if no '.open' suffix is used for WARC, acquire an exclusive
# file lock.
if self._f_open_suffix == '':
try:
fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as exc:
self.logger.error('could not lock file %s (%s)',
self._fpath, exc)
warcinfo_record = self.record_builder.build_warcinfo_record(
self._f_finalname)
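For reference, LOCK_EX|LOCK_NB requests an exclusive lock but raises an error
immediately instead of blocking when another process already holds one; a
minimal standalone sketch (the file path is assumed):

    import fcntl

    with open('/tmp/example.warc', 'ab') as f:
        try:
            # raises IOError (an alias of OSError on python3) at once
            # if another process already holds an exclusive lock
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            print('lock acquired')
        except IOError:
            print('file is locked by another process')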

View File

@@ -29,13 +29,8 @@ except ImportError:
import logging
import threading
import os
import hashlib
import time
import socket
import base64
from datetime import datetime
import hanzo.httptools
from hanzo import warctools
import warcprox
import cProfile
@@ -46,7 +41,7 @@ class WarcWriterThread(threading.Thread):
def __init__(
self, name='WarcWriterThread', recorded_url_q=None,
writer_pool=None, dedup_db=None, listeners=None,
writer_pool=None, dedup_db=None, listeners=[],
options=warcprox.Options()):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name=name)