Merge branch 'wip-postfetch-chain' into qa

* wip-postfetch-chain:
  postfetch chain info for /status and service reg
  batch for at least 2 seconds
  batch storing for trough dedup
  fixes to make tests pass
  use batch postfetch processor for stats
  don't keep next processor waiting
  include RunningStats raw stats in status info
  make --profile work again
  improve batching, make tests pass
  batch trough dedup loader
  fix running_stats thing
  make run-benchmarks.py work (with no args)
  keep running stats
  shut down postfetch processors
  tests are passing
  slightly less incomplete work on new postfetch processor chain
  very incomplete work on new postfetch processor chain
  Update CdxServerDedup unit test
  Check writer._fname in unit test
  Configurable CdxServerDedup urllib3 connection pool size
  roll over idle warcs on time
  Yet another unit test fix
  Change the writer unit test
  fix github problem with unit test
  Another fix for the unit test
  Fix writer unit test
  Add WarcWriter warc_filename unit test
  Fix warc_filename default value
  Configurable WARC filenames
  fix logging.notice/trace methods which were masking file/line/function of log message
  update test_svcreg_status to expect new fields
  change where RunningStats is initialized and fix tests
  more stats available from /status (and in rethinkdb services table)
  timeouts for trough requests to prevent hanging
  dropping claim of support for python 2.7 (not worth hacking around tempfile.TemporaryDirectory to make tests pass)
  implementation of special prefix "-" which means "do not archive"
  test for special warc prefix "-" which means "do not archive"
  if --profile is enabled, dump results every ten minutes, as well as at shutdown
Noah Levitt 2018-01-18 11:14:37 -08:00
commit a7545e3614
19 changed files with 1334 additions and 851 deletions
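
One of the features merged here, the special warc prefix "-" meaning "do not archive", rides on the same Warcprox-Meta request header that the tests below use for "warc-prefix". A hypothetical client request might look like this (proxy address and port are made up):

import json
import requests

# sketch: ask warcprox to fetch but not archive this url
proxies = {'http': 'http://localhost:8000', 'https': 'http://localhost:8000'}
headers = {'Warcprox-Meta': json.dumps({'warc-prefix': '-'})}
requests.get('http://example.com/', proxies=proxies, headers=headers,
             verify=False)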

.travis.yml

@ -15,6 +15,8 @@ matrix:
allow_failures:
- python: nightly
- python: 3.7-dev
- python: 2.7
- python: pypy
addons:
apt:

README.rst

@ -9,7 +9,7 @@ https://github.com/allfro/pymiproxy
Install
~~~~~~~
Warcprox runs on python 2.7 or 3.4+.
Warcprox runs on python 3.4+.
To install latest release run:
@ -41,16 +41,18 @@ Usage
usage: warcprox [-h] [-p PORT] [-b ADDRESS] [-c CACERT]
[--certs-dir CERTS_DIR] [-d DIRECTORY] [-z] [-n PREFIX]
[-s SIZE] [--rollover-idle-time ROLLOVER_IDLE_TIME]
[-s ROLLOVER_SIZE]
[--rollover-idle-time ROLLOVER_IDLE_TIME]
[-g DIGEST_ALGORITHM] [--base32]
[--method-filter HTTP_METHOD]
[--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
[--stats-db-file STATS_DB_FILE | --rethinkdb-stats-url RETHINKDB_STATS_URL]
[-P PLAYBACK_PORT]
[--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
[-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
[--cdxserver-dedup CDX_SERVER_URL]
[--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
[-j DEDUP_DB_FILE | --rethinkdb-dedup-url RETHINKDB_DEDUP_URL | --rethinkdb-big-table-url RETHINKDB_BIG_TABLE_URL | --rethinkdb-trough-db-url RETHINKDB_TROUGH_DB_URL | --cdxserver-dedup CDXSERVER_DEDUP]
[--rethinkdb-services-url RETHINKDB_SERVICES_URL]
[--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
[--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
[--crawl-log-dir CRAWL_LOG_DIR] [--plugin PLUGIN_CLASS]
[--version] [-v] [--trace] [-q]
warcprox - WARC writing MITM HTTP/S proxy
@ -61,35 +63,38 @@ Usage
address to listen on (default: localhost)
-c CACERT, --cacert CACERT
CA certificate file; if file does not exist, it
will be created (default:
./ayutla.monkeybrains.net-warcprox-ca.pem)
will be created (default: ./ayutla.local-warcprox-
ca.pem)
--certs-dir CERTS_DIR
where to store and load generated certificates
(default: ./ayutla.monkeybrains.net-warcprox-ca)
(default: ./ayutla.local-warcprox-ca)
-d DIRECTORY, --dir DIRECTORY
where to write warcs (default: ./warcs)
-z, --gzip write gzip-compressed warc records
-n PREFIX, --prefix PREFIX
WARC filename prefix (default: WARCPROX)
-s SIZE, --size SIZE WARC file rollover size threshold in bytes
default WARC filename prefix (default: WARCPROX)
-s ROLLOVER_SIZE, --size ROLLOVER_SIZE
WARC file rollover size threshold in bytes
(default: 1000000000)
--rollover-idle-time ROLLOVER_IDLE_TIME
WARC file rollover idle time threshold in seconds
(so that Friday's last open WARC doesn't sit
there all weekend waiting for more data)
(default: None)
(so that Friday's last open WARC doesn't sit there
all weekend waiting for more data) (default: None)
-g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM
digest algorithm, one of sha1, sha384, sha512,
md5, sha224, sha256 (default: sha1)
digest algorithm, one of sha256, sha224, sha512,
sha384, md5, sha1 (default: sha1)
--base32 write digests in Base32 instead of hex
--method-filter HTTP_METHOD
only record requests with the given http
method(s) (can be used more than once) (default:
None)
only record requests with the given http method(s)
(can be used more than once) (default: None)
--stats-db-file STATS_DB_FILE
persistent statistics database file; empty string
or /dev/null disables statistics tracking
(default: ./warcprox.sqlite)
--rethinkdb-stats-url RETHINKDB_STATS_URL
rethinkdb stats table url, e.g. rethinkdb://db0.fo
o.org,db1.foo.org:38015/my_warcprox_db/my_stats_ta
ble (default: None)
-P PLAYBACK_PORT, --playback-port PLAYBACK_PORT
port to listen on for instant playback (default:
None)
@ -101,36 +106,44 @@ Usage
persistent deduplication database file; empty
string or /dev/null disables deduplication
(default: ./warcprox.sqlite)
--cdxserver-dedup CDX_SERVER_URL
use a CDX server for deduplication
--rethinkdb-dedup-url RETHINKDB_DEDUP_URL
rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,
db1.foo.org:38015/my_warcprox_db/my_dedup_table
(default: None)
--rethinkdb-servers RETHINKDB_SERVERS
rethinkdb servers, used for dedup and stats if
specified; e.g.
db0.foo.org,db0.foo.org:38015,db1.foo.org
(default: None)
--rethinkdb-db RETHINKDB_DB
rethinkdb database name (ignored unless
--rethinkdb-servers is specified) (default:
warcprox)
--rethinkdb-big-table
use a big rethinkdb table called "captures",
instead of a small table called "dedup"; table is
suitable for use as index for playback (ignored
unless --rethinkdb-servers is specified)
--rethinkdb-big-table-url RETHINKDB_BIG_TABLE_URL
rethinkdb big table url (table will be populated
with various capture information and is suitable
for use as index for playback), e.g. rethinkdb://d
b0.foo.org,db1.foo.org:38015/my_warcprox_db/captur
es (default: None)
--rethinkdb-trough-db-url RETHINKDB_TROUGH_DB_URL
🐷 url pointing to trough configuration rethinkdb
database, e.g. rethinkdb://db0.foo.org,db1.foo.org
:38015/trough_configuration (default: None)
--cdxserver-dedup CDXSERVER_DEDUP
use a CDX Server URL for deduplication; e.g.
https://web.archive.org/cdx/search (default: None)
--rethinkdb-services-url RETHINKDB_SERVICES_URL
rethinkdb service registry table url; if provided,
warcprox will create and heartbeat entry for
itself (default: None)
--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY
host:port of tor socks proxy, used only to
connect to .onion sites (default: None)
host:port of tor socks proxy, used only to connect
to .onion sites (default: None)
--crawl-log-dir CRAWL_LOG_DIR
if specified, write crawl log files in the
specified directory; one crawl log is written per
warc filename prefix; crawl log format mimics
heritrix (default: None)
--plugin PLUGIN_CLASS
Qualified name of plugin class, e.g.
"mypkg.mymod.MyClass". May be used multiple times
to register multiple plugins. Plugin classes are
loaded from the regular python module search
path. They will be instantiated with no arguments
and must have a method `notify(self,
recorded_url, records)` which will be called for
each url, after warc records have been written.
(default: None)
loaded from the regular python module search path.
They will be instantiated with no arguments and
must have a method `notify(self, recorded_url,
records)` which will be called for each url, after
warc records have been written. (default: None)
--version show program's version number and exit
-v, --verbose
--trace
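
The --plugin contract spelled out above is small; a minimal sketch of a conforming plugin class (the module and class names are made up):

# mypkg/mymod.py -- hypothetical module on the regular python module search path
class MyClass:
    # plugin classes are instantiated with no arguments
    def notify(self, recorded_url, records):
        # called for each url, after warc records have been written
        print('archived %s in %s warc record(s)' % (
            recorded_url.url, len(records)))

It would be registered with --plugin mypkg.mymod.MyClass; --plugin may be repeated to register several plugins.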

benchmarks/run-benchmarks.py

@ -215,9 +215,9 @@ if __name__ == '__main__':
args.cacert = os.path.join(tmpdir, 'benchmark-warcprox-ca.pem')
args.certs_dir = os.path.join(tmpdir, 'benchmark-warcprox-ca')
args.directory = os.path.join(tmpdir, 'warcs')
if args.rethinkdb_servers:
args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % (
datetime.datetime.utcnow())
# if args.rethinkdb_servers:
# args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % (
# datetime.datetime.utcnow())
start_servers()
logging.info(
@ -247,7 +247,9 @@ if __name__ == '__main__':
logging.info('SKIPPED')
logging.info('===== baseline benchmark finished =====')
warcprox_controller = warcprox.main.init_controller(args)
options = warcprox.Options(**vars(args))
warcprox_controller = warcprox.controller.WarcproxController(options)
warcprox_controller_thread = threading.Thread(
target=warcprox_controller.run_until_shutdown)
warcprox_controller_thread.start()
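
For context, the new bootstrap pattern above (parsed args folded into an Options object and handed to WarcproxController) as a standalone sketch; the flags are illustrative, and the shutdown lines assume the controller's stop event:

import threading
import warcprox
import warcprox.controller
import warcprox.main

# argv[0] is the program name, matching how the tests call parse_args
args = warcprox.main.parse_args(['warcprox', '-p', '8000'])
options = warcprox.Options(**vars(args))
controller = warcprox.controller.WarcproxController(options)
th = threading.Thread(target=controller.run_until_shutdown)
th.start()
# ... later, to shut down (assumed interface):
# controller.stop.set(); th.join()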

setup.py

@ -52,7 +52,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.3.1b4.dev127',
version='2.3.1b4.dev138',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',
@ -76,7 +76,6 @@ setuptools.setup(
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'License :: OSI Approved :: GNU General Public License (GPL)',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',

tests/test_dedup.py

@ -4,43 +4,43 @@ from warcprox.dedup import CdxServerDedup
def test_cdx_dedup():
# Mock CDX Server responses to simulate found, not found and errors.
with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request:
url = "http://example.com"
# not found case
result = mock.Mock()
result.status = 200
result.data = b'20170101020405 test'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
url = "http://example.com"
# not found case
result = mock.Mock()
result.status = 200
result.data = b'20170101020405 test'
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
cdx_server.http_pool.request = mock.MagicMock(return_value=result)
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# found case
result = mock.Mock()
result.status = 200
result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res["date"] == b"2017-02-03T04:05:03Z"
# found case
result = mock.Mock()
result.status = 200
result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
cdx_server.http_pool.request = mock.MagicMock(return_value=result)
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res["date"] == b"2017-02-03T04:05:03Z"
# invalid CDX result status code
result = mock.Mock()
result.status = 400
result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# invalid CDX result content
result = mock.Mock()
result.status = 200
result.data = b'InvalidExceptionResult'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# invalid CDX result status code
result = mock.Mock()
result.status = 400
result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
cdx_server.http_pool.request = mock.MagicMock(return_value=result)
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# invalid CDX result content
result = mock.Mock()
result.status = 200
result.data = b'InvalidExceptionResult'
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
cdx_server.http_pool.request = mock.MagicMock(return_value=result)
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
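
The mechanical change in this file, in isolation: the request method is now replaced on the CdxServerDedup instance rather than patched on the class, presumably because the urllib3 pool is now built per instance with a configurable size (see the commit list above). A minimal sketch of the new pattern:

from unittest import mock
from warcprox.dedup import CdxServerDedup

result = mock.Mock(status=200, data=b'20170101020405 test')
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
# swap the mock in at the instance level; a class-level mock.patch would
# not reach a pool created per instance
cdx_server.http_pool.request = mock.MagicMock(return_value=result)
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
                        url="http://example.com")
assert res is None  # the digest in the response does not match, so it's a miss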

tests/test_warcprox.py

@ -96,6 +96,14 @@ logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning)
def wait(callback, timeout=10):
start = time.time()
while time.time() - start < timeout:
if callback():
return
time.sleep(0.1)
raise Exception('timed out waiting for %s to return truthy' % callback)
# monkey patch dns lookup so we can test domain inheritance on localhost
orig_getaddrinfo = socket.getaddrinfo
orig_gethostbyname = socket.gethostbyname
@ -339,6 +347,9 @@ def warcprox_(request):
logging.info('changing to working directory %r', work_dir)
os.chdir(work_dir)
# we can't wait around all day in the tests
warcprox.BaseBatchPostfetchProcessor.MAX_BATCH_SEC = 0.5
argv = ['warcprox',
'--method-filter=GET',
'--method-filter=POST',
@ -357,9 +368,12 @@ def warcprox_(request):
argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url'))
args = warcprox.main.parse_args(argv)
warcprox_ = warcprox.main.init_controller(args)
options = warcprox.Options(**vars(args))
warcprox_ = warcprox.controller.WarcproxController(options)
logging.info('starting warcprox')
warcprox_.start()
warcprox_thread = threading.Thread(
name='WarcproxThread', target=warcprox_.run_until_shutdown)
warcprox_thread.start()
@ -431,17 +445,9 @@ def test_httpds_no_proxy(http_daemon, https_daemon):
assert response.headers['warcprox-test-header'] == 'c!'
assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
def _poll_playback_until(playback_proxies, url, status, timeout_sec):
start = time.time()
# check playback (warc writing is asynchronous, give it up to 10 sec)
while time.time() - start < timeout_sec:
response = requests.get(url, proxies=playback_proxies, verify=False)
if response.status_code == status:
break
time.sleep(0.5)
return response
def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_proxies, warcprox_):
urls_before = warcprox_.proxy.running_stats.urls
def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_proxies):
url = 'http://localhost:{}/a/b'.format(http_daemon.server_port)
# ensure playback fails before archiving
@ -455,12 +461,17 @@ def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_
assert response.headers['warcprox-test-header'] == 'a!'
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'a!'
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playback_proxies):
def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playback_proxies, warcprox_):
urls_before = warcprox_.proxy.running_stats.urls
url = 'https://localhost:{}/c/d'.format(https_daemon.server_port)
# ensure playback fails before archiving
@ -474,14 +485,19 @@ def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playbac
assert response.headers['warcprox-test-header'] == 'c!'
assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# test playback
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'c!'
assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
# test dedup of same http url with same payload
def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url = 'http://localhost:{}/e/f'.format(http_daemon.server_port)
# ensure playback fails before archiving
@ -490,8 +506,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
assert response.content == b'404 Not in Archive\n'
# check not in dedup db
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup is None
# archive
@ -500,21 +516,17 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
assert response.headers['warcprox-test-header'] == 'e!'
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# test playback
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'e!'
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check in dedup db
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup
assert dedup_lookup['url'] == url.encode('ascii')
@ -525,7 +537,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# need revisit to have a later timestamp than original, else playing
# back the latest record might not hit the revisit
time.sleep(1.5)
time.sleep(1.1)
# fetch & archive revisit
response = requests.get(url, proxies=archiving_proxies, verify=False)
@ -533,14 +545,11 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
assert response.headers['warcprox-test-header'] == 'e!'
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
# check in dedup db (no change from prev)
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup['url'] == url.encode('ascii')
assert dedup_lookup['id'] == record_id
@ -548,7 +557,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# test playback
logging.debug('testing playback of revisit of {}'.format(url))
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'e!'
assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
@ -556,6 +565,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# test dedup of same https url with same payload
def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url = 'https://localhost:{}/g/h'.format(https_daemon.server_port)
# ensure playback fails before archiving
@ -564,7 +575,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
assert response.content == b'404 Not in Archive\n'
# check not in dedup db
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup is None
@ -574,21 +585,18 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
assert response.headers['warcprox-test-header'] == 'g!'
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# test playback
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'g!'
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check in dedup db
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup
assert dedup_lookup['url'] == url.encode('ascii')
@ -599,7 +607,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
# need revisit to have a later timestamp than original, else playing
# back the latest record might not hit the revisit
time.sleep(1.5)
time.sleep(1.1)
# fetch & archive revisit
response = requests.get(url, proxies=archiving_proxies, verify=False)
@ -607,14 +615,11 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
assert response.headers['warcprox-test-header'] == 'g!'
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
# check in dedup db (no change from prev)
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup['url'] == url.encode('ascii')
assert dedup_lookup['id'] == record_id
@ -622,13 +627,15 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
# test playback
logging.debug('testing playback of revisit of {}'.format(url))
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'g!'
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
# XXX how to check dedup was used?
def test_limits(http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
request_meta = {"stats":{"buckets":["test_limits_bucket"]},"limits":{"test_limits_bucket/total/urls":10}}
headers = {"Warcprox-Meta": json.dumps(request_meta)}
@ -638,11 +645,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
assert response.headers['warcprox-test-header'] == 'i!'
assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
for i in range(9):
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
@ -650,11 +654,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
assert response.headers['warcprox-test-header'] == 'i!'
assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(2.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 10)
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
assert response.status_code == 420
@ -665,6 +666,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n"
def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
request_meta = {"accept": ["capture-metadata"]}
headers = {"Warcprox-Meta": json.dumps(request_meta)}
@ -680,7 +683,12 @@ def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
except ValueError:
pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp'])
# wait for postfetch chain (or subsequent test could fail)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
@ -691,15 +699,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# check url1 in dedup db bucket_a
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
# logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_a')
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
assert dedup_lookup
assert dedup_lookup['url'] == url1.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -707,7 +714,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
dedup_date = dedup_lookup['date']
# check url1 not in dedup db bucket_b
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
assert dedup_lookup is None
@ -718,14 +725,11 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
# check url2 in dedup db bucket_b
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
assert dedup_lookup['url'] == url2.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
@ -740,11 +744,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
# archive url1 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
@ -753,17 +754,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4)
# close the warc
assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
warc_path = os.path.join(writer.directory, writer._f_finalname)
warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"].close_writer()
warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
assert os.path.exists(warc_path)
# read the warc
@ -821,6 +819,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
fh.close()
def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
rules = [
{
"domain": "localhost",
@ -857,6 +857,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
url, proxies=archiving_proxies, headers=headers, stream=True)
assert response.status_code == 200
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# blocked by SURT_MATCH
url = 'http://localhost:{}/fuh/guh'.format(http_daemon.server_port)
response = requests.get(
@ -872,6 +875,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
# 404 because server set up at the top of this file doesn't handle this url
assert response.status_code == 404
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
# not blocked because surt scheme does not match (differs from heritrix
# behavior where https urls are coerced to http surt form)
url = 'https://localhost:{}/fuh/guh'.format(https_daemon.server_port)
@ -880,6 +886,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
verify=False)
assert response.status_code == 200
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
# blocked by blanket domain block
url = 'http://bad.domain.com/'
response = requests.get(
@ -932,6 +941,8 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
def test_domain_doc_soft_limit(
http_daemon, https_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
request_meta = {
"stats": {"buckets": [{"bucket":"test_domain_doc_limit_bucket","tally-domains":["foo.localhost"]}]},
"soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls":10},
@ -946,11 +957,8 @@ def test_domain_doc_soft_limit(
assert response.headers['warcprox-test-header'] == 'o!'
assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# make sure stats from different domain don't count
url = 'http://bar.localhost:{}/o/p'.format(http_daemon.server_port)
@ -961,15 +969,10 @@ def test_domain_doc_soft_limit(
assert response.headers['warcprox-test-header'] == 'o!'
assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 11)
# (2) same host but different scheme and port: domain limit applies
#
url = 'https://foo.localhost:{}/o/p'.format(https_daemon.server_port)
response = requests.get(
url, proxies=archiving_proxies, headers=headers, stream=True,
@ -988,12 +991,12 @@ def test_domain_doc_soft_limit(
assert response.headers['warcprox-test-header'] == 'o!'
assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
# wait for postfetch chain
time.sleep(3)
logging.info(
'warcprox_.proxy.running_stats.urls - urls_before = %s',
warcprox_.proxy.running_stats.urls - urls_before)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 19)
# (10)
response = requests.get(
@ -1003,12 +1006,8 @@ def test_domain_doc_soft_limit(
assert response.headers['warcprox-test-header'] == 'o!'
assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20)
# (11) back to http, and this is the 11th request
url = 'http://zuh.foo.localhost:{}/o/p'.format(http_daemon.server_port)
@ -1030,6 +1029,9 @@ def test_domain_doc_soft_limit(
assert response.headers['warcprox-test-header'] == 'o!'
assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 21)
# https also blocked
url = 'https://zuh.foo.localhost:{}/o/p'.format(https_daemon.server_port)
response = requests.get(
@ -1056,6 +1058,8 @@ def test_domain_doc_soft_limit(
def test_domain_data_soft_limit(
http_daemon, https_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
# using idn
request_meta = {
"stats": {"buckets": [{"bucket":"test_domain_data_limit_bucket","tally-domains":['ÞzZ.LOCALhost']}]},
@ -1071,12 +1075,8 @@ def test_domain_data_soft_limit(
assert response.headers['warcprox-test-header'] == 'y!'
assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# duplicate, does not count toward limit
url = 'https://baz.Þzz.localhost:{}/y/z'.format(https_daemon.server_port)
@ -1087,12 +1087,8 @@ def test_domain_data_soft_limit(
assert response.headers['warcprox-test-header'] == 'y!'
assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
# novel, pushes stats over the limit
url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/~'.format(https_daemon.server_port)
@ -1103,12 +1099,8 @@ def test_domain_data_soft_limit(
assert response.headers['warcprox-test-header'] == 'z!'
assert response.content == b'I am the warcprox test payload! ~~~~~~~~~~!\n'
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
# make sure limit doesn't get applied to a different host
url = 'http://baz.localhost:{}/z/~'.format(http_daemon.server_port)
@ -1118,6 +1110,9 @@ def test_domain_data_soft_limit(
assert response.headers['warcprox-test-header'] == 'z!'
assert response.content == b'I am the warcprox test payload! ~~~~~~~~~~!\n'
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4)
# blocked because we're over the limit now
url = 'http://lOl.wHut.ÞZZ.lOcALHOst:{}/y/z'.format(http_daemon.server_port)
response = requests.get(
@ -1149,7 +1144,9 @@ def test_domain_data_soft_limit(
# connection to the internet, and relies on a third party site (facebook) being
# up and behaving a certain way
@pytest.mark.xfail
def test_tor_onion(archiving_proxies):
def test_tor_onion(archiving_proxies, warcprox_):
urls_before = warcprox_.proxy.running_stats.urls
response = requests.get('http://www.facebookcorewwwi.onion/',
proxies=archiving_proxies, verify=False, allow_redirects=False)
assert response.status_code == 302
@ -1158,7 +1155,12 @@ def test_tor_onion(archiving_proxies):
proxies=archiving_proxies, verify=False, allow_redirects=False)
assert response.status_code == 200
def test_missing_content_length(archiving_proxies, http_daemon, https_daemon):
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
def test_missing_content_length(archiving_proxies, http_daemon, https_daemon, warcprox_):
urls_before = warcprox_.proxy.running_stats.urls
# double-check that our test http server is responding as expected
url = 'http://localhost:%s/missing-content-length' % (
http_daemon.server_port)
@ -1195,8 +1197,14 @@ def test_missing_content_length(archiving_proxies, http_daemon, https_daemon):
b'This response is missing a Content-Length http header.')
assert not 'content-length' in response.headers
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
def test_method_filter(
https_daemon, http_daemon, archiving_proxies, playback_proxies):
warcprox_, https_daemon, http_daemon, archiving_proxies,
playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
# we've configured warcprox with method_filters=['GET','POST'] so HEAD
# requests should not be archived
@ -1207,7 +1215,10 @@ def test_method_filter(
assert response.headers['warcprox-test-header'] == 'z!'
assert response.content == b''
response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 404
assert response.content == b'404 Not in Archive\n'
@ -1224,13 +1235,17 @@ def test_method_filter(
headers=headers, proxies=archiving_proxies)
assert response.status_code == 204
response = _poll_playback_until(
playback_proxies, url, status=200, timeout_sec=10)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
response = requests.get(url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert response.content == payload
def test_dedup_ok_flag(
https_daemon, http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
if not warcprox_.options.rethinkdb_big_table:
# this feature is n/a unless using rethinkdb big table
return
@ -1238,7 +1253,7 @@ def test_dedup_ok_flag(
url = 'http://localhost:{}/z/b'.format(http_daemon.server_port)
# check not in dedup db
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
bucket='test_dedup_ok_flag')
assert dedup_lookup is None
@ -1252,13 +1267,11 @@ def test_dedup_ok_flag(
assert response.headers['warcprox-test-header'] == 'z!'
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# check that dedup db doesn't give us anything for this
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
bucket='test_dedup_ok_flag')
assert dedup_lookup is None
@ -1273,19 +1286,17 @@ def test_dedup_ok_flag(
assert response.headers['warcprox-test-header'] == 'z!'
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
# check that dedup db gives us something for this
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
bucket='test_dedup_ok_flag')
assert dedup_lookup
# inspect what's in rethinkdb more closely
rethink_captures = warcprox_.warc_writer_threads[0].dedup_db.captures_db
rethink_captures = warcprox_.dedup_db.captures_db
results_iter = rethink_captures.rr.table(rethink_captures.table).get_all(
['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response',
'test_dedup_ok_flag'], index='sha1_warc_type').order_by(
@ -1308,7 +1319,10 @@ def test_status_api(warcprox_):
status = json.loads(response.content.decode('ascii'))
assert set(status.keys()) == {
'role', 'version', 'host', 'address', 'port', 'pid', 'load',
'queued_urls', 'queue_max_size', 'seconds_behind', 'threads'}
'queued_urls', 'queue_max_size', 'seconds_behind', 'threads',
'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min',
'active_requests','start_time','urls_processed',
'warc_bytes_written','postfetch_chain',}
assert status['role'] == 'warcprox'
assert status['version'] == warcprox.__version__
assert status['port'] == warcprox_.proxy.server_port
@ -1327,7 +1341,10 @@ def test_svcreg_status(warcprox_):
assert set(status.keys()) == {
'id', 'role', 'version', 'host', 'port', 'pid', 'load',
'queued_urls', 'queue_max_size', 'seconds_behind',
'first_heartbeat', 'ttl', 'last_heartbeat', 'threads'}
'first_heartbeat', 'ttl', 'last_heartbeat', 'threads',
'rates_5min', 'rates_1min', 'unaccepted_requests',
'rates_15min', 'active_requests','start_time','urls_processed',
'warc_bytes_written','postfetch_chain',}
assert status['role'] == 'warcprox'
assert status['version'] == warcprox.__version__
assert status['port'] == warcprox_.proxy.server_port
@ -1361,32 +1378,36 @@ def test_controller_with_defaults():
assert controller.proxy.recorded_url_q
assert controller.proxy.server_address == ('127.0.0.1', 8000)
assert controller.proxy.server_port == 8000
for wwt in controller.warc_writer_threads:
assert wwt
assert wwt.recorded_url_q
assert wwt.recorded_url_q is controller.proxy.recorded_url_q
assert wwt.writer_pool
assert wwt.writer_pool.default_warc_writer
assert wwt.writer_pool.default_warc_writer.directory == './warcs'
assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None
assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000
assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox'
assert wwt.writer_pool.default_warc_writer.gzip is False
assert wwt.writer_pool.default_warc_writer.record_builder
assert not wwt.writer_pool.default_warc_writer.record_builder.base32
assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
assert controller.proxy.running_stats
assert not controller.proxy.stats_db
wwt = controller.warc_writer_thread
assert wwt
assert wwt.inq
assert wwt.outq
assert wwt.writer_pool
assert wwt.writer_pool.default_warc_writer
assert wwt.writer_pool.default_warc_writer.directory == './warcs'
assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None
assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000
assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox'
assert wwt.writer_pool.default_warc_writer.gzip is False
assert wwt.writer_pool.default_warc_writer.record_builder
assert not wwt.writer_pool.default_warc_writer.record_builder.base32
assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
def test_choose_a_port_for_me(warcprox_):
options = warcprox.Options()
options.port = 0
controller = warcprox.controller.WarcproxController(
service_registry=warcprox_.service_registry, options=options)
if warcprox_.service_registry:
options.rethinkdb_services_url = 'rethinkdb://localhost/test0/services'
controller = warcprox.controller.WarcproxController(options)
assert controller.proxy.server_port != 0
assert controller.proxy.server_port != 8000
assert controller.proxy.server_address == (
'127.0.0.1', controller.proxy.server_port)
th = threading.Thread(target=controller.run_until_shutdown)
controller.start()
th.start()
try:
# check that the status api lists the correct port
@ -1412,16 +1433,21 @@ def test_choose_a_port_for_me(warcprox_):
th.join()
def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url = 'http://localhost:%s/a/z' % http_daemon.server_port
response = requests.get(url, proxies=archiving_proxies)
assert response.headers['via'] == '1.1 warcprox'
playback_response = _poll_playback_until(
playback_proxies, url, status=200, timeout_sec=10)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
playback_response = requests.get(
url, proxies=playback_proxies, verify=False)
assert response.status_code == 200
assert not 'via' in playback_response
warc = warcprox_.warc_writer_threads[0].writer_pool.default_warc_writer._fpath
warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath
with open(warc, 'rb') as f:
for record in warcio.archiveiterator.ArchiveIterator(f):
if record.rec_headers.get_header('warc-target-uri') == url:
@ -1444,15 +1470,19 @@ def test_slash_in_warc_prefix(warcprox_, http_daemon, archiving_proxies):
assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix'
def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
try:
os.unlink(os.path.join(warcprox_.options.crawl_log_dir, 'crawl.log'))
except:
pass
# should go to default crawl log
url = 'http://localhost:%s/b/aa' % http_daemon.server_port
response = requests.get(url, proxies=archiving_proxies)
assert response.status_code == 200
# should go to test_crawl_log_1.log
url = 'http://localhost:%s/b/bb' % http_daemon.server_port
headers = {
"Warcprox-Meta": json.dumps({"warc-prefix":"test_crawl_log_1"}),
@ -1461,13 +1491,12 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
response = requests.get(url, proxies=archiving_proxies, headers=headers)
assert response.status_code == 200
start = time.time()
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log')
while time.time() - start < 10:
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
assert os.path.exists(file)
assert os.stat(file).st_size > 0
assert os.path.exists(os.path.join(
warcprox_.options.crawl_log_dir, 'crawl.log'))
@ -1522,13 +1551,12 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
response = requests.get(url, proxies=archiving_proxies, headers=headers)
assert response.status_code == 200
start = time.time()
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log')
while time.time() - start < 10:
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
assert os.path.exists(file)
assert os.stat(file).st_size > 0
crawl_log_2 = open(file, 'rb').read()
@ -1552,17 +1580,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert extra_info['contentSize'] == 145
# a request that is not saved to a warc (because of --method-filter)
# currently not logged at all (XXX maybe it should be)
url = 'http://localhost:%s/b/cc' % http_daemon.server_port
headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})}
response = requests.head(url, proxies=archiving_proxies, headers=headers)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4)
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')
start = time.time()
while time.time() - start < 10:
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
assert os.path.exists(file)
crawl_log_3 = open(file, 'rb').read()
@ -1597,13 +1622,10 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
headers=headers, proxies=archiving_proxies)
assert response.status_code == 204
start = time.time()
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')
while time.time() - start < 10:
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 5)
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')
assert os.path.exists(file)
crawl_log_4 = open(file, 'rb').read()
@ -1628,6 +1650,8 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
def test_long_warcprox_meta(
warcprox_, http_daemon, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url = 'http://localhost:%s/b/g' % http_daemon.server_port
# create a very long warcprox-meta header
@ -1637,17 +1661,14 @@ def test_long_warcprox_meta(
url, proxies=archiving_proxies, headers=headers, verify=False)
assert response.status_code == 200
# wait for writer thread to process
time.sleep(0.5)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# check that warcprox-meta was parsed and honored ("warc-prefix" param)
assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"]
writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"]
warc_path = os.path.join(writer.directory, writer._f_finalname)
warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
assert os.path.exists(warc_path)
# read the warc
@ -1667,7 +1688,6 @@ def test_long_warcprox_meta(
def test_empty_response(
warcprox_, http_daemon, https_daemon, archiving_proxies,
playback_proxies):
url = 'http://localhost:%s/empty-response' % http_daemon.server_port
response = requests.get(url, proxies=archiving_proxies, verify=False)
assert response.status_code == 502

tests/test_writer.py

@ -1,22 +1,39 @@
'''
tests/test_writer.py - warcprox warc writing tests
Copyright (C) 2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
import os
import fcntl
from multiprocessing import Process, Queue
from datetime import datetime
import pytest
import re
from warcprox.mitmproxy import ProxyingRecorder
from warcprox.warcproxy import RecordedUrl
from warcprox.writer import WarcWriter
from warcprox import Options
recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain',
status=200, client_ip='127.0.0.2',
request_data=b'abc',
response_recorder=recorder,
remote_ip='127.0.0.3',
timestamp=datetime.utcnow())
import time
import warcprox
import io
import tempfile
import logging
def lock_file(queue, filename):
"""Try to lock file and return 1 if successful, else return 0.
@ -36,6 +53,13 @@ def test_warc_writer_locking(tmpdir):
When we don't have the .open suffix, WarcWriter locks the file and the
external process trying to ``lock_file`` fails (result=0).
"""
recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
recorded_url = RecordedUrl(
url='http://example.com', content_type='text/plain', status=200,
client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow())
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True))
wwriter.write_records(recorded_url)
@ -55,3 +79,101 @@ def test_warc_writer_locking(tmpdir):
p.start()
p.join()
assert queue.get() == 'OBTAINED LOCK'
def wait(callback, timeout):
start = time.time()
while time.time() - start < timeout:
if callback():
return
time.sleep(0.5)
raise Exception('timed out waiting for %s to return truthy' % callback)
def test_special_dont_write_prefix():
with tempfile.TemporaryDirectory() as tmpdir:
logging.debug('cd %s', tmpdir)
os.chdir(tmpdir)
wwt = warcprox.writerthread.WarcWriterThread(Options(prefix='-'))
wwt.inq = warcprox.TimestampedQueue(maxsize=1)
wwt.outq = warcprox.TimestampedQueue(maxsize=1)
try:
wwt.start()
# not to be written due to default prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read()
wwt.inq.put(RecordedUrl(
url='http://example.com/no', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow(),
payload_digest=recorder.block_digest))
# to be written due to warcprox-meta prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read()
wwt.inq.put(RecordedUrl(
url='http://example.com/yes', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow(),
payload_digest=recorder.block_digest,
warcprox_meta={'warc-prefix': 'normal-warc-prefix'}))
recorded_url = wwt.outq.get(timeout=10)
assert not recorded_url.warc_records
recorded_url = wwt.outq.get(timeout=10)
assert recorded_url.warc_records
assert wwt.outq.empty()
finally:
wwt.stop.set()
wwt.join()
wwt = warcprox.writerthread.WarcWriterThread()
wwt.inq = warcprox.TimestampedQueue(maxsize=1)
wwt.outq = warcprox.TimestampedQueue(maxsize=1)
try:
wwt.start()
# to be written due to default prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read()
wwt.inq.put(RecordedUrl(
url='http://example.com/yes', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow(),
payload_digest=recorder.block_digest))
# not to be written due to warcprox-meta prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read()
wwt.inq.put(RecordedUrl(
url='http://example.com/no', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow(),
payload_digest=recorder.block_digest,
warcprox_meta={'warc-prefix': '-'}))
recorded_url = wwt.outq.get(timeout=10)
assert recorded_url.warc_records
recorded_url = wwt.outq.get(timeout=10)
assert not recorded_url.warc_records
assert wwt.outq.empty()
finally:
wwt.stop.set()
wwt.join()
def test_warc_writer_filename(tmpdir):
"""Test if WarcWriter is writing WARC files with custom filenames.
"""
recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
recorded_url = RecordedUrl(
url='http://example.com', content_type='text/plain', status=200,
client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow())
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(directory=dirname, prefix='foo',
warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}'))
wwriter.write_records(recorded_url)
warcs = [fn for fn in os.listdir(dirname)]
assert warcs
assert re.search(r'\d{17}_foo_\d{14}_00000\.warc\.open', wwriter._fpath)

View File

@ -1,7 +1,7 @@
"""
warcprox/__init__.py - warcprox package main file, contains some utility code
Copyright (C) 2013-2017 Internet Archive
Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -19,6 +19,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
"""
import sys
import datetime
import threading
import time
import logging
from argparse import Namespace as _Namespace
from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('warcprox').version
@ -26,7 +31,6 @@ try:
import queue
except ImportError:
import Queue as queue
import datetime
def digest_str(hash_obj, base32=False):
import base64
@ -92,27 +96,162 @@ class RequestBlockedByRule(Exception):
def __str__(self):
return "%s: %s" % (self.__class__.__name__, self.msg)
class BasePostfetchProcessor(threading.Thread):
logger = logging.getLogger("warcprox.BasePostfetchProcessor")
def __init__(self, options=Options()):
threading.Thread.__init__(self, name=self.__class__.__name__)
self.options = options
self.stop = threading.Event()
# these should be set before thread is started
self.inq = None
self.outq = None
self.profiler = None
def run(self):
if self.options.profile:
import cProfile
self.profiler = cProfile.Profile()
self.profiler.enable()
self._run()
self.profiler.disable()
else:
self._run()
def _get_process_put(self):
'''
Get url(s) from `self.inq`, process url(s), queue to `self.outq`.
Subclasses must implement this. Implementations may operate on
individual urls, or on batches.
May raise queue.Empty.
'''
raise Exception('not implemented')
def _run(self):
logging.info('%s starting up', self)
self._startup()
while not self.stop.is_set():
try:
while True:
try:
self._get_process_put()
except queue.Empty:
if self.stop.is_set():
break
logging.info('%s shutting down', self)
self._shutdown()
except Exception as e:
if isinstance(e, OSError) and e.errno == 28:
# OSError: [Errno 28] No space left on device
self.logger.critical(
'shutting down due to fatal problem: %s: %s',
e.__class__.__name__, e)
self._shutdown()
sys.exit(1)
self.logger.critical(
'%s will try to continue after unexpected error',
self.name, exc_info=True)
time.sleep(0.5)
def _startup(self):
pass
def _shutdown(self):
pass
class BaseStandardPostfetchProcessor(BasePostfetchProcessor):
def _get_process_put(self):
recorded_url = self.inq.get(block=True, timeout=0.5)
self._process_url(recorded_url)
if self.outq:
self.outq.put(recorded_url)
def _process_url(self, recorded_url):
raise Exception('not implemented')
class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
MAX_BATCH_SIZE = 500
MAX_BATCH_SEC = 10
MIN_BATCH_SEC = 2.0
def _get_process_put(self):
batch = []
start = time.time()
while True:
try:
batch.append(self.inq.get(block=True, timeout=0.5))
except queue.Empty:
if self.stop.is_set():
break
# else maybe keep adding to the batch
if len(batch) >= self.MAX_BATCH_SIZE:
break # full batch
elapsed = time.time() - start
if elapsed >= self.MAX_BATCH_SEC:
break # been batching for a while
if (elapsed >= self.MIN_BATCH_SEC and self.outq
and len(self.outq.queue) == 0):
break # next processor is waiting on us
if not batch:
raise queue.Empty
self.logger.info(
'gathered batch of %s in %0.2f sec',
len(batch), time.time() - start)
self._process_batch(batch)
if self.outq:
for recorded_url in batch:
self.outq.put(recorded_url)
def _process_batch(self, batch):
raise Exception('not implemented')
class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
def __init__(self, listener, options=Options()):
BaseStandardPostfetchProcessor.__init__(self, options)
self.listener = listener
self.name = listener.__class__.__name__
def _process_url(self, recorded_url):
return self.listener.notify(recorded_url, recorded_url.warc_records)
def start(self):
if hasattr(self.listener, 'start'):
self.listener.start()
BaseStandardPostfetchProcessor.start(self)
def _shutdown(self):
if hasattr(self.listener, 'stop'):
try:
self.listener.stop()
except:
self.logger.error(
'%s raised exception', self.listener.stop, exc_info=True)
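
These base classes define the contract for everything in the postfetch chain: a processor is a thread with an `inq` and an optional `outq`, and a subclass overrides `_process_url` (one url at a time, via BaseStandardPostfetchProcessor) or `_process_batch` (via BaseBatchPostfetchProcessor). A minimal sketch of a custom single-url processor -- the class name and log message are hypothetical, and the inq/outq wiring and start() call are normally handled by the controller:

    import logging
    import warcprox

    class UrlLoggerProcessor(warcprox.BaseStandardPostfetchProcessor):
        '''Hypothetical processor that logs every url passing through the chain.'''
        logger = logging.getLogger('UrlLoggerProcessor')

        def _process_url(self, recorded_url):
            # called once per RecordedUrl pulled from self.inq; after this
            # returns, the base class forwards recorded_url to self.outq
            self.logger.info('%s %s', recorded_url.status, recorded_url.url)
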
# monkey-patch log levels TRACE and NOTICE
TRACE = 5
import logging
def _logging_trace(msg, *args, **kwargs):
logging.root.trace(msg, *args, **kwargs)
def _logger_trace(self, msg, *args, **kwargs):
if self.isEnabledFor(TRACE):
self._log(TRACE, msg, args, **kwargs)
logging.trace = _logging_trace
logging.Logger.trace = _logger_trace
logging.trace = logging.root.trace
logging.addLevelName(TRACE, 'TRACE')
NOTICE = (logging.INFO + logging.WARN) // 2
import logging
def _logging_notice(msg, *args, **kwargs):
logging.root.notice(msg, *args, **kwargs)
def _logger_notice(self, msg, *args, **kwargs):
if self.isEnabledFor(NOTICE):
self._log(NOTICE, msg, args, **kwargs)
logging.notice = _logging_notice
logging.Logger.notice = _logger_notice
logging.notice = logging.root.notice
logging.addLevelName(NOTICE, 'NOTICE')
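
With these patches in place, TRACE (numeric level 5, below DEBUG) and NOTICE (halfway between INFO and WARNING) can be used like the built-in levels, both on individual loggers and through the module-level helpers. A usage sketch, assuming warcprox has been imported so the patches have been applied:

    import logging
    import warcprox  # importing warcprox installs the TRACE/NOTICE patches

    logging.basicConfig(level=warcprox.TRACE)
    logger = logging.getLogger('example')
    logger.trace('very chatty diagnostic detail')
    logger.notice('noteworthy, but not a warning')
    logging.notice('module-level helper, logs via the root logger')
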
import warcprox.controller as controller

View File

@ -215,7 +215,7 @@ class RethinkCaptures:
if self._timer:
self._timer.join()
class RethinkCapturesDedup:
class RethinkCapturesDedup(warcprox.dedup.DedupDb):
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
def __init__(self, options=warcprox.Options()):

View File

@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
sending heartbeats to the service registry if configured to do so; also has
some memory profiling capabilities
Copyright (C) 2013-2017 Internet Archive
Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -27,55 +27,189 @@ from __future__ import absolute_import
import logging
import threading
import time
import warcprox
import sys
import gc
import datetime
import warcprox
import certauth
import functools
import doublethink
import importlib
class Factory:
@staticmethod
def dedup_db(options):
if options.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
elif options.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif options.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
elif options.cdxserver_dedup:
dedup_db = warcprox.dedup.CdxServerDedup(
cdx_url=options.cdxserver_dedup)
elif options.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = warcprox.dedup.DedupDb(options.dedup_db_file, options=options)
return dedup_db
@staticmethod
def stats_processor(options):
# return warcprox.stats.StatsProcessor(options)
if options.rethinkdb_stats_url:
stats_processor = warcprox.stats.RethinkStatsProcessor(options)
elif options.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
stats_processor = None
else:
stats_processor = warcprox.stats.StatsProcessor(options)
return stats_processor
@staticmethod
def warc_writer(options):
return warcprox.writerthread.WarcWriterThread(options)
@staticmethod
def playback_proxy(ca, options):
if options.playback_port is not None:
playback_index_db = warcprox.playback.PlaybackIndexDb(
options=options)
playback_proxy = warcprox.playback.PlaybackProxy(
ca=ca, playback_index_db=playback_index_db, options=options)
else:
playback_index_db = None
playback_proxy = None
return playback_proxy
@staticmethod
def crawl_logger(options):
if options.crawl_log_dir:
return warcprox.crawl_log.CrawlLogger(
options.crawl_log_dir, options=options)
else:
return None
@staticmethod
def plugin(qualname):
try:
(module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name)
plugin = class_()
plugin.notify # make sure it has this method
return plugin
except Exception as e:
logging.fatal('problem with plugin class %r: %s', qualname, e)
sys.exit(1)
@staticmethod
def service_registry(options):
if options.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
return doublethink.ServiceRegistry(rr, table=parsed.table)
else:
return None
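
Factory gathers up the option-driven construction that previously lived in init_controller in main.py (removed below): each static method inspects `options` and returns the matching implementation, or None when the feature is disabled. For example (a sketch; the rethinkdb url is a placeholder):

    import warcprox
    import warcprox.controller

    options = warcprox.Options(
            rethinkdb_trough_db_url='rethinkdb://db0.example.org/trough/services')
    dedup_db = warcprox.controller.Factory.dedup_db(options)  # -> TroughDedupDb

    options = warcprox.Options(dedup_db_file='/dev/null')
    assert warcprox.controller.Factory.dedup_db(options) is None  # dedup disabled
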
class WarcproxController(object):
logger = logging.getLogger("warcprox.controller.WarcproxController")
HEARTBEAT_INTERVAL = 20.0
def __init__(
self, proxy=None, warc_writer_threads=None, playback_proxy=None,
service_registry=None, options=warcprox.Options()):
def __init__(self, options=warcprox.Options()):
"""
Create warcprox controller.
If supplied, `proxy` should be an instance of WarcProxy, and
`warc_writer_threads` should be a list of WarcWriterThread instances.
If not supplied, they are created with default values.
If supplied, playback_proxy should be an instance of PlaybackProxy. If
not supplied, no playback proxy will run.
Create warcprox controller based on `options`.
"""
if proxy is not None:
self.proxy = proxy
else:
self.proxy = warcprox.warcproxy.WarcProxy(options=options)
if warc_writer_threads is not None:
self.warc_writer_threads = warc_writer_threads
else:
self.warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
name='WarcWriterThread%03d' % i,
recorded_url_q=self.proxy.recorded_url_q,
options=options)
for i in range(int(self.proxy.max_threads ** 0.5))]
self.options = options
self.proxy_thread = None
self.playback_proxy_thread = None
self.playback_proxy = playback_proxy
self.service_registry = service_registry
self.options = options
self._last_rss = None
self.stop = threading.Event()
self._start_stop_lock = threading.Lock()
self.stats_processor = Factory.stats_processor(self.options)
self.proxy = warcprox.warcproxy.WarcProxy(
self.stats_processor, self.postfetch_status, options)
self.playback_proxy = Factory.playback_proxy(
self.proxy.ca, self.options)
self.build_postfetch_chain(self.proxy.recorded_url_q)
self.service_registry = Factory.service_registry(options)
def postfetch_status(self):
result = {'postfetch_chain': []}
for processor in self._postfetch_chain:
if processor.__class__ == warcprox.ListenerPostfetchProcessor:
name = processor.listener.__class__.__name__
else:
name = processor.__class__.__name__
queued = len(processor.inq.queue)
if hasattr(processor, 'batch'):
queued += len(processor.batch)
result['postfetch_chain'].append({
'processor': name,
'queued_urls': queued})
return result
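
`postfetch_status` is handed to the proxy as its `status_callback` (see warcproxy.py below), so its output lands in the /status response and in the rethinkdb services table alongside the queue and rate numbers. The new field looks roughly like this (processor names depend on configuration, counts are illustrative):

    {'postfetch_chain': [
        {'processor': 'BatchTroughLoader', 'queued_urls': 0},
        {'processor': 'WarcWriterThread', 'queued_urls': 12},
        {'processor': 'BatchTroughStorer', 'queued_urls': 3},
        {'processor': 'StatsProcessor', 'queued_urls': 0},
        {'processor': 'RunningStats', 'queued_urls': 0}]}
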
def chain(self, processor0, processor1):
'''
Sets `processor0.outq` = `processor1.inq` = `queue.Queue()`
'''
assert not processor0.outq
assert not processor1.inq
q = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
processor0.outq = q
processor1.inq = q
def build_postfetch_chain(self, inq):
self._postfetch_chain = []
self.dedup_db = Factory.dedup_db(self.options)
if self.dedup_db:
self._postfetch_chain.append(self.dedup_db.loader())
self.warc_writer_thread = Factory.warc_writer(self.options)
self._postfetch_chain.append(self.warc_writer_thread)
if self.dedup_db:
self._postfetch_chain.append(self.dedup_db.storer())
if self.stats_processor:
self._postfetch_chain.append(self.stats_processor)
if self.playback_proxy:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
self.playback_proxy.playback_index_db, self.options))
crawl_logger = Factory.crawl_logger(self.options)
if crawl_logger:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
crawl_logger, self.options))
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
self.proxy.running_stats, self.options))
for qualname in self.options.plugins or []:
plugin = Factory.plugin(qualname)
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(plugin, self.options))
# chain them all up
self._postfetch_chain[0].inq = inq
for i in range(1, len(self._postfetch_chain)):
self.chain(self._postfetch_chain[i-1], self._postfetch_chain[i])
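
With everything enabled, the result is a linear pipeline: the proxy's `recorded_url_q` becomes the first processor's `inq`, and each `chain()` call allocates one fresh queue between neighbors. Illustratively, for a trough-dedup configuration with stats on:

    # proxy.recorded_url_q
    #   -> BatchTroughLoader -> q -> WarcWriterThread -> q -> BatchTroughStorer
    #   -> q -> StatsProcessor -> q -> ListenerPostfetchProcessor(RunningStats)
    #
    # each q is a distinct warcprox.TimestampedQueue(maxsize=options.queue_size)
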
def debug_mem(self):
self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize())
with open("/proc/self/status") as f:
@ -149,12 +283,7 @@ class WarcproxController(object):
'ttl': self.HEARTBEAT_INTERVAL * 3,
'port': self.proxy.server_port,
}
status_info['load'] = 1.0 * self.proxy.recorded_url_q.qsize() / (
self.proxy.recorded_url_q.maxsize or 100)
status_info['queued_urls'] = self.proxy.recorded_url_q.qsize()
status_info['queue_max_size'] = self.proxy.recorded_url_q.maxsize
status_info['seconds_behind'] = self.proxy.recorded_url_q.seconds_behind()
status_info['threads'] = self.proxy.pool._max_workers
status_info.update(self.proxy.status())
self.status_info = self.service_registry.heartbeat(status_info)
self.logger.log(
@ -167,35 +296,27 @@ class WarcproxController(object):
self.logger.info('warcprox is already running')
return
if self.proxy.stats_db:
self.proxy.stats_db.start()
self.proxy_thread = threading.Thread(
target=self.proxy.serve_forever, name='ProxyThread')
self.proxy_thread.start()
assert(all(
wwt.dedup_db is self.warc_writer_threads[0].dedup_db
for wwt in self.warc_writer_threads))
if any((t.dedup_db for t in self.warc_writer_threads)):
self.warc_writer_threads[0].dedup_db.start()
for wwt in self.warc_writer_threads:
wwt.start()
if self.playback_proxy is not None:
if self.playback_proxy:
self.playback_proxy_thread = threading.Thread(
target=self.playback_proxy.serve_forever,
name='PlaybackProxyThread')
target=self.playback_proxy.serve_forever,
name='PlaybackProxyThread')
self.playback_proxy_thread.start()
for processor in self._postfetch_chain:
processor.start()
def shutdown(self):
with self._start_stop_lock:
if not self.proxy_thread or not self.proxy_thread.is_alive():
self.logger.info('warcprox is not running')
return
for wwt in self.warc_writer_threads:
wwt.stop.set()
for processor in self._postfetch_chain:
processor.stop.set()
self.proxy.shutdown()
self.proxy.server_close()
@ -205,12 +326,8 @@ class WarcproxController(object):
if self.playback_proxy.playback_index_db is not None:
self.playback_proxy.playback_index_db.close()
# wait for threads to finish
for wwt in self.warc_writer_threads:
wwt.join()
if self.proxy.stats_db:
self.proxy.stats_db.stop()
for processor in self._postfetch_chain:
processor.join()
self.proxy_thread.join()
if self.playback_proxy is not None:
@ -227,6 +344,7 @@ class WarcproxController(object):
self.start()
last_mem_dbg = datetime.datetime.utcfromtimestamp(0)
last_profile_dump = datetime.datetime.utcnow()
try:
utc = datetime.timezone.utc
@ -240,6 +358,9 @@ class WarcproxController(object):
try:
while not self.stop.is_set():
if self.proxy.running_stats:
self.proxy.running_stats.snap()
if self.service_registry and (
not hasattr(self, "status_info") or (
datetime.datetime.now(utc)
@ -253,6 +374,12 @@ class WarcproxController(object):
# self.debug_mem()
# last_mem_dbg = datetime.datetime.utcnow()
if (self.options.profile and
(datetime.datetime.utcnow() - last_profile_dump
).total_seconds() > 60*10):
self._dump_profiling()
last_profile_dump = datetime.datetime.utcnow()
time.sleep(0.5)
if self.options.profile:
@ -283,18 +410,17 @@ class WarcproxController(object):
'aggregate performance profile of %s proxy threads:\n%s',
len(files), buf.getvalue())
# warc writer threads
files = []
for wwt in self.warc_writer_threads:
file = os.path.join(tmpdir, '%s.dat' % wwt.ident)
wwt.profiler.dump_stats(file)
files.append(file)
buf = io.StringIO()
stats = pstats.Stats(*files, stream=buf)
stats.sort_stats('cumulative')
stats.print_stats(0.1)
self.logger.notice(
'aggregate performance profile of %s warc writer threads:\n%s',
len(self.warc_writer_threads), buf.getvalue())
# postfetch processors
for processor in self._postfetch_chain:
if not processor.profiler:
self.logger.notice('%s has no profiling data', processor)
continue
file = os.path.join(tmpdir, '%s.dat' % processor.ident)
processor.profiler.dump_stats(file)
buf = io.StringIO()
stats = pstats.Stats(file, stream=buf)
stats.sort_stats('cumulative')
stats.print_stats(0.1)
self.logger.notice(
'performance profile of %s:\n%s', processor,
buf.getvalue())

View File

@ -1,7 +1,7 @@
'''
warcprox/dedup.py - identical payload digest deduplication using sqlite db
Copyright (C) 2013-2017 Internet Archive
Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -32,9 +32,19 @@ import doublethink
import datetime
import urllib3
from urllib3.exceptions import HTTPError
import collections
urllib3.disable_warnings()
class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
def __init__(self, dedup_db, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.dedup_db = dedup_db
def _process_url(self, recorded_url):
decorate_with_dedup_info(
self.dedup_db, recorded_url, self.options.base32)
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@ -61,6 +71,12 @@ class DedupDb(object):
conn.commit()
conn.close()
def loader(self, *args, **kwargs):
return DedupLoader(self, self.options)
def storer(self, *args, **kwargs):
return warcprox.ListenerPostfetchProcessor(self, self.options)
def save(self, digest_key, response_record, bucket=""):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
@ -106,20 +122,20 @@ class DedupDb(object):
else:
self.save(digest_key, records[0])
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if (recorded_url.response_recorder
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
recorded_url.dedup_info = dedup_db.lookup(
digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
else:
recorded_url.dedup_info = dedup_db.lookup(digest_key,
url=recorded_url.url)
recorded_url.dedup_info = dedup_db.lookup(
digest_key, url=recorded_url.url)
class RethinkDedupDb:
class RethinkDedupDb(DedupDb):
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, options=warcprox.Options()):
@ -181,16 +197,16 @@ class RethinkDedupDb:
else:
self.save(digest_key, records[0])
class CdxServerDedup(object):
class CdxServerDedup(DedupDb):
"""Query a CDX server to perform deduplication.
"""
logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
http_pool = urllib3.PoolManager()
def __init__(self, cdx_url="https://web.archive.org/cdx/search",
options=warcprox.Options()):
maxsize=200, options=warcprox.Options()):
self.cdx_url = cdx_url
self.options = options
self.http_pool = urllib3.PoolManager(maxsize=maxsize)
def start(self):
pass
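
The connection pool, previously a class-level `urllib3.PoolManager()` shared by all instances, is now per-instance with a configurable `maxsize`, so busy deployments can keep more simultaneous connections to the cdx server. A construction sketch (url and size are illustrative):

    dedup_db = CdxServerDedup(
            cdx_url='https://web.archive.org/cdx/search', maxsize=400)
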
@ -244,7 +260,82 @@ class CdxServerDedup(object):
"""
pass
class TroughDedupDb(object):
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
self.trough_dedup_db = trough_dedup_db
def _filter_and_bucketize(self, batch):
'''
Returns `{bucket: [recorded_url, ...]}`, excluding urls whose dedup info
should not be stored.
'''
buckets = collections.defaultdict(list)
for recorded_url in batch:
if (recorded_url.warc_records
and recorded_url.warc_records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket']
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
return buckets
def _process_batch(self, batch):
buckets = self._filter_and_bucketize(batch)
for bucket in buckets:
self.trough_dedup_db.batch_save(buckets[bucket], bucket)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
self.trough_dedup_db = trough_dedup_db
def _startup(self):
self.trough_dedup_db.start()
def _filter_and_bucketize(self, batch):
'''
Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not
be looked up.
'''
buckets = collections.defaultdict(list)
for recorded_url in batch:
if (recorded_url.response_recorder
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket']
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
return buckets
def _build_key_index(self, batch):
'''
Returns `{digest_key: [recorded_url, ...]}`.
'''
key_index = collections.defaultdict(list)
for recorded_url in batch:
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
key_index[digest_key].append(recorded_url)
return key_index
def _process_batch(self, batch):
buckets = self._filter_and_bucketize(batch)
for bucket in buckets:
key_index = self._build_key_index(buckets[bucket])
results = self.trough_dedup_db.batch_lookup(
key_index.keys(), bucket)
for result in results:
for recorded_url in key_index[result['digest_key']]:
recorded_url.dedup_info = result
class TroughDedupDb(DedupDb):
'''
https://github.com/internetarchive/trough
'''
@ -256,7 +347,8 @@ class TroughDedupDb(object):
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n') # warc record id
WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) '
WRITE_SQL_TMPL = ('insert or ignore into dedup\n'
'(digest_key, url, date, id)\n'
'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()):
@ -264,6 +356,12 @@ class TroughDedupDb(object):
self._trough_cli = warcprox.trough.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60)
def loader(self, *args, **kwargs):
return BatchTroughLoader(self, self.options)
def storer(self, *args, **kwargs):
return BatchTroughStorer(self, self.options)
def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
@ -275,6 +373,21 @@ class TroughDedupDb(object):
bucket, self.WRITE_SQL_TMPL,
(digest_key, url, warc_date, record_id), self.SCHEMA_ID)
def batch_save(self, batch, bucket='__unspecified__'):
sql_tmpl = ('insert or ignore into dedup\n'
'(digest_key, url, date, id)\n'
'values %s;' % ','.join(
'(%s,%s,%s,%s)' for i in range(len(batch))))
values = []
for recorded_url in batch:
values.extend([
warcprox.digest_str(
recorded_url.payload_digest, self.options.base32),
recorded_url.url,
recorded_url.warc_records[0].date,
recorded_url.warc_records[0].id,])
self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID)
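
`batch_save` flattens the whole batch into a single multi-row insert, one `(%s,%s,%s,%s)` group per url, leaving value substitution to `TroughClient.write`. For a two-url batch the template expands to (illustrative):

    # insert or ignore into dedup
    # (digest_key, url, date, id)
    # values (%s,%s,%s,%s),(%s,%s,%s,%s);
    #
    # with values = [digest_key0, url0, date0, record_id0,
    #                digest_key1, url1, date1, record_id1]
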
def lookup(self, digest_key, bucket='__unspecified__', url=None):
results = self._trough_cli.read(
bucket, 'select * from dedup where digest_key=%s;',
@ -291,6 +404,23 @@ class TroughDedupDb(object):
else:
return None
def batch_lookup(self, digest_keys, bucket='__unspecified__'):
sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
','.join('%s' for i in range(len(digest_keys))))
results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
if results is None:
return []
self.logger.debug(
'trough batch lookup of %s keys returned %s results',
len(digest_keys), len(results))
assert len(results) <= len(digest_keys)
for result in results:
result['id'] = result['id'].encode('ascii')
result['url'] = result['url'].encode('ascii')
result['date'] = result['date'].encode('ascii')
result['digest_key'] = result['digest_key'].encode('ascii')
return results
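
`batch_lookup` is the read-side counterpart: one `in (...)` query with a placeholder per digest key, after which the json-decoded string fields are re-encoded to bytes to match how single-key `lookup` results are consumed. For three keys the generated query is (illustrative):

    # select * from dedup where digest_key in (%s,%s,%s)
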
def notify(self, recorded_url, records):
if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):

View File

@ -61,7 +61,7 @@ class BetterArgumentDefaultsHelpFormatter(
else:
return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action)
def _build_arg_parser(prog):
def _build_arg_parser(prog='warcprox'):
arg_parser = argparse.ArgumentParser(prog=prog,
description='warcprox - WARC writing MITM HTTP/S proxy',
formatter_class=BetterArgumentDefaultsHelpFormatter)
@ -77,12 +77,18 @@ def _build_arg_parser(prog):
help='where to store and load generated certificates')
arg_parser.add_argument('-d', '--dir', dest='directory',
default='./warcs', help='where to write warcs')
arg_parser.add_argument('--warc-filename', dest='warc_filename',
default='{prefix}-{timestamp17}-{serialno}-{randomtoken}',
help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
default=False, action='store_true', help=argparse.SUPPRESS)
arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix')
# not mentioned in --help: the special value '-' for --prefix means don't
# archive the capture, unless prefix set in warcprox-meta header
arg_parser.add_argument(
'-n', '--prefix', dest='prefix', default='WARCPROX',
help='default WARC filename prefix')
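
test_special_dont_write_prefix above exercises both directions of this: with `--prefix -` nothing is archived unless a capture supplies its own warc-prefix, and with a normal default prefix a client can opt a single capture out. The per-request override travels in the Warcprox-Meta request header, along these lines (a sketch):

    import json
    # ask warcprox not to archive this particular fetch
    headers = {'Warcprox-Meta': json.dumps({'warc-prefix': '-'})}
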
arg_parser.add_argument(
'-s', '--size', dest='rollover_size', default=1000*1000*1000,
type=int, help='WARC file rollover size threshold in bytes')
@ -113,9 +119,9 @@ def _build_arg_parser(prog):
arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)')
# arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
# default='./warcprox-playback-index.db',
# help='playback index database file (only used if --playback-port is specified)')
group = arg_parser.add_mutually_exclusive_group()
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
@ -195,12 +201,12 @@ def dump_state(signum=None, frame=None):
'dumping state (caught signal %s)\n%s',
signum, '\n'.join(state_strs))
def init_controller(args):
def parse_args(argv):
'''
Creates a warcprox.controller.WarcproxController configured according to
the supplied arguments (normally the result of parse_args(sys.argv)).
Parses command line arguments with argparse.
'''
options = warcprox.Options(**vars(args))
arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
args = arg_parser.parse_args(args=argv[1:])
try:
hashlib.new(args.digest_algorithm)
@ -208,102 +214,6 @@ def init_controller(args):
logging.fatal(e)
exit(1)
listeners = []
if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
elif args.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
elif args.cdxserver_dedup:
dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
if dedup_db:
listeners.append(dedup_db)
if args.rethinkdb_stats_url:
stats_db = warcprox.stats.RethinkStatsDb(options=options)
listeners.append(stats_db)
elif args.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
stats_db = None
else:
stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
listeners.append(stats_db)
recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size)
ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir,
ca_name=ca_name)
proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q,
stats_db=stats_db, options=options)
if args.playback_port is not None:
playback_index_db = warcprox.playback.PlaybackIndexDb(
args.playback_index_db_file, options=options)
playback_proxy = warcprox.playback.PlaybackProxy(
ca=ca, playback_index_db=playback_index_db, options=options)
listeners.append(playback_index_db)
else:
playback_index_db = None
playback_proxy = None
if args.crawl_log_dir:
listeners.append(warcprox.crawl_log.CrawlLogger(
args.crawl_log_dir, options=options))
for qualname in args.plugins or []:
try:
(module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name)
listener = class_()
listener.notify # make sure it has this method
listeners.append(listener)
except Exception as e:
logging.fatal('problem with plugin class %r: %s', qualname, e)
sys.exit(1)
writer_pool = warcprox.writer.WarcWriterPool(options=options)
# number of warc writer threads = sqrt(proxy.max_threads)
# I came up with this out of thin air because it strikes me as reasonable
# 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5)
logging.debug('initializing %d warc writer threads', num_writer_threads)
warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q,
writer_pool=writer_pool, dedup_db=dedup_db,
listeners=listeners, options=options)
for i in range(num_writer_threads)]
if args.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
else:
svcreg = None
controller = warcprox.controller.WarcproxController(
proxy, warc_writer_threads, playback_proxy,
service_registry=svcreg, options=options)
return controller
def parse_args(argv):
'''
Parses command line arguments with argparse.
'''
arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
args = arg_parser.parse_args(args=argv[1:])
return args
def main(argv=None):
@ -329,7 +239,8 @@ def main(argv=None):
# see https://github.com/pyca/cryptography/issues/2911
cryptography.hazmat.backends.openssl.backend.activate_builtin_random()
controller = init_controller(args)
options = warcprox.Options(**vars(args))
controller = warcprox.controller.WarcproxController(options)
signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())
@ -400,7 +311,8 @@ def ensure_rethinkdb_tables(argv=None):
svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
did_something = True
if args.rethinkdb_stats_url:
stats_db = warcprox.stats.RethinkStatsDb(options=options)
stats_db = warcprox.stats.RethinkStatsProcessor(options=options)
stats_db._ensure_db_table()
did_something = True
if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
@ -411,7 +323,7 @@ def ensure_rethinkdb_tables(argv=None):
if args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
logging.warn(
'trough it responsible for creating most of the rethinkdb '
'trough is responsible for creating most of the rethinkdb '
'tables that it uses')
did_something = True

View File

@ -62,6 +62,8 @@ except ImportError:
import concurrent.futures
import urlcanon
import time
import collections
import cProfile
class ProxyingRecorder(object):
"""
@ -79,7 +81,7 @@ class ProxyingRecorder(object):
self.block_digest = hashlib.new(digest_algorithm)
self.payload_offset = None
self.proxy_client = proxy_client
self._proxy_client_conn_open = True
self._proxy_client_conn_open = bool(self.proxy_client)
self.len = 0
self.url = url
@ -451,6 +453,7 @@ class PooledMixIn(socketserver.ThreadingMixIn):
on system resource limits.
'''
self.active_requests = set()
self.unaccepted_requests = 0
if not max_threads:
# man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
# more precisely on Linux, threads) that can be created for the
@ -475,6 +478,17 @@ class PooledMixIn(socketserver.ThreadingMixIn):
self.max_threads = max_threads
self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)
def status(self):
if hasattr(super(), 'status'):
result = super().status()
else:
result = {}
result.update({
'threads': self.pool._max_workers,
'active_requests': len(self.active_requests),
'unaccepted_requests': self.unaccepted_requests})
return result
def process_request(self, request, client_address):
self.active_requests.add(request)
future = self.pool.submit(
@ -503,12 +517,14 @@ class PooledMixIn(socketserver.ThreadingMixIn):
self.logger.trace(
'someone is connecting active_requests=%s',
len(self.active_requests))
self.unaccepted_requests += 1
while len(self.active_requests) > self.max_threads:
time.sleep(0.05)
res = self.socket.accept()
self.logger.trace(
'accepted after %.1f sec active_requests=%s socket=%s',
time.time() - start, len(self.active_requests), res[0])
self.unaccepted_requests -= 1
return res
class MitmProxy(http_server.HTTPServer):
@ -547,9 +563,14 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
# See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html
request_queue_size = 4096
def __init__(self, max_threads, options=warcprox.Options()):
PooledMixIn.__init__(self, max_threads)
self.profilers = {}
def __init__(self, options=warcprox.Options()):
if options.max_threads:
self.logger.info(
'max_threads=%s set by command line option',
options.max_threads)
PooledMixIn.__init__(self, options.max_threads)
self.profilers = collections.defaultdict(cProfile.Profile)
if options.profile:
self.process_request_thread = self._profile_process_request_thread
@ -557,9 +578,6 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
self.process_request_thread = self._process_request_thread
def _profile_process_request_thread(self, request, client_address):
if not threading.current_thread().ident in self.profilers:
import cProfile
self.profilers[threading.current_thread().ident] = cProfile.Profile()
profiler = self.profilers[threading.current_thread().ident]
profiler.enable()
self._process_request_thread(request, client_address)

View File

@ -121,9 +121,6 @@ class PlaybackProxyHandler(MitmProxyHandler):
def _send_headers_and_refd_payload(
self, headers, refers_to_target_uri, refers_to_date, payload_digest):
"""Parameters:
"""
location = self.server.playback_index_db.lookup_exact(
refers_to_target_uri, refers_to_date, payload_digest)
self.logger.debug('loading http payload from {}'.format(location))
@ -133,11 +130,13 @@ class PlaybackProxyHandler(MitmProxyHandler):
for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass
if not record:
raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename))
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type != warctools.WarcRecord.RESPONSE:
if record.type != warctools.WarcRecord.RESPONSE:
raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type))
# find end of headers
@ -158,12 +157,13 @@ class PlaybackProxyHandler(MitmProxyHandler):
for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass
if not record:
raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename))
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type == warctools.WarcRecord.RESPONSE:
if record.type == warctools.WarcRecord.RESPONSE:
headers_buf = bytearray()
while True:
line = record.content_file.readline()
@ -173,7 +173,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
return self._send_response(headers_buf, record.content_file)
elif warc_type == warctools.WarcRecord.REVISIT:
elif record.type == warctools.WarcRecord.REVISIT:
# response consists of http headers from revisit record and
# payload from the referenced record
warc_profile = record.get_header(warctools.WarcRecord.PROFILE)

View File

@ -21,18 +21,20 @@ USA.
from __future__ import absolute_import
from hanzo import warctools
import collections
import copy
import datetime
import doublethink
import json
import logging
import os
import json
from hanzo import warctools
import warcprox
import threading
import rethinkdb as r
import datetime
import urlcanon
import sqlite3
import copy
import doublethink
import threading
import time
import urlcanon
import warcprox
def _empty_bucket(bucket):
return {
@ -51,45 +53,88 @@ def _empty_bucket(bucket):
},
}
class StatsDb:
logger = logging.getLogger("warcprox.stats.StatsDb")
class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
logger = logging.getLogger("warcprox.stats.StatsProcessor")
def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()):
self.file = file
self.options = options
self._lock = threading.RLock()
def _startup(self):
if os.path.exists(self.options.stats_db_file):
self.logger.info(
'opening existing stats database %s',
self.options.stats_db_file)
else:
self.logger.info(
'creating new stats database %s',
self.options.stats_db_file)
def start(self):
with self._lock:
if os.path.exists(self.file):
self.logger.info(
'opening existing stats database %s', self.file)
else:
self.logger.info(
'creating new stats database %s', self.file)
conn = sqlite3.connect(self.options.stats_db_file)
conn.execute(
'create table if not exists buckets_of_stats ('
' bucket varchar(300) primary key,'
' stats varchar(4000)'
');')
conn.commit()
conn.close()
conn = sqlite3.connect(self.file)
self.logger.info(
'created table buckets_of_stats in %s',
self.options.stats_db_file)
def _process_batch(self, batch):
batch_buckets = self._tally_batch(batch)
self._update_db(batch_buckets)
logging.trace('updated stats from batch of %s', len(batch))
def _update_db(self, batch_buckets):
conn = sqlite3.connect(self.options.stats_db_file)
for bucket in batch_buckets:
bucket_stats = batch_buckets[bucket]
cursor = conn.execute(
'select stats from buckets_of_stats where bucket=?',
(bucket,))
result_tuple = cursor.fetchone()
cursor.close()
if result_tuple:
old_bucket_stats = json.loads(result_tuple[0])
bucket_stats['total']['urls'] += old_bucket_stats['total']['urls']
bucket_stats['total']['wire_bytes'] += old_bucket_stats['total']['wire_bytes']
bucket_stats['revisit']['urls'] += old_bucket_stats['revisit']['urls']
bucket_stats['revisit']['wire_bytes'] += old_bucket_stats['revisit']['wire_bytes']
bucket_stats['new']['urls'] += old_bucket_stats['new']['urls']
bucket_stats['new']['wire_bytes'] += old_bucket_stats['new']['wire_bytes']
json_value = json.dumps(bucket_stats, separators=(',',':'))
conn.execute(
'create table if not exists buckets_of_stats ('
' bucket varchar(300) primary key,'
' stats varchar(4000)'
');')
'insert or replace into buckets_of_stats '
'(bucket, stats) values (?, ?)', (bucket, json_value))
conn.commit()
conn.close()
conn.close()
self.logger.info('created table buckets_of_stats in %s', self.file)
def _tally_batch(self, batch):
batch_buckets = {}
for recorded_url in batch:
for bucket in self.buckets(recorded_url):
bucket_stats = batch_buckets.get(bucket)
if not bucket_stats:
bucket_stats = _empty_bucket(bucket)
batch_buckets[bucket] = bucket_stats
def stop(self):
pass
bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size
def close(self):
pass
def sync(self):
pass
if recorded_url.warc_records:
if recorded_url.warc_records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
else:
bucket_stats["new"]["urls"] += 1
bucket_stats["new"]["wire_bytes"] += recorded_url.size
return batch_buckets
def value(self, bucket0="__all__", bucket1=None, bucket2=None):
conn = sqlite3.connect(self.file)
conn = sqlite3.connect(self.options.stats_db_file)
cursor = conn.execute(
'select stats from buckets_of_stats where bucket = ?',
(bucket0,))
@ -107,9 +152,6 @@ class StatsDb:
else:
return None
def notify(self, recorded_url, records):
self.tally(recorded_url, records)
def buckets(self, recorded_url):
'''
Unravels bucket definitions in Warcprox-Meta header. Each bucket
@ -152,117 +194,20 @@ class StatsDb:
return buckets
def tally(self, recorded_url, records):
with self._lock:
conn = sqlite3.connect(self.file)
for bucket in self.buckets(recorded_url):
cursor = conn.execute(
'select stats from buckets_of_stats where bucket=?',
(bucket,))
result_tuple = cursor.fetchone()
cursor.close()
if result_tuple:
bucket_stats = json.loads(result_tuple[0])
else:
bucket_stats = _empty_bucket(bucket)
bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size
if records:
if records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
else:
bucket_stats["new"]["urls"] += 1
bucket_stats["new"]["wire_bytes"] += recorded_url.size
json_value = json.dumps(bucket_stats, separators=(',',':'))
conn.execute(
'insert or replace into buckets_of_stats '
'(bucket, stats) values (?, ?)', (bucket, json_value))
conn.commit()
conn.close()
class RethinkStatsDb(StatsDb):
"""Updates database in batch every 2.0 seconds"""
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
class RethinkStatsProcessor(StatsProcessor):
logger = logging.getLogger("warcprox.stats.RethinkStatsProcessor")
def __init__(self, options=warcprox.Options()):
StatsProcessor.__init__(self, options)
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.table = parsed.table
self.replicas = min(3, len(self.rr.servers))
def _startup(self):
self._ensure_db_table()
self.options = options
self._stop = threading.Event()
self._batch_lock = threading.RLock()
with self._batch_lock:
self._batch = {}
self._timer = None
def start(self):
"""Starts batch update repeating timer."""
self._update_batch() # starts repeating timer
def _bucket_batch_update_reql(self, bucket, batch):
return self.rr.table(self.table).get(bucket).replace(
lambda old: r.branch(
old.eq(None), batch[bucket], old.merge({
"total": {
"urls": old["total"]["urls"].add(
batch[bucket]["total"]["urls"]),
"wire_bytes": old["total"]["wire_bytes"].add(
batch[bucket]["total"]["wire_bytes"]),
},
"new": {
"urls": old["new"]["urls"].add(
batch[bucket]["new"]["urls"]),
"wire_bytes": old["new"]["wire_bytes"].add(
batch[bucket]["new"]["wire_bytes"]),
},
"revisit": {
"urls": old["revisit"]["urls"].add(
batch[bucket]["revisit"]["urls"]),
"wire_bytes": old["revisit"]["wire_bytes"].add(
batch[bucket]["revisit"]["wire_bytes"]),
},
})))
def _update_batch(self):
with self._batch_lock:
batch_copy = copy.deepcopy(self._batch)
self._batch = {}
try:
if len(batch_copy) > 0:
# XXX can all the buckets be done in one query?
for bucket in batch_copy:
result = self._bucket_batch_update_reql(
bucket, batch_copy).run()
if (not result["inserted"] and not result["replaced"]
or sorted(result.values()) != [0,0,0,0,0,1]):
raise Exception(
"unexpected result %s updating stats %s" % (
result, batch_copy[bucket]))
except Exception as e:
self.logger.error("problem updating stats", exc_info=True)
# now we need to restore the stats that didn't get saved to the
# batch so that they are saved in the next call to _update_batch()
with self._batch_lock:
self._add_to_batch(batch_copy)
finally:
if not self._stop.is_set():
self._timer = threading.Timer(2.0, self._update_batch)
self._timer.name = "RethinkStats-batch-update-timer-%s" % (
datetime.datetime.utcnow().isoformat())
self._timer.start()
else:
self.logger.info("finished")
def _ensure_db_table(self):
dbs = self.rr.db_list().run()
@ -280,17 +225,38 @@ class RethinkStatsDb(StatsDb):
self.table, primary_key="bucket", shards=1,
replicas=self.replicas).run()
def close(self):
self.stop()
def _update_db(self, batch_buckets):
# XXX can all the buckets be done in one query?
for bucket in batch_buckets:
result = self._bucket_batch_update_reql(
bucket, batch_buckets[bucket]).run()
if (not result['inserted'] and not result['replaced']
or sorted(result.values()) != [0,0,0,0,0,1]):
self.logger.error(
'unexpected result %s updating stats %s' % (
result, batch_buckets[bucket]))
def stop(self):
self.logger.info("stopping rethinkdb stats table batch updates")
self._stop.set()
if self._timer:
self._timer.join()
def sync(self):
pass
def _bucket_batch_update_reql(self, bucket, new):
return self.rr.table(self.table).get(bucket).replace(
lambda old: r.branch(
old.eq(None), new, old.merge({
'total': {
'urls': old['total']['urls'].add(new['total']['urls']),
'wire_bytes': old['total']['wire_bytes'].add(
new['total']['wire_bytes']),
},
'new': {
'urls': old['new']['urls'].add(new['new']['urls']),
'wire_bytes': old['new']['wire_bytes'].add(
new['new']['wire_bytes']),
},
'revisit': {
'urls': old['revisit']['urls'].add(
new['revisit']['urls']),
'wire_bytes': old['revisit']['wire_bytes'].add(
new['revisit']['wire_bytes']),
},
})))
def value(self, bucket0="__all__", bucket1=None, bucket2=None):
bucket0_stats = self.rr.table(self.table).get(bucket0).run()
@ -305,36 +271,83 @@ class RethinkStatsDb(StatsDb):
return bucket0_stats[bucket1]
return bucket0_stats
def tally(self, recorded_url, records):
buckets = self.buckets(recorded_url)
with self._batch_lock:
for bucket in buckets:
bucket_stats = self._batch.setdefault(
bucket, _empty_bucket(bucket))
bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size
if records:
if records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
else:
bucket_stats["new"]["urls"] += 1
bucket_stats["new"]["wire_bytes"] += recorded_url.size
def _add_to_batch(self, add_me):
with self._batch_lock:
for bucket in add_me:
bucket_stats = self._batch.setdefault(
bucket, _empty_bucket(bucket))
bucket_stats["total"]["urls"] += add_me[bucket]["total"]["urls"]
bucket_stats["total"]["wire_bytes"] += add_me[bucket]["total"]["wire_bytes"]
bucket_stats["revisit"]["urls"] += add_me[bucket]["revisit"]["urls"]
bucket_stats["revisit"]["wire_bytes"] += add_me[bucket]["revisit"]["wire_bytes"]
bucket_stats["new"]["urls"] += add_me[bucket]["new"]["urls"]
bucket_stats["new"]["wire_bytes"] += add_me[bucket]["new"]["wire_bytes"]
class RunningStats:
'''
In-memory stats for measuring overall warcprox performance.
'''
def __init__(self):
self.urls = 0
self.warc_bytes = 0
self._lock = threading.RLock()
self.first_snap_time = time.time()
# snapshot every minute since the beginning of time
self.minute_snaps = [(self.first_snap_time, 0, 0)]
# snapshot every 10 seconds for the last 2 minutes (fill with zeroes)
self.ten_sec_snaps = collections.deque()
for i in range(0, 13):
self.ten_sec_snaps.append(
(self.first_snap_time - 120 + i * 10, 0, 0))
def notify(self, recorded_url, records):
self.tally(recorded_url, records)
with self._lock:
self.urls += 1
if records:
self.warc_bytes += records[-1].offset + records[-1].length - records[0].offset
def snap(self):
now = time.time()
last_snap_time = self.minute_snaps[-1][0]
need_minute_snap = (now - self.first_snap_time) // 60 > (self.minute_snaps[-1][0] - self.first_snap_time) // 60
need_ten_sec_snap = (now - self.ten_sec_snaps[0][0]) // 10 > (self.ten_sec_snaps[-1][0] - self.ten_sec_snaps[0][0]) // 10
if need_minute_snap:
self.minute_snaps.append((now, self.urls, self.warc_bytes))
logging.debug('added minute snap %r', self.minute_snaps[-1])
if need_ten_sec_snap:
self.ten_sec_snaps.popleft()
self.ten_sec_snaps.append((now, self.urls, self.warc_bytes))
logging.trace('rotated in ten second snap %r', self.ten_sec_snaps[-1])
def _closest_ten_sec_snap(self, t):
# it's a deque so iterating over it is faster than indexed lookup
closest_snap = (0, 0, 0)
for snap in self.ten_sec_snaps:
if abs(t - snap[0]) < abs(t - closest_snap[0]):
closest_snap = snap
return closest_snap
def _closest_minute_snap(self, t):
minutes_ago = int((time.time() - t) / 60)
# jump to approximately where we expect the closest snap
i = max(0, len(self.minute_snaps) - minutes_ago)
# move back to the last one earlier than `t`
while self.minute_snaps[i][0] > t and i > 0:
i -= 1
closest_snap = self.minute_snaps[i]
# move forward until we start getting farther away from `t`
while i < len(self.minute_snaps):
if abs(t - self.minute_snaps[i][0]) <= abs(t - closest_snap[0]):
closest_snap = self.minute_snaps[i]
else:
break
i += 1
return closest_snap
def current_rates(self, time_period_minutes):
assert time_period_minutes > 0
with self._lock:
now = time.time()
urls = self.urls
warc_bytes = self.warc_bytes
t = now - time_period_minutes * 60
if time_period_minutes <= 2:
start_snap = self._closest_ten_sec_snap(t)
else:
start_snap = self._closest_minute_snap(t)
elapsed = now - start_snap[0]
logging.trace(
'elapsed=%0.1fs urls=%s warc_bytes=%s', elapsed,
urls - start_snap[1], warc_bytes - start_snap[2])
return elapsed, (urls - start_snap[1]) / elapsed, (warc_bytes - start_snap[2]) / elapsed
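
RunningStats keeps only running totals plus two snapshot rings (a per-minute list since startup and a deque of 10-second snaps covering the last two minutes), so `current_rates` is just a subtraction against the snapshot nearest the start of the requested window, never a scan of per-url history. A usage sketch (in practice the controller drives the snap/notify calls):

    stats = RunningStats()
    # the controller's run loop calls stats.snap() every 0.5 sec, and the
    # postfetch chain calls stats.notify(...) once per url; afterwards:
    elapsed, urls_per_sec, warc_bytes_per_sec = stats.current_rates(1)   # ~last minute
    elapsed, urls_per_sec, warc_bytes_per_sec = stats.current_rates(15)  # ~last 15 min
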

View File

@ -87,7 +87,7 @@ class TroughClient(object):
def promote(self, segment_id):
url = os.path.join(self.segment_manager_url(), 'promote')
payload_dict = {'segment': segment_id}
response = requests.post(url, json=payload_dict)
response = requests.post(url, json=payload_dict, timeout=21600)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
@ -129,7 +129,7 @@ class TroughClient(object):
def write_url_nocache(self, segment_id, schema_id='default'):
provision_url = os.path.join(self.segment_manager_url(), 'provision')
payload_dict = {'segment': segment_id, 'schema': schema_id}
response = requests.post(provision_url, json=payload_dict)
response = requests.post(provision_url, json=payload_dict, timeout=600)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
@ -175,7 +175,7 @@ class TroughClient(object):
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
try:
response = requests.post(write_url, sql)
response = requests.post(write_url, sql, timeout=600)
if segment_id not in self._dirty_segments:
with self._dirty_segments_lock:
self._dirty_segments.add(segment_id)
@ -200,7 +200,7 @@ class TroughClient(object):
return None
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
try:
response = requests.post(read_url, sql)
response = requests.post(read_url, sql, timeout=600)
except:
self._read_url_cache.pop(segment_id, None)
self.logger.error(
@ -221,7 +221,7 @@ class TroughClient(object):
def schema_exists(self, schema_id):
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
response = requests.get(url)
response = requests.get(url, timeout=60)
if response.status_code == 200:
return True
elif response.status_code == 404:
@ -232,7 +232,7 @@ class TroughClient(object):
def register_schema(self, schema_id, sql):
url = os.path.join(
self.segment_manager_url(), 'schema', schema_id, 'sql')
response = requests.put(url, sql)
response = requests.put(url, sql, timeout=600)
if response.status_code not in (201, 204):
raise Exception(
'Received %s: %r in response to PUT %r with data %r' % (

View File

@ -2,7 +2,7 @@
warcprox/warcproxy.py - recording proxy, extends mitmproxy to record traffic,
enqueue info on the recorded url queue
Copyright (C) 2013-2016 Internet Archive
Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -92,6 +92,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
self.url, rule))
def _enforce_limit(self, limit_key, limit_value, soft=False):
if not self.server.stats_db:
return
bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2)
_limit_key = limit_key
@ -183,9 +185,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
def _proxy_request(self):
warcprox_meta = None
raw_warcprox_meta = self.headers.get('Warcprox-Meta')
self.logger.log(
warcprox.TRACE, 'request for %s Warcprox-Meta header: %s',
self.url, repr(raw_warcprox_meta))
self.logger.trace(
'request for %s Warcprox-Meta header: %s', self.url,
raw_warcprox_meta)
if raw_warcprox_meta:
warcprox_meta = json.loads(raw_warcprox_meta)
del self.headers['Warcprox-Meta']
@ -232,14 +234,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
'host': socket.gethostname(),
'address': self.connection.getsockname()[0],
'port': self.connection.getsockname()[1],
'load': 1.0 * self.server.recorded_url_q.qsize() / (
self.server.recorded_url_q.maxsize or 100),
'queued_urls': self.server.recorded_url_q.qsize(),
'queue_max_size': self.server.recorded_url_q.maxsize,
'seconds_behind': self.server.recorded_url_q.seconds_behind(),
'pid': os.getpid(),
'threads': self.server.pool._max_workers,
}
status_info.update(self.server.status())
payload = json.dumps(
status_info, indent=2).encode('utf-8') + b'\n'
self.send_response(200, 'OK')
@ -333,7 +330,7 @@ class RecordedUrl:
warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None,
timestamp=None, host=None, duration=None, referer=None,
payload_digest=None):
payload_digest=None, warc_records=None):
# XXX should test what happens with non-ascii url (when does
# url-encoding happen?)
if type(url) is not bytes:
@ -372,6 +369,7 @@ class RecordedUrl:
self.duration = duration
self.referer = referer
self.payload_digest = payload_digest
self.warc_records = warc_records
# inherit from object so that multiple inheritance from this class works
# properly in python 2
@ -380,8 +378,12 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(
self, ca=None, recorded_url_q=None, stats_db=None,
self, stats_db=None, status_callback=None,
options=warcprox.Options()):
self.status_callback = status_callback
self.stats_db = stats_db
self.options = options
server_address = (
options.address or 'localhost',
options.port if options.port is not None else 8000)
@ -400,38 +402,63 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
self.digest_algorithm = options.digest_algorithm or 'sha1'
if ca is not None:
self.ca = ca
ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64]
self.ca = CertificateAuthority(
ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca',
ca_name=ca_name)
self.recorded_url_q = warcprox.TimestampedQueue(
maxsize=options.queue_size or 1000)
self.running_stats = warcprox.stats.RunningStats()
def status(self):
if hasattr(super(), 'status'):
result = super().status()
else:
ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
self.ca = CertificateAuthority(ca_file='warcprox-ca.pem',
certs_dir='./warcprox-ca',
ca_name=ca_name)
if recorded_url_q is not None:
self.recorded_url_q = recorded_url_q
else:
self.recorded_url_q = warcprox.TimestampedQueue(
maxsize=options.queue_size or 1000)
self.stats_db = stats_db
self.options = options
result = {}
result.update({
'load': 1.0 * self.recorded_url_q.qsize() / (
self.recorded_url_q.maxsize or 100),
'queued_urls': self.recorded_url_q.qsize(),
'queue_max_size': self.recorded_url_q.maxsize,
'seconds_behind': self.recorded_url_q.seconds_behind(),
'urls_processed': self.running_stats.urls,
'warc_bytes_written': self.running_stats.warc_bytes,
'start_time': self.running_stats.first_snap_time,
})
elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(1)
result['rates_1min'] = {
'actual_elapsed': elapsed,
'urls_per_sec': urls_per_sec,
'warc_bytes_per_sec': warc_bytes_per_sec,
}
elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(5)
result['rates_5min'] = {
'actual_elapsed': elapsed,
'urls_per_sec': urls_per_sec,
'warc_bytes_per_sec': warc_bytes_per_sec,
}
elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(15)
result['rates_15min'] = {
'actual_elapsed': elapsed,
'urls_per_sec': urls_per_sec,
'warc_bytes_per_sec': warc_bytes_per_sec,
}
# gets postfetch chain status from the controller
if self.status_callback:
result.update(self.status_callback())
return result
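Putting the new constructor signature and status() together — a hedged sketch, not confirmed usage: the 'postfetch_chain' key and the callback's return shape are assumptions, and constructing the proxy binds a socket and writes warcprox-ca.pem as side effects:

import warcprox
import warcprox.warcproxy

def postfetch_status():
    # stand-in for the controller callback mentioned above; assumed to
    # return a dict that gets merged into the /status payload
    return {'postfetch_chain': []}

proxy = warcprox.warcproxy.SingleThreadedWarcProxy(
        stats_db=None, status_callback=postfetch_status,
        options=warcprox.Options())
info = proxy.status()
print('%s urls queued, %.2f urls/sec over the last minute' % (
        info['queued_urls'], info['rates_1min']['urls_per_sec']))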
class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(
self, ca=None, recorded_url_q=None, stats_db=None,
self, stats_db=None, status_callback=None,
options=warcprox.Options()):
if options.max_threads:
self.logger.info(
"max_threads=%s set by command line option",
options.max_threads)
warcprox.mitmproxy.PooledMitmProxy.__init__(
self, options.max_threads, options)
warcprox.mitmproxy.PooledMitmProxy.__init__(self, options)
SingleThreadedWarcProxy.__init__(
self, ca, recorded_url_q, stats_db, options)
self, stats_db, status_callback, options)
def server_activate(self):
http_server.HTTPServer.server_activate(self)


@@ -28,6 +28,7 @@ import fcntl
import time
import warcprox
import os
import socket
import string
import random
import threading
@@ -42,6 +43,8 @@ class WarcWriter:
self._last_activity = time.time()
self.gzip = options.gzip or False
self.warc_filename = options.warc_filename or \
'{prefix}-{timestamp17}-{randomtoken}-{serialno}'
digest_algorithm = options.digest_algorithm or 'sha1'
base32 = options.base32
self.record_builder = warcprox.warc.WarcRecordBuilder(
@@ -68,6 +71,10 @@ class WarcWriter:
now = datetime.utcnow()
return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000)
def timestamp14(self):
now = datetime.utcnow()
return '{:%Y%m%d%H%M%S}'.format(now)
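The two flavors differ only in millisecond precision — a quick illustration using the same format strings:

from datetime import datetime

now = datetime(2018, 1, 18, 11, 14, 37, 123000)
print('{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000))  # 20180118111437123
print('{:%Y%m%d%H%M%S}'.format(now))                               # 20180118111437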
def close_writer(self):
with self._lock:
if self._fpath:
@@ -86,8 +93,32 @@ class WarcWriter:
self._fpath = None
self._f = None
def serial(self):
return '{:05d}'.format(self._serial)
# h3 default <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
# ${prefix}-${timestamp17}-${randomtoken}-${serialno}.warc.gz"
def _warc_filename(self):
"""WARC filename is configurable with CLI parameter --warc-filename.
Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
Available variables are: prefix, timestamp14, timestamp17, serialno,
randomtoken, hostname, shorthostname.
Extension ``.warc`` or ``.warc.gz`` is appended automatically.
"""
hostname = socket.getfqdn()
shorthostname = hostname.split('.')[0]
fname = self.warc_filename.format(prefix=self.prefix,
timestamp14=self.timestamp14(),
timestamp17=self.timestamp17(),
serialno=self.serial(),
randomtoken=self._randomtoken,
hostname=hostname,
shorthostname=shorthostname)
if self.gzip:
fname = fname + '.warc.gz'
else:
fname = fname + '.warc'
return fname
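A hypothetical expansion of a custom template (host and token values invented; str.format simply ignores the unused keywords):

template = '{prefix}-{timestamp17}-{serialno}-{shorthostname}'
print(template.format(
        prefix='WARCPROX', timestamp14='20180118111437',
        timestamp17='20180118111437123', serialno='00000',
        randomtoken='g7g9vnzx', hostname='crawl392.example.org',
        shorthostname='crawl392'))
# WARCPROX-20180118111437123-00000-crawl392, plus '.warc.gz' when gzip is on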
def _writer(self):
with self._lock:
if self._fpath and os.path.getsize(
@@ -95,9 +126,7 @@ class WarcWriter:
self.close_writer()
if self._f is None:
self._f_finalname = '{}-{}-{:05d}-{}.warc{}'.format(
self.prefix, self.timestamp17(), self._serial,
self._randomtoken, '.gz' if self.gzip else '')
self._f_finalname = self._warc_filename()
self._fpath = os.path.sep.join([
self.directory, self._f_finalname + self._f_open_suffix])


@@ -2,7 +2,7 @@
warcprox/writerthread.py - warc writer thread, reads from the recorded url
queue, writes warc records, runs final tasks after warc records are written
Copyright (C) 2013-2017 Internet Archive
Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@@ -28,113 +28,54 @@ except ImportError:
import Queue as queue
import logging
import threading
import time
from datetime import datetime
from hanzo import warctools
import warcprox
import sys
class WarcWriterThread(threading.Thread):
logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread")
def __init__(
self, name='WarcWriterThread', recorded_url_q=None,
writer_pool=None, dedup_db=None, listeners=[],
options=warcprox.Options()):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name=name)
self.recorded_url_q = recorded_url_q
self.stop = threading.Event()
if writer_pool:
self.writer_pool = writer_pool
else:
self.writer_pool = warcprox.writer.WarcWriterPool()
self.dedup_db = dedup_db
self.listeners = listeners
self.options = options
self.idle = None
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
def run(self):
if self.options.profile:
import cProfile
self.profiler = cProfile.Profile()
self.profiler.enable()
self._run()
self.profiler.disable()
else:
self._run()
class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor):
logger = logging.getLogger("warcprox.writerthread.WarcWriterThread")
_ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'}
def __init__(self, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.options = options
self.writer_pool = warcprox.writer.WarcWriterPool(options)
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
def _get_process_put(self):
try:
warcprox.BaseStandardPostfetchProcessor._get_process_put(self)
finally:
self.writer_pool.maybe_idle_rollover()
def _process_url(self, recorded_url):
records = []
if self._should_archive(recorded_url):
records = self.writer_pool.write_records(recorded_url)
recorded_url.warc_records = records
self._log(recorded_url, records)
# try to release resources in a timely fashion
if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
recorded_url.response_recorder.tempfile.close()
def _filter_accepts(self, recorded_url):
if not self.method_filter:
return True
meth = recorded_url.method.upper()
return meth in self._ALWAYS_ACCEPT or meth in self.method_filter
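The filter semantics, exercised standalone (method_filter as it would be built from, say, --method-filter GET --method-filter POST):

_ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'}
method_filter = {'GET', 'POST'}

def accepts(method):
    # an empty filter accepts everything, mirroring _filter_accepts() above
    meth = method.upper()
    return not method_filter or meth in _ALWAYS_ACCEPT or meth in method_filter

assert accepts('get') and accepts('WARCPROX_WRITE_RECORD')
assert not accepts('HEAD')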
def _run(self):
self.name = '%s(tid=%s)'% (self.name, warcprox.gettid())
while not self.stop.is_set():
try:
while True:
try:
if self.stop.is_set():
qsize = self.recorded_url_q.qsize()
if qsize % 50 == 0:
self.logger.info("%s urls left to write", qsize)
# XXX optimize handling of urls not to be archived throughout warcprox
def _should_archive(self, recorded_url):
prefix = (recorded_url.warcprox_meta['warc-prefix']
if recorded_url.warcprox_meta
and 'warc-prefix' in recorded_url.warcprox_meta
else self.options.prefix)
# special warc name prefix '-' means "don't archive"
return prefix != '-' and self._filter_accepts(recorded_url)
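And the client's-eye view of the special prefix — a hedged sketch of opting a single request out of archiving (same localhost:8000 assumption as above):

import json
import requests

# warc-prefix '-' makes _should_archive() return False, so the response
# passes through the proxy but no warc record is written
requests.get(
        'http://example.com/robots.txt',
        proxies={'http': 'http://localhost:8000'},
        headers={'Warcprox-Meta': json.dumps({'warc-prefix': '-'})})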
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
records = []
self.idle = None
if self._filter_accepts(recorded_url):
if self.dedup_db:
warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
recorded_url, base32=self.options.base32)
records = self.writer_pool.write_records(recorded_url)
self._final_tasks(recorded_url, records)
# try to release resources in a timely fashion
if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
recorded_url.response_recorder.tempfile.close()
self.writer_pool.maybe_idle_rollover()
except queue.Empty:
if self.stop.is_set():
break
self.idle = time.time()
self.logger.info('WarcWriterThread shutting down')
self._shutdown()
except Exception as e:
if isinstance(e, OSError) and e.errno == 28:
# OSError: [Errno 28] No space left on device
self.logger.critical(
'shutting down due to fatal problem: %s: %s',
e.__class__.__name__, e)
self._shutdown()
sys.exit(1)
self.logger.critical(
'WarcWriterThread will try to continue after unexpected '
'error', exc_info=True)
time.sleep(0.5)
def _shutdown(self):
self.writer_pool.close_writers()
for listener in self.listeners:
if hasattr(listener, 'stop'):
try:
listener.stop()
except:
self.logger.error(
'%s raised exception', listener.stop, exc_info=True)
# closest thing we have to heritrix crawl log at the moment
def _log(self, recorded_url, records):
try:
payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8")
payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8")
except:
payload_digest = "-"
@@ -148,13 +89,3 @@ class WarcWriterThread(threading.Thread):
recorded_url.method, recorded_url.url.decode("utf-8"),
recorded_url.mimetype, recorded_url.size, payload_digest,
type_, filename, offset)
def _final_tasks(self, recorded_url, records):
if self.listeners:
for listener in self.listeners:
try:
listener.notify(recorded_url, records)
except:
self.logger.error('%s raised exception',
listener.notify, exc_info=True)
self._log(recorded_url, records)