diff --git a/.travis.yml b/.travis.yml index 5f7d8b3..a1848b7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,8 +34,8 @@ before_script: script: - py.test -v tests -- py.test -v --rethinkdb-servers=localhost tests -- py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests +- py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests +- py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests notifications: slack: diff --git a/setup.py b/setup.py index 89f37dc..47852c9 100755 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ deps = [ 'certauth==1.1.6', 'warctools', 'urlcanon>=0.1.dev16', - 'doublethink>=0.2.0.dev81', + 'doublethink>=0.2.0.dev87', 'PySocks', ] try: diff --git a/tests/conftest.py b/tests/conftest.py index 27d4141..d130491 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,39 +1,41 @@ -# -# tests/conftest.py - command line options for warcprox tests -# -# Copyright (C) 2015-2016 Internet Archive -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# +# vim: set fileencoding=utf-8: +''' +tests/conftest.py - command line options for warcprox tests + +Copyright (C) 2015-2017 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +''' import pytest def pytest_addoption(parser): - parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers', - help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') - parser.addoption('--rethinkdb-big-table', - dest='rethinkdb_big_table', action='store_true', default=False, - help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') - -@pytest.fixture(scope="module") -def rethinkdb_servers(request): - return request.config.getoption("--rethinkdb-servers") - -@pytest.fixture(scope="module") -def rethinkdb_big_table(request): - return request.config.getoption("--rethinkdb-big-table") - + parser.addoption( + '--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=( + 'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_dedup_table')) + parser.addoption( + '--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=( + 'rethinkdb big table url (table will be populated with ' + 'various capture information and is suitable for use as ' + 'index for playback), e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/captures')) + parser.addoption( + '--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=( + '🐷   url pointing to trough configuration rethinkdb database, ' + 'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015' + '/trough_configuration')) diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 334cfc2..747f042 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -42,7 +42,9 @@ do && source /tmp/venv/bin/activate \ && pip --log-file /tmp/pip.log install . pytest requests warcio \ && py.test -v tests \ - && py.test -v --rethinkdb-servers=localhost tests \ - && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests" + && py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \ + && py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \ + && py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/test3 tests \ + " done diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index b24a5c8..53b6f71 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -242,7 +242,7 @@ def https_daemon(request, cert): return https_daemon @pytest.fixture(scope="module") -def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): +def warcprox_(request): orig_dir = os.getcwd() work_dir = tempfile.mkdtemp() logging.info('changing to working directory %r', work_dir) @@ -254,12 +254,15 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): '--port=0', '--playback-port=0', '--onion-tor-socks-proxy=localhost:9050'] - if rethinkdb_servers: - rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - argv.append('--rethinkdb-servers=%s' % rethinkdb_servers) - argv.append('--rethinkdb-db=%s' % rethinkdb_db) - if rethinkdb_big_table: - argv.append('--rethinkdb-big-table') + if request.config.getoption('--rethinkdb-dedup-url'): + argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) + # test these here only + argv.append('--rethinkdb-stats-url=rethinkdb://localhost/test0/stats') + argv.append('--rethinkdb-services-url=rethinkdb://localhost/test0/services') + elif request.config.getoption('--rethinkdb-big-table-url'): + argv.append('--rethinkdb-big-table-url=%s' % request.config.getoption('--rethinkdb-big-table-url')) + elif request.config.getoption('--rethinkdb-trough-db-url'): + argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url')) args = warcprox.main.parse_args(argv) warcprox_ = warcprox.main.init_controller(args) @@ -272,10 +275,22 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): def fin(): warcprox_.stop.set() warcprox_thread.join() - if rethinkdb_servers: - logging.info('dropping rethinkdb database %r', rethinkdb_db) - rr = doublethink.Rethinker(rethinkdb_servers) - result = rr.db_drop(rethinkdb_db).run() + for rethinkdb_url in ( + warcprox_.options.rethinkdb_big_table_url, + warcprox_.options.rethinkdb_dedup_url, + warcprox_.options.rethinkdb_services_url, + warcprox_.options.rethinkdb_stats_url): + if not rethinkdb_url: + continue + parsed = doublethink.parse_rethinkdb_url(rethinkdb_url) + rr = doublethink.Rethinker(servers=parsed.hosts) + try: + logging.info('dropping rethinkdb database %r', parsed.database) + rr.db_drop(parsed.database).run() + except Exception as e: + logging.warn( + 'problem deleting rethinkdb database %r: %s', + parsed.database, e) logging.info('deleting working directory %r', work_dir) os.chdir(orig_dir) shutil.rmtree(work_dir) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 387d05c..d2147b8 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -39,13 +39,12 @@ class RethinkCaptures: """Inserts in batches every 0.5 seconds""" logger = logging.getLogger("warcprox.bigtable.RethinkCaptures") - def __init__( - self, rr, table="captures", shards=None, replicas=None, - options=warcprox.Options()): - self.rr = rr - self.table = table - self.shards = shards or len(rr.servers) - self.replicas = replicas or min(3, len(rr.servers)) + def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_big_table_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.table = parsed.table self.options = options self._ensure_db_table() @@ -107,7 +106,9 @@ class RethinkCaptures: self.logger.info( "creating rethinkdb table %r in database %r", self.table, self.rr.dbname) - self.rr.table_create(self.table, shards=self.shards, replicas=self.replicas).run() + self.rr.table_create( + self.table, shards=len(self.rr.servers), + replicas=min(3, len(self.rr.servers))).run() self.rr.table(self.table).index_create( "abbr_canon_surt_timestamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run() @@ -216,8 +217,8 @@ class RethinkCaptures: class RethinkCapturesDedup: logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") - def __init__(self, captures_db, options=warcprox.Options()): - self.captures_db = captures_db + def __init__(self, options=warcprox.Options()): + self.captures_db = RethinkCaptures(options=options) self.options = options def lookup(self, digest_key, bucket="__unspecified__"): @@ -247,3 +248,7 @@ class RethinkCapturesDedup: def close(self): self.captures_db.close() + + def notify(self, recorded_url, records): + self.captures_db.notify(recorded_url, records) + diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 41439c2..0a45b7c 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -28,6 +28,8 @@ from hanzo import warctools import warcprox import sqlite3 import requests +import doublethink +import rethinkdb as r class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -115,11 +117,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") - def __init__(self, rr, table="dedup", shards=None, replicas=None, options=warcprox.Options()): - self.rr = rr - self.table = table - self.shards = shards or len(rr.servers) - self.replicas = replicas or min(3, len(rr.servers)) + def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.table = parsed.table self._ensure_db_table() self.options = options @@ -132,12 +134,11 @@ class RethinkDedupDb: if not self.table in tables: self.logger.info( "creating rethinkdb table %r in database %r shards=%r " - "replicas=%r", self.table, self.rr.dbname, self.shards, - self.replicas) + "replicas=%r", self.table, self.rr.dbname, + len(self.rr.servers), min(3, len(self.rr.servers))) self.rr.table_create( - self.table, primary_key="key", shards=self.shards, - replicas=self.replicas).run() - + self.table, primary_key="key", shards=len(self.rr.servers), + replicas=min(3, len(self.rr.servers))).run() def start(self): pass @@ -182,6 +183,11 @@ class TroughDedupDb(object): logger = logging.getLogger("warcprox.dedup.TroughDedupDb") def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_trough_db_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.svcreg = doublethink.ServiceRegistry(self.rr) self.options = options def start(self): @@ -191,28 +197,21 @@ class TroughDedupDb(object): pass def _write_url(self, bucket): - import doublethink segment_id = 'warcprox-trough-%s' % bucket - rr = doublethink.Rethinker( - servers=['localhost'], db='trough_configuration') - services = doublethink.ServiceRegistry(rr) - master_node = services.unique_service('trough-sync-master') + master_node = self.svcreg.unique_service('trough-sync-master') response = requests.post(master_node['url'], segment_id) response.raise_for_status() write_url = response.text.strip() return write_url def _read_url(self, bucket): - import doublethink - import rethinkdb as r segment_id = 'warcprox-trough-%s' % bucket - rr = doublethink.Rethinker( - servers=['localhost'], db='trough_configuration') - reql = rr.table('services').get_all(segment_id, index='segment').filter( - {'role':'trough-read'}).filter( - lambda svc: r.now().sub( - svc['last_heartbeat']).lt(svc['ttl']) - ).order_by('load') + reql = self.rr.table('services').get_all( + segment_id, index='segment').filter( + {'role':'trough-read'}).filter( + lambda svc: r.now().sub( + svc['last_heartbeat']).lt(svc['ttl']) + ).order_by('load') logging.debug('querying rethinkdb: %r', reql) results = reql.run() if results: diff --git a/warcprox/main.py b/warcprox/main.py index f4ac391..b7f3ec6 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# vim: set fileencoding=utf-8: ''' warcprox/main.py - entrypoint for warcprox executable, parses command line arguments, initializes components, starts controller, handles signals @@ -42,6 +43,7 @@ import warcprox import doublethink import cryptography.hazmat.backends.openssl import importlib +import doublethink class BetterArgumentDefaultsHelpFormatter( argparse.ArgumentDefaultsHelpFormatter, @@ -96,8 +98,18 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default=False, help='write digests in Base32 instead of hex') arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD', action='append', help='only record requests with the given http method(s) (can be used more than once)') - arg_parser.add_argument('--stats-db-file', dest='stats_db_file', - default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') + + group = arg_parser.add_mutually_exclusive_group() + group.add_argument( + '--stats-db-file', dest='stats_db_file', + default='./warcprox.sqlite', help=( + 'persistent statistics database file; empty string or ' + '/dev/null disables statistics tracking')) + group.add_argument( + '--rethinkdb-stats-url', dest='rethinkdb_stats_url', help=( + 'rethinkdb stats table url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_stats_table')) + arg_parser.add_argument('-P', '--playback-port', dest='playback_port', type=int, default=None, help='port to listen on for instant playback') arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', @@ -106,17 +118,25 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') - group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', - help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') - group.add_argument('--trough', help='use trough for deduplication 🐷 🐷 🐷 🐷', action='store_true') - arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', - help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') - arg_parser.add_argument('--rethinkdb-big-table', - dest='rethinkdb_big_table', action='store_true', default=False, - help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') + group.add_argument( + '--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=( + 'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_dedup_table')) + group.add_argument( + '--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=( + 'rethinkdb big table url (table will be populated with ' + 'various capture information and is suitable for use as ' + 'index for playback), e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/captures')) + group.add_argument( + '--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=( + '🐷   url pointing to trough configuration rethinkdb database, ' + 'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015' + '/trough_configuration')) arg_parser.add_argument( - '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name', - default='captures', help=argparse.SUPPRESS) + '--rethinkdb-services-url', dest='rethinkdb_services_url', help=( + 'rethinkdb service registry table url; if provided, warcprox ' + 'will create and heartbeat entry for itself')) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, @@ -178,30 +198,23 @@ def init_controller(args): exit(1) listeners = [] - if args.trough: + + if args.rethinkdb_dedup_url: + dedup_db = warcprox.dedup.RethinkDedupDb(options=options) + elif args.rethinkdb_big_table_url: + dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options) + elif args.rethinkdb_trough_db_url: dedup_db = warcprox.dedup.TroughDedupDb(options) - listeners.append(dedup_db) - elif args.rethinkdb_servers: - rr = doublethink.Rethinker( - args.rethinkdb_servers.split(","), args.rethinkdb_db) - if args.rethinkdb_big_table: - captures_db = warcprox.bigtable.RethinkCaptures( - rr, table=args.rethinkdb_big_table_name, options=options) - dedup_db = warcprox.bigtable.RethinkCapturesDedup( - captures_db, options=options) - listeners.append(captures_db) - else: - dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options) - listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None else: dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options) + if dedup_db: listeners.append(dedup_db) - if args.rethinkdb_servers: - stats_db = warcprox.stats.RethinkStatsDb(rr, options=options) + if args.rethinkdb_stats_url: + stats_db = warcprox.stats.RethinkStatsDb(options=options) listeners.append(stats_db) elif args.stats_db_file in (None, '', '/dev/null'): logging.info('statistics tracking disabled') @@ -252,8 +265,11 @@ def init_controller(args): listeners=listeners, options=options) for i in range(int(proxy.max_threads ** 0.5))] - if args.rethinkdb_servers: - svcreg = doublethink.ServiceRegistry(rr) + if args.rethinkdb_services_url: + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_services_url) + rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database) + svcreg = doublethink.ServiceRegistry(rr, table=parsed.table) else: svcreg = None @@ -303,8 +319,7 @@ def main(argv=sys.argv): loglevel = logging.INFO logging.basicConfig( - stream=sys.stdout, level=loglevel, - format=( + stream=sys.stdout, level=loglevel, format=( '%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) @@ -318,6 +333,7 @@ def ensure_rethinkdb_tables(): tables. So it's a good idea to use this utility at an early step when spinning up a cluster. ''' + raise Exception('adjust my args') arg_parser = argparse.ArgumentParser( prog=os.path.basename(sys.argv[0]), formatter_class=BetterArgumentDefaultsHelpFormatter) diff --git a/warcprox/stats.py b/warcprox/stats.py index 55693a2..5a87461 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -32,6 +32,7 @@ import datetime import urlcanon import sqlite3 import copy +import doublethink def _empty_bucket(bucket): return { @@ -189,11 +190,12 @@ class RethinkStatsDb(StatsDb): """Updates database in batch every 2.0 seconds""" logger = logging.getLogger("warcprox.stats.RethinkStatsDb") - def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()): - self.rr = rethinker - self.table = table - self.shards = shards or 1 # 1 shard by default because it's probably a small table - self.replicas = replicas or min(3, len(self.rr.servers)) + def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.table = parsed.table + self.replicas = min(3, len(self.rr.servers)) self._ensure_db_table() self.options = options @@ -271,10 +273,10 @@ class RethinkStatsDb(StatsDb): if not self.table in tables: self.logger.info( "creating rethinkdb table %r in database %r shards=%r " - "replicas=%r", self.table, self.rr.dbname, self.shards, + "replicas=%r", self.table, self.rr.dbname, 1, self.replicas) self.rr.table_create( - self.table, primary_key="bucket", shards=self.shards, + self.table, primary_key="bucket", shards=1, replicas=self.replicas).run() def close(self):