Mirror of https://github.com/internetarchive/warcprox.git (synced 2025-01-18 13:22:09 +01:00)
commit b64c50c32c

Merge branch 'master' into qa

* master:
  Update docstring
  Move Warcprox-Meta header construction to warcproxy
  Improve test_writer tests
  Replace timestamp parameter with more generic request/response syntax
  Return capture timestamp
  Swap fcntl.flock with fcntl.lockf
  Unit test fix for Python2 compatibility
  Test WarcWriter file locking when no_warc_open_suffix=True
  Rename writer var and add exception handling
  Acquire and exclusive file lock when not using .open WARC suffix
  Add hidden --no-warc-open-suffix CLI option
  Fix missing dummy url param in bigtable lookup method
  back to dev version number
  version 2.2 for pypi to address https://github.com/internetarchive/warcprox/issues/42
  Expand comment with limit=-1 explanation
  Drop unnecessary split for newline in CDX results
  fix benchmarks (update command line args)
  Update CdxServerDedup lookup algorithm
  Pass url instead of recorded_url obj to dedup lookup methods
  Filter out warc/revisit records in CdxServerDedup
  Improve CdxServerDedup implementation
  Fix minor CdxServerDedup unit test
  Fix bug with dedup_info date encoding
  Add mock pkg to run-tests.sh
  Add CdxServerDedup unit tests and improve its exception handling
  Add CDX Server based deduplication
  cryptography lib version 2.1.1 is causing problems
  Revert changes to test_warcprox.py
  Revert changes to bigtable and dedup
  Revert warc to previous behavior
  Update unit test
  Replace invalid warcfilename variable in playback
  Stop using WarcRecord.REFERS_TO header and use payload_digest instead
  greatly simplify automated test setup by reusing initialization code from the command line executable; this also has the benefit of testing that initialization code
  avoid TypeError: 'NoneType' object is not iterable exception at shutdown
  wait for rethinkdb indexes to be ready
  Remove deleted ``close`` method call from test.
  bump dev version number after merging pull requests
  Add missing "," in deps
  Remove tox.ini, move warcio to test_requires
  allow very long request header lines, to support large warcprox-meta header values
  Remove redundant stop() & sync() dedup methods
  Remove redundant close method from DedupDb and RethinkDedupDb
  Remove unused imports
  Add missing packages from setup.py, add tox config.
  fix python2 tests
  don't use http.client.HTTPResponse.getheader() to get the content-type header, because it can return a comma-delimited string
  no SIGQUIT on windows, so no SIGQUIT handler
  https://github.com/internetarchive/warcprox/pull/32 warrants a version bump
  fix --size option (https://github.com/internetarchive/warcprox/issues/31)
  fix --playback-port option (https://github.com/internetarchive/warcprox/issues/29)
  fix zero-indexing of warc_writer_threads so they can be disabled via empty list
@@ -47,6 +47,7 @@ Usage
         [--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
         [--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
         [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
+        [--cdxserver-dedup CDX_SERVER_URL]
         [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
         [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
         [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
@@ -100,6 +101,9 @@ Usage
                         persistent deduplication database file; empty
                         string or /dev/null disables deduplication
                         (default: ./warcprox.sqlite)
+  --cdxserver-dedup CDX_SERVER_URL
+                        use a CDX server for deduplication
+                        (default: None)
   --rethinkdb-servers RETHINKDB_SERVERS
                         rethinkdb servers, used for dedup and stats if
                         specified; e.g.
@@ -163,78 +163,87 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
     arg_parser = argparse.ArgumentParser(
             prog=prog, description=desc,
             formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter)
-    arg_parser.add_argument(
-            '-z', '--gzip', dest='gzip', action='store_true',
+    ### these warcprox options are not configurable for the benchmarks
+    # arg_parser.add_argument('-p', '--port', dest='port', default='8000',
+    #         type=int, help='port to listen on')
+    # arg_parser.add_argument('-b', '--address', dest='address',
+    #         default='localhost', help='address to listen on')
+    # arg_parser.add_argument('-c', '--cacert', dest='cacert',
+    #         default='./{0}-warcprox-ca.pem'.format(socket.gethostname()),
+    #         help='CA certificate file; if file does not exist, it will be created')
+    # arg_parser.add_argument('--certs-dir', dest='certs_dir',
+    #         default='./{0}-warcprox-ca'.format(socket.gethostname()),
+    #         help='where to store and load generated certificates')
+    # arg_parser.add_argument('-d', '--dir', dest='directory',
+    #         default='./warcs', help='where to write warcs')
+
+    arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
+    arg_parser.add_argument('-n', '--prefix', dest='prefix',
+            default='WARCPROX', help='WARC filename prefix')
     arg_parser.add_argument(
-            '-s', '--size', dest='size', default=1000*1000*1000, type=int,
-            help='WARC file rollover size threshold in bytes')
-    arg_parser.add_argument(
-            '--rollover-idle-time', dest='rollover_idle_time', default=None,
-            type=int, help=(
-                'WARC file rollover idle time threshold in seconds (so that '
-                "Friday's last open WARC doesn't sit there all weekend "
-                'waiting for more data)'))
+            '-s', '--size', dest='rollover_size', default=1000*1000*1000,
+            type=int, help='WARC file rollover size threshold in bytes')
+    arg_parser.add_argument('--rollover-idle-time',
+            dest='rollover_idle_time', default=None, type=int,
+            help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
     try:
         hash_algos = hashlib.algorithms_guaranteed
     except AttributeError:
         hash_algos = hashlib.algorithms
-    arg_parser.add_argument(
-            '-g', '--digest-algorithm', dest='digest_algorithm',
-            default='sha1', help='digest algorithm, one of %s' % hash_algos)
+    arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
+            default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
     arg_parser.add_argument('--base32', dest='base32', action='store_true',
             default=False, help='write digests in Base32 instead of hex')
-    arg_parser.add_argument(
-            '--method-filter', metavar='HTTP_METHOD',
-            action='append', help=(
-                'only record requests with the given http method(s) (can be '
-                'used more than once)'))
-    arg_parser.add_argument(
-            '--stats-db-file', dest='stats_db_file',
-            default=os.path.join(tmpdir, 'stats.db'), help=(
-                'persistent statistics database file; empty string or '
-                '/dev/null disables statistics tracking'))
+    arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
+            action='append', help='only record requests with the given http method(s) (can be used more than once)')
+    arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
+            default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
+    arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
+            type=int, default=None, help='port to listen on for instant playback')
+    arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
+            default='./warcprox-playback-index.db',
+            help='playback index database file (only used if --playback-port is specified)')
     group = arg_parser.add_mutually_exclusive_group()
-    group.add_argument(
-            '-j', '--dedup-db-file', dest='dedup_db_file',
-            default=os.path.join(tmpdir, 'dedup.db'), help=(
-                'persistent deduplication database file; empty string or '
-                '/dev/null disables deduplication'))
-    group.add_argument(
-            '--rethinkdb-servers', dest='rethinkdb_servers', help=(
-                'rethinkdb servers, used for dedup and stats if specified; '
-                'e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org'))
-    # arg_parser.add_argument(
-    #         '--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help=(
-    #             'rethinkdb database name (ignored unless --rethinkdb-servers '
-    #             'is specified)'))
+    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
+            default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+    group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
+            help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
+    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
+            help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
+    arg_parser.add_argument('--rethinkdb-big-table',
+            dest='rethinkdb_big_table', action='store_true', default=False,
+            help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
     arg_parser.add_argument(
-            '--rethinkdb-big-table', dest='rethinkdb_big_table',
-            action='store_true', default=False, help=(
-                'use a big rethinkdb table called "captures", instead of a '
-                'small table called "dedup"; table is suitable for use as '
-                'index for playback (ignored unless --rethinkdb-servers is '
-                'specified)'))
+            '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
+            default='captures', help=argparse.SUPPRESS)
+    arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
+            default=500, help=argparse.SUPPRESS)
+    arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
+            help=argparse.SUPPRESS)
+    arg_parser.add_argument('--profile', action='store_true', default=False,
+            help=argparse.SUPPRESS)
     arg_parser.add_argument(
-            '--queue-size', dest='queue_size', type=int, default=1, help=(
-                'max size of the queue of urls waiting to be processed by '
-                'the warc writer thread'))
+            '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
+            default=None, help=(
+                'host:port of tor socks proxy, used only to connect to '
+                '.onion sites'))
     arg_parser.add_argument(
-            '--max-threads', dest='max_threads', type=int, help=(
-                'number of proxy server threads (if not specified, chosen based '
-                'on system resource limits'))
-    arg_parser.add_argument(
-            '--version', action='version',
-            version='warcprox %s' % warcprox.__version__)
-    arg_parser.add_argument(
-            '-v', '--verbose', dest='verbose', action='store_true',
-            help='verbose logging')
-    arg_parser.add_argument(
-            '--trace', dest='trace', action='store_true',
-            help='trace-level logging')
-    arg_parser.add_argument(
-            '--profile', dest='profile', action='store_true', default=False,
-            help='profile the warc writer thread')
+            '--plugin', metavar='PLUGIN_CLASS', dest='plugins',
+            action='append', help=(
+                'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
+                'May be used multiple times to register multiple plugins. '
+                'Plugin classes are loaded from the regular python module '
+                'search path. They will be instantiated with no arguments and '
+                'must have a method `notify(self, recorded_url, records)` '
+                'which will be called for each url, after warc records have '
+                'been written.'))
+    arg_parser.add_argument('--version', action='version',
+            version="warcprox {}".format(warcprox.__version__))
+    arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
+    arg_parser.add_argument('--trace', dest='trace', action='store_true')
+    arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
     arg_parser.add_argument(
             '--requests', dest='requests', type=int, default=200,
             help='number of urls to fetch')
setup.py (6 lines changed):

@@ -39,8 +39,10 @@ deps = [
     'certauth==1.1.6',
     'warctools',
     'urlcanon>=0.1.dev16',
+    'urllib3',
     'doublethink>=0.2.0.dev81',
     'PySocks',
+    'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
 ]
 try:
     import concurrent.futures
@@ -49,7 +51,7 @@ except:

 setuptools.setup(
     name='warcprox',
-    version='2.1b1.dev94',
+    version='2.2.1b2.dev107',
     description='WARC writing MITM HTTP/S proxy',
     url='https://github.com/internetarchive/warcprox',
     author='Noah Levitt',
@@ -58,7 +60,7 @@ setuptools.setup(
     license='GPL',
     packages=['warcprox'],
     install_requires=deps,
-    tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
+    tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
     cmdclass = {'test': PyTest},
     test_suite='warcprox.tests',
     entry_points={
@@ -40,7 +40,7 @@ do
     && (cd /warcprox && git diff HEAD) | patch -p1 \
     && virtualenv -p $python /tmp/venv \
     && source /tmp/venv/bin/activate \
-    && pip --log-file /tmp/pip.log install . pytest requests warcio \
+    && pip --log-file /tmp/pip.log install . pytest mock requests warcio \
     && py.test -v tests \
     && py.test -v --rethinkdb-servers=localhost tests \
     && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests"
tests/test_dedup.py (new file, 46 lines):

@@ -0,0 +1,46 @@
+import mock
+from warcprox.dedup import CdxServerDedup
+
+
+def test_cdx_dedup():
+    # Mock CDX Server responses to simulate found, not found and errors.
+    with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request:
+        url = "http://example.com"
+        # not found case
+        result = mock.Mock()
+        result.status = 200
+        result.data = b'20170101020405 test'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res is None
+
+        # found case
+        result = mock.Mock()
+        result.status = 200
+        result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res["date"] == b"2017-02-03T04:05:03Z"
+
+        # invalid CDX result status code
+        result = mock.Mock()
+        result.status = 400
+        result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res is None
+        # invalid CDX result content
+        result = mock.Mock()
+        result.status = 200
+        result.data = b'InvalidExceptionResult'
+        request.return_value = result
+        cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
+        res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
+                                url=url)
+        assert res is None
@@ -60,6 +60,7 @@ except ImportError:
 import certauth.certauth

 import warcprox
+import warcprox.main

 try:
     import http.client as http_client
@@ -241,173 +242,45 @@ def https_daemon(request, cert):
     return https_daemon

 @pytest.fixture(scope="module")
-def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
-    captures_db = None
+def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
+    orig_dir = os.getcwd()
+    work_dir = tempfile.mkdtemp()
+    logging.info('changing to working directory %r', work_dir)
+    os.chdir(work_dir)
+
+    argv = ['warcprox',
+            '--method-filter=GET',
+            '--method-filter=POST',
+            '--port=0',
+            '--playback-port=0',
+            '--onion-tor-socks-proxy=localhost:9050',
+            '--crawl-log-dir=crawl-logs']
     if rethinkdb_servers:
-        servers = rethinkdb_servers.split(",")
+        rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
+        argv.append('--rethinkdb-servers=%s' % rethinkdb_servers)
+        argv.append('--rethinkdb-db=%s' % rethinkdb_db)
         if rethinkdb_big_table:
-            db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
-            rr = doublethink.Rethinker(servers, db)
-            captures_db = warcprox.bigtable.RethinkCaptures(rr)
-            captures_db.start()
-
-    def fin():
-        if captures_db:
-            captures_db.close()
-            # logging.info('dropping rethinkdb database {}'.format(db))
-            # result = captures_db.rr.db_drop(db).run()
-            # logging.info("result=%s", result)
-    request.addfinalizer(fin)
-
-    return captures_db
-
-@pytest.fixture(scope="module")
-def rethink_dedup_db(request, rethinkdb_servers, captures_db):
-    ddb = None
-    if rethinkdb_servers:
-        if captures_db:
-            ddb = warcprox.bigtable.RethinkCapturesDedup(captures_db)
-        else:
-            servers = rethinkdb_servers.split(",")
-            db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
-            rr = doublethink.Rethinker(servers, db)
-            ddb = warcprox.dedup.RethinkDedupDb(rr)
-
-    def fin():
-        if rethinkdb_servers:
-            ddb.close()
-            if not captures_db:
-                logging.info('dropping rethinkdb database {}'.format(db))
-                result = ddb.rr.db_drop(db).run()
-                logging.info("result=%s", result)
-    request.addfinalizer(fin)
-
-    return ddb
-
-@pytest.fixture(scope="module")
-def dedup_db(request, rethink_dedup_db):
-    dedup_db_file = None
-    ddb = rethink_dedup_db
-    if not ddb:
-        f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
-        f.close()
-        dedup_db_file = f.name
-        ddb = warcprox.dedup.DedupDb(dedup_db_file)
-
-    def fin():
-        if dedup_db_file:
-            logging.info('deleting file {}'.format(dedup_db_file))
-            os.unlink(dedup_db_file)
-    request.addfinalizer(fin)
-
-    return ddb
-
-@pytest.fixture(scope="module")
-def stats_db(request, rethinkdb_servers):
-    if rethinkdb_servers:
-        servers = rethinkdb_servers.split(",")
-        db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
-        rr = doublethink.Rethinker(servers, db)
-        sdb = warcprox.stats.RethinkStatsDb(rr)
-        sdb.start()
-    else:
-        f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
-        f.close()
-        stats_db_file = f.name
-        sdb = warcprox.stats.StatsDb(stats_db_file)
-
-    def fin():
-        sdb.close()
-        if rethinkdb_servers:
-            logging.info('dropping rethinkdb database {}'.format(db))
-            result = sdb.rr.db_drop(db).run()
-            logging.info("result=%s", result)
-        # else:
-        #     logging.info('deleting file {}'.format(stats_db_file))
-        #     os.unlink(stats_db_file)
-    request.addfinalizer(fin)
-
-    return sdb
-
-@pytest.fixture(scope="module")
-def service_registry(request, rethinkdb_servers):
-    if rethinkdb_servers:
-        servers = rethinkdb_servers.split(",")
-        db = 'warcprox_test_services_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
-        rr = doublethink.Rethinker(servers, db)
-
-        def fin():
-            logging.info('dropping rethinkdb database {}'.format(db))
-            result = rr.db_drop(db).run()
-            logging.info("result=%s", result)
-        request.addfinalizer(fin)
-
-        return doublethink.ServiceRegistry(rr)
-    else:
-        return None
-
-@pytest.fixture(scope="module")
-def warcprox_(request, captures_db, dedup_db, stats_db, service_registry):
-    f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True)
-    f.close() # delete it, or CertificateAuthority will try to read it
-    ca_file = f.name
-    ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca')
-    ca = certauth.certauth.CertificateAuthority(ca_file, ca_dir, 'warcprox-test')
-
-    recorded_url_q = warcprox.TimestampedQueue()
-
-    options = warcprox.Options(port=0, playback_port=0,
-            onion_tor_socks_proxy='localhost:9050')
-    proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q,
-            stats_db=stats_db, options=options)
-    options.port = proxy.server_port
-
-    options.directory = tempfile.mkdtemp(prefix='warcprox-test-warcs-')
-
-    f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False)
-    f.close()
-    playback_index_db_file = f.name
-    playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file)
-    playback_proxy = warcprox.playback.PlaybackProxy(ca=ca,
-            playback_index_db=playback_index_db, options=options)
-    options.playback_proxy = playback_proxy.server_port
-
-    options.method_filter = ['GET','POST']
-
-    options.crawl_log_dir = tempfile.mkdtemp(
-            prefix='warcprox-test-', suffix='-crawl-log')
-    crawl_logger = warcprox.crawl_log.CrawlLogger(options.crawl_log_dir)
-
-    writer_pool = warcprox.writer.WarcWriterPool(options)
-    warc_writer_threads = [
-            warcprox.writerthread.WarcWriterThread(
-                recorded_url_q=recorded_url_q, writer_pool=writer_pool,
-                dedup_db=dedup_db, listeners=[
-                    captures_db or dedup_db, playback_index_db, stats_db,
-                    crawl_logger], options=options)
-            for i in range(int(proxy.max_threads ** 0.5))]
-
-    warcprox_ = warcprox.controller.WarcproxController(
-            proxy=proxy, warc_writer_threads=warc_writer_threads,
-            playback_proxy=playback_proxy, service_registry=service_registry,
-            options=options)
+            argv.append('--rethinkdb-big-table')
+
+    args = warcprox.main.parse_args(argv)
+    warcprox_ = warcprox.main.init_controller(args)
+
     logging.info('starting warcprox')
-    warcprox_thread = threading.Thread(name='WarcproxThread',
-            target=warcprox_.run_until_shutdown)
+    warcprox_thread = threading.Thread(
+            name='WarcproxThread', target=warcprox_.run_until_shutdown)
     warcprox_thread.start()

     def fin():
-        logging.info('stopping warcprox')
         warcprox_.stop.set()
         warcprox_thread.join()
-        for f in (ca_file, ca_dir, options.directory, playback_index_db_file,
-                options.crawl_log_dir):
-            if os.path.isdir(f):
-                logging.info('deleting directory {}'.format(f))
-                shutil.rmtree(f)
-            else:
-                logging.info('deleting file {}'.format(f))
-                os.unlink(f)
+        if rethinkdb_servers:
+            logging.info('dropping rethinkdb database %r', rethinkdb_db)
+            rr = doublethink.Rethinker(rethinkdb_servers)
+            result = rr.db_drop(rethinkdb_db).run()
+        logging.info('deleting working directory %r', work_dir)
+        os.chdir(orig_dir)
+        shutil.rmtree(work_dir)
+
     request.addfinalizer(fin)

     return warcprox_
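The rewritten fixture above relies on the merge's "greatly simplify automated test setup by reusing initialization code from the command line executable" change. A minimal sketch of that reuse pattern outside pytest (the argv values are illustrative, not the fixture's exact flags):

    import threading
    import warcprox.main

    # hedged sketch: build a controller the same way the new fixture does
    args = warcprox.main.parse_args(['warcprox', '--port=0', '--playback-port=0'])
    controller = warcprox.main.init_controller(args)
    thread = threading.Thread(target=controller.run_until_shutdown)
    thread.start()
    # ... exercise the proxy on controller.proxy.server_port ...
    controller.stop.set()
    thread.join()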
@@ -683,6 +556,22 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
     assert response.headers["content-type"] == "text/plain;charset=utf-8"
     assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n"

+def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
+    url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
+    request_meta = {"accept": ["capture-metadata"]}
+    headers = {"Warcprox-Meta": json.dumps(request_meta)}
+    response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
+    assert response.status_code == 200
+    assert response.headers['Warcprox-Meta']
+    data = json.loads(response.headers['Warcprox-Meta'])
+    assert data['capture-metadata']
+    try:
+        dt = datetime.datetime.strptime(data['capture-metadata']['timestamp'],
+                '%Y-%m-%dT%H:%M:%SZ')
+        assert dt
+    except ValueError:
+        pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp'])
+
 def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
     url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
     url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
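For context, a client-side sketch of the capture-metadata exchange that test_return_capture_timestamp exercises; the proxy address is an assumption (any running warcprox instance will do):

    import json
    import requests

    # hypothetical proxy address; warcprox picks its own port with --port=0
    proxies = {'http': 'http://localhost:8000'}
    headers = {'Warcprox-Meta': json.dumps({'accept': ['capture-metadata']})}
    response = requests.get(
            'http://example.com/', proxies=proxies, headers=headers)
    meta = json.loads(response.headers['Warcprox-Meta'])
    print(meta['capture-metadata']['timestamp'])  # e.g. '2017-11-01T12:00:00Z'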
@@ -1233,9 +1122,8 @@ def test_method_filter(
     assert response.content == payload

 def test_dedup_ok_flag(
-        https_daemon, http_daemon, warcprox_, archiving_proxies,
-        rethinkdb_big_table):
-    if not rethinkdb_big_table:
+        https_daemon, http_daemon, warcprox_, archiving_proxies):
+    if not warcprox_.options.rethinkdb_big_table:
         # this feature is n/a unless using rethinkdb big table
         return
@@ -1319,11 +1207,11 @@ def test_status_api(warcprox_):
     assert status['pid'] == os.getpid()
     assert status['threads'] == warcprox_.proxy.pool._max_workers

-def test_svcreg_status(warcprox_, service_registry):
-    if service_registry:
+def test_svcreg_status(warcprox_):
+    if warcprox_.service_registry:
         start = time.time()
         while time.time() - start < 15:
-            status = service_registry.available_service('warcprox')
+            status = warcprox_.service_registry.available_service('warcprox')
             if status:
                 break
             time.sleep(0.5)
@@ -1380,11 +1268,11 @@ def test_controller_with_defaults():
     assert not wwt.writer_pool.default_warc_writer.record_builder.base32
     assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'

-def test_choose_a_port_for_me(service_registry):
+def test_choose_a_port_for_me(warcprox_):
     options = warcprox.Options()
     options.port = 0
     controller = warcprox.controller.WarcproxController(
-            service_registry=service_registry, options=options)
+            service_registry=warcprox_.service_registry, options=options)
     assert controller.proxy.server_port != 0
     assert controller.proxy.server_port != 8000
     assert controller.proxy.server_address == (
@@ -1400,12 +1288,12 @@ def test_choose_a_port_for_me(service_registry):
     status = json.loads(response.content.decode('ascii'))
     assert status['port'] == controller.proxy.server_port

-    if service_registry:
+    if warcprox_.service_registry:
         # check that service registry entry lists the correct port
         start = time.time()
         ports = []
         while time.time() - start < 30:
-            svcs = service_registry.available_services('warcprox')
+            svcs = warcprox_.service_registry.available_services('warcprox')
             ports = [svc['port'] for svc in svcs]
             if controller.proxy.server_port in ports:
                 break
@@ -1552,6 +1440,44 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
             'contentSize', 'warcFilename', 'warcFileOffset'}
     assert extra_info['contentSize'] == 145

+def test_long_warcprox_meta(
+        warcprox_, http_daemon, archiving_proxies, playback_proxies):
+    url = 'http://localhost:%s/b/g' % http_daemon.server_port
+
+    # create a very long warcprox-meta header
+    headers = {'Warcprox-Meta': json.dumps({
+        'x':'y'*1000000, 'warc-prefix': 'test_long_warcprox_meta'})}
+    response = requests.get(
+            url, proxies=archiving_proxies, headers=headers, verify=False)
+    assert response.status_code == 200
+
+    # wait for writer thread to process
+    time.sleep(0.5)
+    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+        time.sleep(0.5)
+    time.sleep(0.5)
+
+    # check that warcprox-meta was parsed and honored ("warc-prefix" param)
+    assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
+    writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
+    warc_path = os.path.join(writer.directory, writer._f_finalname)
+    warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
+    assert os.path.exists(warc_path)
+
+    # read the warc
+    with open(warc_path, 'rb') as f:
+        rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
+        record = next(rec_iter)
+        assert record.rec_type == 'warcinfo'
+        record = next(rec_iter)
+        assert record.rec_type == 'response'
+        assert record.rec_headers.get_header('warc-target-uri') == url
+        record = next(rec_iter)
+        assert record.rec_type == 'request'
+        assert record.rec_headers.get_header('warc-target-uri') == url
+        with pytest.raises(StopIteration):
+            next(rec_iter)
+
 if __name__ == '__main__':
     pytest.main()
tests/test_writer.py (new file, 57 lines):

@@ -0,0 +1,57 @@
+import os
+import fcntl
+from multiprocessing import Process, Queue
+from datetime import datetime
+import pytest
+from warcprox.mitmproxy import ProxyingRecorder
+from warcprox.warcproxy import RecordedUrl
+from warcprox.writer import WarcWriter
+from warcprox import Options
+
+recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
+
+recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain',
+                           status=200, client_ip='127.0.0.2',
+                           request_data=b'abc',
+                           response_recorder=recorder,
+                           remote_ip='127.0.0.3',
+                           timestamp=datetime.utcnow())
+
+
+def lock_file(queue, filename):
+    """Try to lock file and return 1 if successful, else return 0.
+    It is necessary to run this method in a different process to test locking.
+    """
+    try:
+        fi = open(filename, 'ab')
+        fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        fi.close()
+        queue.put('OBTAINED LOCK')
+    except IOError:
+        queue.put('FAILED TO OBTAIN LOCK')
+
+
+def test_warc_writer_locking(tmpdir):
+    """Test if WarcWriter is locking WARC files.
+    When we don't have the .open suffix, WarcWriter locks the file and the
+    external process trying to ``lock_file`` fails (result=0).
+    """
+    dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
+    wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True))
+    wwriter.write_records(recorded_url)
+    warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
+    assert warcs
+    target_warc = os.path.join(dirname, warcs[0])
+    # launch another process and try to lock WARC file
+    queue = Queue()
+    p = Process(target=lock_file, args=(queue, target_warc))
+    p.start()
+    p.join()
+    assert queue.get() == 'FAILED TO OBTAIN LOCK'
+    wwriter.close_writer()
+
+    # locking must succeed after writer has closed the WARC file.
+    p = Process(target=lock_file, args=(queue, target_warc))
+    p.start()
+    p.join()
+    assert queue.get() == 'OBTAINED LOCK'
@@ -25,8 +25,6 @@ USA.
 from __future__ import absolute_import

 import logging
-from hanzo import warctools
-import random
 import warcprox
 import base64
 import urlcanon
@@ -115,6 +113,7 @@ class RethinkCaptures:
                 [r.row["abbr_canon_surt"], r.row["timestamp"]]).run()
             self.rr.table(self.table).index_create("sha1_warc_type", [
                 r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run()
+            self.rr.table(self.table).index_wait().run()

     def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
         if algo != "sha1":
@@ -221,7 +220,7 @@ class RethinkCapturesDedup:
         self.captures_db = captures_db
         self.options = options

-    def lookup(self, digest_key, bucket="__unspecified__"):
+    def lookup(self, digest_key, bucket="__unspecified__", url=None):
         k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
         algo, value_str = k.split(":")
         if self.options.base32:
@@ -176,7 +176,7 @@ class WarcproxController(object):
         assert(all(
             wwt.dedup_db is self.warc_writer_threads[0].dedup_db
             for wwt in self.warc_writer_threads))
-        if self.warc_writer_threads[0].dedup_db:
+        if any((t.dedup_db for t in self.warc_writer_threads)):
             self.warc_writer_threads[0].dedup_db.start()

         for wwt in self.warc_writer_threads:
@@ -211,8 +211,6 @@ class WarcproxController(object):

         if self.proxy.stats_db:
             self.proxy.stats_db.stop()
-        if self.warc_writer_threads[0].dedup_db:
-            self.warc_writer_threads[0].dedup_db.close()

         self.proxy_thread.join()
         if self.playback_proxy is not None:
@@ -21,13 +21,17 @@ USA.

 from __future__ import absolute_import

+from datetime import datetime
 import logging
 import os
 import json
 from hanzo import warctools
 import warcprox
-import random
 import sqlite3
+import urllib3
+from urllib3.exceptions import HTTPError
+
+urllib3.disable_warnings()

 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -55,15 +59,6 @@ class DedupDb(object):
         conn.commit()
         conn.close()

-    def stop(self):
-        pass
-
-    def close(self):
-        pass
-
-    def sync(self):
-        pass
-
     def save(self, digest_key, response_record, bucket=""):
         record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
         url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
@@ -82,7 +77,7 @@ class DedupDb(object):
         conn.close()
         self.logger.debug('dedup db saved %s:%s', key, json_value)

-    def lookup(self, digest_key, bucket=""):
+    def lookup(self, digest_key, bucket="", url=None):
         result = None
         key = digest_key.decode('utf-8') + '|' + bucket
         conn = sqlite3.connect(self.file)
@@ -117,9 +112,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
             and recorded_url.response_recorder.payload_size() > 0):
         digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
         if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
-            recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"])
+            recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
+                                                      recorded_url.url)
         else:
-            recorded_url.dedup_info = dedup_db.lookup(digest_key)
+            recorded_url.dedup_info = dedup_db.lookup(digest_key,
+                                                      url=recorded_url.url)

 class RethinkDedupDb:
     logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
@@ -151,15 +148,6 @@ class RethinkDedupDb:
     def start(self):
         pass

-    def stop(self):
-        pass
-
-    def close(self):
-        pass
-
-    def sync(self):
-        pass
-
     def save(self, digest_key, response_record, bucket=""):
         k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
         k = "{}|{}".format(k, bucket)
@@ -173,7 +161,7 @@ class RethinkDedupDb:
             raise Exception("unexpected result %s saving %s", result, record)
         self.logger.debug('dedup db saved %s:%s', k, record)

-    def lookup(self, digest_key, bucket=""):
+    def lookup(self, digest_key, bucket="", url=None):
         k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
         k = "{}|{}".format(k, bucket)
         result = self.rr.table(self.table).get(k).run()
@@ -193,3 +181,66 @@ class RethinkDedupDb:
         else:
             self.save(digest_key, records[0])
+
+
+class CdxServerDedup(object):
+    """Query a CDX server to perform deduplication.
+    """
+    logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
+    http_pool = urllib3.PoolManager()
+
+    def __init__(self, cdx_url="https://web.archive.org/cdx/search",
+                 options=warcprox.Options()):
+        self.cdx_url = cdx_url
+        self.options = options
+
+    def start(self):
+        pass
+
+    def save(self, digest_key, response_record, bucket=""):
+        """Does not apply to CDX server, as it is obviously read-only.
+        """
+        pass
+
+    def lookup(self, digest_key, url):
+        """Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be
+        computed on the original content, after decoding Content-Encoding and
+        Transfer-Encoding, if any), if they match, write a revisit record.
+
+        Get only the last item (limit=-1) because Wayback Machine has special
+        performance optimisation to handle that. limit < 0 is very inefficient
+        in general. Maybe it could be configurable in the future.
+
+        :param digest_key: b'sha1:<KEY-VALUE>' (prefix is optional).
+            Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
+        :param url: Target URL string
+        Result must contain:
+        {"url": <URL>, "date": "%Y-%m-%dT%H:%M:%SZ"}
+        """
+        u = url.decode("utf-8") if isinstance(url, bytes) else url
+        try:
+            result = self.http_pool.request('GET', self.cdx_url, fields=dict(
+                url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
+                limit=-1))
+            assert result.status == 200
+            if isinstance(digest_key, bytes):
+                dkey = digest_key
+            else:
+                dkey = digest_key.encode('utf-8')
+            dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey
+            line = result.data.strip()
+            if line:
+                (cdx_ts, cdx_digest) = line.split(b' ')
+                if cdx_digest == dkey:
+                    dt = datetime.strptime(cdx_ts.decode('ascii'),
+                                           '%Y%m%d%H%M%S')
+                    date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
+                    return dict(url=url, date=date)
+        except (HTTPError, AssertionError, ValueError) as exc:
+            self.logger.error('CdxServerDedup request failed for url=%s %s',
+                              url, exc)
+        return None
+
+    def notify(self, recorded_url, records):
+        """Since we don't save anything to CDX server, this does not apply.
+        """
+        pass
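A hedged usage sketch of the CdxServerDedup class added above (the endpoint URL and digest are illustrative). lookup() returns None on a miss or error, otherwise a dict with the original url and the capture date in WARC format:

    from warcprox.dedup import CdxServerDedup

    dedup_db = CdxServerDedup(cdx_url='https://web.archive.org/cdx/search')
    hit = dedup_db.lookup(
            digest_key=b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A',
            url='http://example.com/')
    if hit:
        print('duplicate of %s captured at %s' % (hit['url'], hit['date']))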
@@ -39,7 +39,6 @@ import signal
 import threading
 import certauth.certauth
 import warcprox
-import re
 import doublethink
 import cryptography.hazmat.backends.openssl
 import importlib
@@ -79,11 +78,13 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             default='./warcs', help='where to write warcs')
     arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
+    arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
+            default=False, action='store_true', help=argparse.SUPPRESS)
     arg_parser.add_argument('-n', '--prefix', dest='prefix',
             default='WARCPROX', help='WARC filename prefix')
-    arg_parser.add_argument('-s', '--size', dest='size',
-            default=1000*1000*1000, type=int,
-            help='WARC file rollover size threshold in bytes')
+    arg_parser.add_argument(
+            '-s', '--size', dest='rollover_size', default=1000*1000*1000,
+            type=int, help='WARC file rollover size threshold in bytes')
     arg_parser.add_argument('--rollover-idle-time',
             dest='rollover_idle_time', default=None, type=int,
             help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
@@ -107,6 +108,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     group = arg_parser.add_mutually_exclusive_group()
     group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
             default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+    group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup',
+            help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search')
     group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
             help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
     arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
@@ -195,6 +198,9 @@ def init_controller(args):
         else:
             dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
         listeners.append(dedup_db)
+    elif args.cdxserver_dedup:
+        dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
+        listeners.append(dedup_db)
     elif args.dedup_db_file in (None, '', '/dev/null'):
         logging.info('deduplication disabled')
         dedup_db = None
@@ -225,9 +231,7 @@ def init_controller(args):
         playback_index_db = warcprox.playback.PlaybackIndexDb(
                 args.playback_index_db_file, options=options)
         playback_proxy = warcprox.playback.PlaybackProxy(
-                server_address=(args.address, args.playback_port), ca=ca,
-                playback_index_db=playback_index_db, warcs_dir=args.directory,
-                options=options)
+                ca=ca, playback_index_db=playback_index_db, options=options)
         listeners.append(playback_index_db)
     else:
         playback_index_db = None
@@ -306,7 +310,11 @@ def main(argv=sys.argv):

     signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
     signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())
+    try:
         signal.signal(signal.SIGQUIT, dump_state)
+    except AttributeError:
+        # SIGQUIT does not exist on some platforms (windows)
+        pass

     controller.run_until_shutdown()
@@ -37,8 +37,15 @@ except ImportError:
     import urlparse as urllib_parse
 try:
     import http.client as http_client
+    # In python3 http.client.parse_headers() enforces http_client._MAXLINE
+    # as max length of an http header line, but we want to support very
+    # long warcprox-meta headers, so we tweak it here. Python2 doesn't seem
+    # to enforce any limit. Multiline headers could be an option but it
+    # turns out those are illegal as of RFC 7230. Plus, this is easier.
+    http_client._MAXLINE = 4194304 # 4 MiB
 except ImportError:
     import httplib as http_client
+import json
 import socket
 import logging
 import ssl
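To illustrate what the _MAXLINE tweak above works around (stock python3 behavior, not warcprox code): http.client rejects any header line longer than _MAXLINE bytes with a LineTooLong error, so a megabyte-scale Warcprox-Meta header would otherwise abort the proxied response.

    import http.client

    print(http.client._MAXLINE)       # 65536 in stock python3
    http.client._MAXLINE = 4194304    # 4 MiB, matching the patch above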
@ -157,13 +164,17 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
|
|||||||
self.fp, proxy_client, digest_algorithm, url=url)
|
self.fp, proxy_client, digest_algorithm, url=url)
|
||||||
self.fp = self.recorder
|
self.fp = self.recorder
|
||||||
|
|
||||||
def begin(self):
|
def begin(self, extra_response_headers={}):
|
||||||
http_client.HTTPResponse.begin(self) # reads status line, headers
|
http_client.HTTPResponse.begin(self) # reads status line, headers
|
||||||
|
|
||||||
status_and_headers = 'HTTP/1.1 {} {}\r\n'.format(
|
status_and_headers = 'HTTP/1.1 {} {}\r\n'.format(
|
||||||
self.status, self.reason)
|
self.status, self.reason)
|
||||||
self.msg['Via'] = via_header_value(
|
self.msg['Via'] = via_header_value(
|
||||||
self.msg.get('Via'), '%0.1f' % (self.version / 10.0))
|
self.msg.get('Via'), '%0.1f' % (self.version / 10.0))
|
||||||
|
if extra_response_headers:
|
||||||
|
for header, value in extra_response_headers.items():
|
||||||
|
self.msg[header] = value
|
||||||
|
|
||||||
for k,v in self.msg.items():
|
for k,v in self.msg.items():
|
||||||
if k.lower() not in (
|
if k.lower() not in (
|
||||||
'connection', 'proxy-connection', 'keep-alive',
|
'connection', 'proxy-connection', 'keep-alive',
|
||||||
@@ -355,12 +366,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self.logger.error("exception proxying request", exc_info=True)
             raise
 
-    def _proxy_request(self):
+    def _proxy_request(self, extra_response_headers={}):
         '''
         Sends the request to the remote server, then uses a ProxyingRecorder to
         read the response and send it to the proxy client, while recording the
         bytes in transit. Returns a tuple (request, response) where request is
         the raw request bytes, and response is a ProxyingRecorder.
+
+        :param extra_response_headers: generated on warcprox._proxy_request.
+        It may contain extra HTTP headers such as ``Warcprox-Meta`` which
+        are written in the WARC record for this request.
         '''
         # Build request
         req_str = '{} {} {}\r\n'.format(
@@ -401,7 +416,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 self._remote_server_sock, proxy_client=self.connection,
                 digest_algorithm=self.server.digest_algorithm,
                 url=self.url, method=self.command)
-        prox_rec_res.begin()
+        prox_rec_res.begin(extra_response_headers=extra_response_headers)
 
         buf = prox_rec_res.read(8192)
         while buf != b'':
@@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler):
 
 
     def _send_headers_and_refd_payload(
-            self, headers, refers_to, refers_to_target_uri, refers_to_date):
+            self, headers, refers_to_target_uri, refers_to_date, payload_digest):
+        """Parameters:
+
+        """
         location = self.server.playback_index_db.lookup_exact(
-                refers_to_target_uri, refers_to_date, record_id=refers_to)
+                refers_to_target_uri, refers_to_date, payload_digest)
         self.logger.debug('loading http payload from {}'.format(location))
 
         fh = self._open_warc_at_offset(location['f'], location['o'])
@@ -131,7 +134,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
             pass
 
         if errors:
-            raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
+            raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors))
 
         warc_type = record.get_header(warctools.WarcRecord.TYPE)
         if warc_type != warctools.WarcRecord.RESPONSE:
@@ -177,20 +180,19 @@ class PlaybackProxyHandler(MitmProxyHandler):
             if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
                 raise Exception('unknown revisit record profile {}'.format(warc_profile))
 
-            refers_to = record.get_header(
-                    warctools.WarcRecord.REFERS_TO).decode('latin1')
             refers_to_target_uri = record.get_header(
                     warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
                             'latin1')
             refers_to_date = record.get_header(
                     warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
+            payload_digest = record.get_header(
+                    warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
             self.logger.debug(
                     'revisit record references %s:%s capture of %s',
-                    refers_to_date, refers_to, refers_to_target_uri)
+                    refers_to_date, payload_digest, refers_to_target_uri)
             return self._send_headers_and_refd_payload(
-                    record.content[1], refers_to, refers_to_target_uri,
-                    refers_to_date)
+                    record.content[1], refers_to_target_uri, refers_to_date,
+                    payload_digest)
 
         else:
             # send it back raw, whatever it is
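Net effect: a revisit record now carries everything playback needs (target URI, date, payload digest) without consulting WARC-Refers-To. A schematic sketch of the new lookup key, with hypothetical header values:

    # Sketch: the (uri, date, digest) triple that now locates the
    # original response for a revisit record. Values are made up.
    revisit = {
        'WARC-Refers-To-Target-URI': 'http://example.com/',
        'WARC-Refers-To-Date': '2017-10-30T00:00:00Z',
        'WARC-Payload-Digest': 'sha1:G7HRM7BGOKSKMSXZAHMUQTTV53QOFSMK',
    }
    lookup_key = (revisit['WARC-Refers-To-Target-URI'],
                  revisit['WARC-Refers-To-Date'],
                  revisit['WARC-Payload-Digest'])
    # location = playback_index_db.lookup_exact(*lookup_key)
    # fh = self._open_warc_at_offset(location['f'], location['o'])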
@@ -210,7 +212,6 @@ class PlaybackProxyHandler(MitmProxyHandler):
 class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
     logger = logging.getLogger("warcprox.playback.PlaybackProxy")
 
-
     def __init__(self, ca=None, playback_index_db=None, options=warcprox.Options()):
         server_address = (options.address or 'localhost', options.playback_port if options.playback_port is not None else 8001)
         http_server.HTTPServer.__init__(self, server_address, PlaybackProxyHandler, bind_and_activate=True)
@@ -231,7 +232,7 @@ class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
 class PlaybackIndexDb(object):
     logger = logging.getLogger("warcprox.playback.PlaybackIndexDb")
 
-    def __init__(self, file='./warcprox.sqlite'):
+    def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()):
         self.file = file
         self._lock = threading.RLock()
 
@@ -265,12 +266,12 @@ class PlaybackIndexDb(object):
         # XXX canonicalize url?
         url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
         date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
-        record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
+        payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
 
         # there could be two visits of same url in the same second, and WARC-Date is
         # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
 
-        # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
+        # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...}
 
         with self._lock:
             conn = sqlite3.connect(self.file)
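The one-line schema comment is dense; expanded, a stored value looks like this (hypothetical entry, two captures of one url in the same second, mirroring the dict that the append calls in the next hunk actually store):

    # Hypothetical decoded 'value' column for one url:
    import json

    py_value = {
        '2017-10-30T00:00:00Z': [
            {'f': 'warcprox-00000.warc.gz', 'o': 1234,       # file, offset
             'd': 'sha1:G7HRM7BGOKSKMSXZAHMUQTTV53QOFSMK'},  # payload digest
            {'f': 'warcprox-00000.warc.gz', 'o': 56789,
             'd': 'sha1:QTTV53QOFSMKG7HRM7BGOKSKMSXZAHMU'},
        ],
    }
    json_value = json.dumps(py_value, separators=(',', ':'))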
@@ -284,10 +285,10 @@ class PlaybackIndexDb(object):
 
             if date_str in py_value:
                 py_value[date_str].append(
-                        {'f':warcfile, 'o':offset, 'i':record_id_str})
+                        {'f': warcfile, 'o': offset, 'd': payload_digest_str})
             else:
                 py_value[date_str] = [
-                        {'f':warcfile, 'o':offset, 'i':record_id_str}]
+                        {'f': warcfile, 'o': offset, 'd': payload_digest_str}]
 
             json_value = json.dumps(py_value, separators=(',',':'))
 
@@ -315,11 +316,11 @@ class PlaybackIndexDb(object):
 
         latest_date = max(py_value)
         result = py_value[latest_date][0]
-        result['i'] = result['i'].encode('ascii')
+        result['d'] = result['d'].encode('ascii')
        return latest_date, result
 
     # in python3 params are bytes
-    def lookup_exact(self, url, warc_date, record_id):
+    def lookup_exact(self, url, warc_date, payload_digest):
         conn = sqlite3.connect(self.file)
         cursor = conn.execute(
                 'select value from playback where url = ?', (url,))
@@ -335,14 +336,13 @@ class PlaybackIndexDb(object):
 
         if warc_date in py_value:
             for record in py_value[warc_date]:
-                if record['i'] == record_id:
+                if record['d'] == payload_digest:
                     self.logger.debug(
                             "found exact match for (%r,%r,%r)",
-                            warc_date, record_id, url)
-                    record['i'] = record['i']
+                            warc_date, payload_digest, url)
+                    record['d'] = record['d']
                     return record
         else:
             self.logger.info(
-                    "match not found for (%r,%r,%r)", warc_date, record_id, url)
+                    "match not found for (%r,%r,%r)", warc_date, payload_digest, url)
             return None
 
@@ -25,7 +25,6 @@ import logging
 import os
 import json
 from hanzo import warctools
-import random
 import warcprox
 import threading
 import rethinkdb as r
@@ -27,7 +27,6 @@ import hashlib
 import socket
 import hanzo.httptools
 from hanzo import warctools
-import warcprox
 import datetime
 
 class WarcRecordBuilder:
@@ -51,7 +50,7 @@ class WarcRecordBuilder:
                     url=recorded_url.url, warc_date=warc_date,
                     data=response_header_block,
                     warc_type=warctools.WarcRecord.REVISIT,
-                    refers_to=recorded_url.dedup_info['id'],
+                    refers_to=recorded_url.dedup_info.get('id'),
                     refers_to_target_uri=recorded_url.dedup_info['url'],
                     refers_to_date=recorded_url.dedup_info['date'],
                     payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
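The ['id'] to .get('id') change matters because dedup info no longer always carries a record id: the CDX-server dedup path supplies url, date, and digest only. A sketch of the two shapes (all values hypothetical):

    # Sketch: dedup_info from the local dedup db vs. from cdx dedup.
    dedup_info_local = {'id': b'<urn:uuid:0000...>',  # placeholder id
                        'url': b'http://example.com/',
                        'date': b'2017-10-30T00:00:00Z'}
    dedup_info_cdx = {'url': b'http://example.com/',
                      'date': b'2017-10-30T00:00:00Z'}  # no 'id' key

    # ['id'] would raise KeyError on the cdx shape; .get('id') returns
    # None, and the record builder can simply omit WARC-Refers-To.
    for info in (dedup_info_local, dedup_info_cdx):
        print(info.get('id'))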
@@ -35,15 +35,12 @@ try:
 except ImportError:
     import Queue as queue
 import logging
-import re
-import traceback
 import json
 import socket
 from hanzo import warctools
 from certauth.certauth import CertificateAuthority
 import warcprox
 import datetime
-import ipaddress
 import urlcanon
 import os
 
@@ -195,9 +192,22 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
 
         remote_ip = self._remote_server_sock.getpeername()[0]
         timestamp = datetime.datetime.utcnow()
+        extra_response_headers = {}
+        if warcprox_meta and 'accept' in warcprox_meta and \
+                'capture-metadata' in warcprox_meta['accept']:
+            rmeta = {'capture-metadata': {'timestamp': timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')}}
+            extra_response_headers['Warcprox-Meta'] = json.dumps(rmeta, separators=',:')
+
         req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request(
-                self)
+                self, extra_response_headers=extra_response_headers)
 
+        content_type = None
+        try:
+            content_type = prox_rec_res.headers.get('content-type')
+        except AttributeError: # py2
+            raw = prox_rec_res.msg.getrawheader('content-type')
+            if raw:
+                content_type = raw.strip()
+
         recorded_url = RecordedUrl(
                 url=self.url, request_data=req,
@@ -205,9 +215,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                 warcprox_meta=warcprox_meta, status=prox_rec_res.status,
                 size=prox_rec_res.recorder.len,
                 client_ip=self.client_address[0],
-                content_type=prox_rec_res.getheader("Content-Type"),
-                method=self.command, timestamp=timestamp, host=self.hostname,
-                duration=datetime.datetime.utcnow()-timestamp,
+                content_type=content_type, method=self.command,
+                timestamp=timestamp, host=self.hostname,
+                duration=datetime.datetime.utcnow() - timestamp,
                 referer=self.headers.get('referer'))
         self.server.recorded_url_q.put(recorded_url)
 
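Both halves together: a client opts in by sending a Warcprox-Meta request header with accept: capture-metadata, and gets the capture timestamp back in a Warcprox-Meta response header. A hedged client-side sketch; the proxy address, port, and URL are placeholders:

    # Hypothetical client talking through warcprox on localhost:8000.
    import json
    import urllib.request

    req = urllib.request.Request('http://example.com/', headers={
        'Warcprox-Meta': json.dumps({'accept': ['capture-metadata']})})
    opener = urllib.request.build_opener(
        urllib.request.ProxyHandler({'http': 'http://localhost:8000'}))
    resp = opener.open(req)
    rmeta = json.loads(resp.headers['Warcprox-Meta'])
    print(rmeta['capture-metadata']['timestamp'])  # e.g. 2017-10-30T00:00:00Z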
@@ -24,10 +24,10 @@ from __future__ import absolute_import
 import logging
 from datetime import datetime
 from hanzo import warctools
+import fcntl
 import time
 import warcprox
 import os
-import socket
 import string
 import random
 import threading
@@ -54,6 +54,7 @@ class WarcWriter:
         self._f = None
         self._fpath = None
         self._f_finalname = None
+        self._f_open_suffix = '' if options.no_warc_open_suffix else '.open'
         self._serial = 0
         self._lock = threading.RLock()
 
@@ -71,6 +72,12 @@ class WarcWriter:
         with self._lock:
             if self._fpath:
                 self.logger.info('closing %s', self._f_finalname)
+                if self._f_open_suffix == '':
+                    try:
+                        fcntl.lockf(self._f, fcntl.LOCK_UN)
+                    except IOError as exc:
+                        self.logger.error('could not unlock file %s (%s)',
+                                          self._fpath, exc)
                 self._f.close()
                 finalpath = os.path.sep.join(
                         [self.directory, self._f_finalname])
@@ -92,9 +99,17 @@ class WarcWriter:
                     self.prefix, self.timestamp17(), self._serial,
                     self._randomtoken, '.gz' if self.gzip else '')
             self._fpath = os.path.sep.join([
-                    self.directory, self._f_finalname + '.open'])
+                    self.directory, self._f_finalname + self._f_open_suffix])
 
             self._f = open(self._fpath, 'wb')
+            # if no '.open' suffix is used for WARC, acquire an exclusive
+            # file lock.
+            if self._f_open_suffix == '':
+                try:
+                    fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                except IOError as exc:
+                    self.logger.error('could not lock file %s (%s)',
+                                      self._fpath, exc)
 
             warcinfo_record = self.record_builder.build_warcinfo_record(
                     self._f_finalname)
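The lock added above is what --no-warc-open-suffix relies on: without the '.open' rename to signal a file still being written, an exclusive non-blocking lockf marks it busy instead. A standalone sketch of the semantics (POSIX-only; note that fcntl record locks are per-process, so the probe must run in a child process to observe the conflict):

    # Sketch: writer holds LOCK_EX | LOCK_NB, as in WarcWriter; a probe
    # from another process fails with EAGAIN until the writer unlocks.
    import fcntl, subprocess, sys, tempfile

    path = tempfile.mkstemp(suffix='.warc.gz')[1]
    writer = open(path, 'wb')
    fcntl.lockf(writer, fcntl.LOCK_EX | fcntl.LOCK_NB)    # acquire

    probe = (
        "import fcntl, sys\n"
        "f = open(sys.argv[1], 'rb')\n"
        "try:\n"
        "    fcntl.lockf(f, fcntl.LOCK_SH | fcntl.LOCK_NB)\n"
        "    print('not locked')\n"
        "except IOError:\n"
        "    print('locked')\n")
    print(subprocess.check_output([sys.executable, '-c', probe, path]))

    fcntl.lockf(writer, fcntl.LOCK_UN)                    # release
    writer.close()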
@@ -29,13 +29,8 @@ except ImportError:
 
 import logging
 import threading
-import os
-import hashlib
 import time
-import socket
-import base64
 from datetime import datetime
-import hanzo.httptools
 from hanzo import warctools
 import warcprox
 import cProfile
@@ -46,7 +41,7 @@ class WarcWriterThread(threading.Thread):
 
     def __init__(
             self, name='WarcWriterThread', recorded_url_q=None,
-            writer_pool=None, dedup_db=None, listeners=None,
+            writer_pool=None, dedup_db=None, listeners=[],
             options=warcprox.Options()):
         """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
         threading.Thread.__init__(self, name=name)