change rethinkdb-related command line options to use "rethinkdb urls" (parser just added to doublethink) to reduce the proliferation of rethinkdb options, and add --rethinkdb-trough-db-url option

This commit is contained in:
Noah Levitt 2017-10-11 12:06:19 -07:00
parent 4eda89f232
commit d177b3b80d
9 changed files with 164 additions and 123 deletions

View File

@ -34,8 +34,8 @@ before_script:
script:
- py.test -v tests
- py.test -v --rethinkdb-servers=localhost tests
- py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests
- py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
- py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
notifications:
slack:

View File

@ -39,7 +39,7 @@ deps = [
'certauth==1.1.6',
'warctools',
'urlcanon>=0.1.dev16',
'doublethink>=0.2.0.dev81',
'doublethink>=0.2.0.dev87',
'PySocks',
]
try:

View File

@ -1,39 +1,41 @@
#
# tests/conftest.py - command line options for warcprox tests
#
# Copyright (C) 2015-2016 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
# vim: set fileencoding=utf-8:
'''
tests/conftest.py - command line options for warcprox tests
Copyright (C) 2015-2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
import pytest
def pytest_addoption(parser):
parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
parser.addoption('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
@pytest.fixture(scope="module")
def rethinkdb_servers(request):
return request.config.getoption("--rethinkdb-servers")
@pytest.fixture(scope="module")
def rethinkdb_big_table(request):
return request.config.getoption("--rethinkdb-big-table")
parser.addoption(
'--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=(
'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_dedup_table'))
parser.addoption(
'--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=(
'rethinkdb big table url (table will be populated with '
'various capture information and is suitable for use as '
'index for playback), e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/captures'))
parser.addoption(
'--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=(
'🐷   url pointing to trough configuration rethinkdb database, '
'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
'/trough_configuration'))

View File

@ -42,7 +42,9 @@ do
&& source /tmp/venv/bin/activate \
&& pip --log-file /tmp/pip.log install . pytest requests warcio \
&& py.test -v tests \
&& py.test -v --rethinkdb-servers=localhost tests \
&& py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests"
&& py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
&& py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \
&& py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/test3 tests \
"
done

View File

@ -242,7 +242,7 @@ def https_daemon(request, cert):
return https_daemon
@pytest.fixture(scope="module")
def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
def warcprox_(request):
orig_dir = os.getcwd()
work_dir = tempfile.mkdtemp()
logging.info('changing to working directory %r', work_dir)
@ -254,12 +254,15 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
'--port=0',
'--playback-port=0',
'--onion-tor-socks-proxy=localhost:9050']
if rethinkdb_servers:
rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
argv.append('--rethinkdb-servers=%s' % rethinkdb_servers)
argv.append('--rethinkdb-db=%s' % rethinkdb_db)
if rethinkdb_big_table:
argv.append('--rethinkdb-big-table')
if request.config.getoption('--rethinkdb-dedup-url'):
argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url'))
# test these here only
argv.append('--rethinkdb-stats-url=rethinkdb://localhost/test0/stats')
argv.append('--rethinkdb-services-url=rethinkdb://localhost/test0/services')
elif request.config.getoption('--rethinkdb-big-table-url'):
argv.append('--rethinkdb-big-table-url=%s' % request.config.getoption('--rethinkdb-big-table-url'))
elif request.config.getoption('--rethinkdb-trough-db-url'):
argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url'))
args = warcprox.main.parse_args(argv)
warcprox_ = warcprox.main.init_controller(args)
@ -272,10 +275,22 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
def fin():
warcprox_.stop.set()
warcprox_thread.join()
if rethinkdb_servers:
logging.info('dropping rethinkdb database %r', rethinkdb_db)
rr = doublethink.Rethinker(rethinkdb_servers)
result = rr.db_drop(rethinkdb_db).run()
for rethinkdb_url in (
warcprox_.options.rethinkdb_big_table_url,
warcprox_.options.rethinkdb_dedup_url,
warcprox_.options.rethinkdb_services_url,
warcprox_.options.rethinkdb_stats_url):
if not rethinkdb_url:
continue
parsed = doublethink.parse_rethinkdb_url(rethinkdb_url)
rr = doublethink.Rethinker(servers=parsed.hosts)
try:
logging.info('dropping rethinkdb database %r', parsed.database)
rr.db_drop(parsed.database).run()
except Exception as e:
logging.warn(
'problem deleting rethinkdb database %r: %s',
parsed.database, e)
logging.info('deleting working directory %r', work_dir)
os.chdir(orig_dir)
shutil.rmtree(work_dir)

View File

@ -39,13 +39,12 @@ class RethinkCaptures:
"""Inserts in batches every 0.5 seconds"""
logger = logging.getLogger("warcprox.bigtable.RethinkCaptures")
def __init__(
self, rr, table="captures", shards=None, replicas=None,
options=warcprox.Options()):
self.rr = rr
self.table = table
self.shards = shards or len(rr.servers)
self.replicas = replicas or min(3, len(rr.servers))
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_big_table_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.table = parsed.table
self.options = options
self._ensure_db_table()
@ -107,7 +106,9 @@ class RethinkCaptures:
self.logger.info(
"creating rethinkdb table %r in database %r",
self.table, self.rr.dbname)
self.rr.table_create(self.table, shards=self.shards, replicas=self.replicas).run()
self.rr.table_create(
self.table, shards=len(self.rr.servers),
replicas=min(3, len(self.rr.servers))).run()
self.rr.table(self.table).index_create(
"abbr_canon_surt_timestamp",
[r.row["abbr_canon_surt"], r.row["timestamp"]]).run()
@ -216,8 +217,8 @@ class RethinkCaptures:
class RethinkCapturesDedup:
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
def __init__(self, captures_db, options=warcprox.Options()):
self.captures_db = captures_db
def __init__(self, options=warcprox.Options()):
self.captures_db = RethinkCaptures(options=options)
self.options = options
def lookup(self, digest_key, bucket="__unspecified__"):
@ -247,3 +248,7 @@ class RethinkCapturesDedup:
def close(self):
self.captures_db.close()
def notify(self, recorded_url, records):
self.captures_db.notify(recorded_url, records)

View File

@ -28,6 +28,8 @@ from hanzo import warctools
import warcprox
import sqlite3
import requests
import doublethink
import rethinkdb as r
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@ -115,11 +117,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, rr, table="dedup", shards=None, replicas=None, options=warcprox.Options()):
self.rr = rr
self.table = table
self.shards = shards or len(rr.servers)
self.replicas = replicas or min(3, len(rr.servers))
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.table = parsed.table
self._ensure_db_table()
self.options = options
@ -132,12 +134,11 @@ class RethinkDedupDb:
if not self.table in tables:
self.logger.info(
"creating rethinkdb table %r in database %r shards=%r "
"replicas=%r", self.table, self.rr.dbname, self.shards,
self.replicas)
"replicas=%r", self.table, self.rr.dbname,
len(self.rr.servers), min(3, len(self.rr.servers)))
self.rr.table_create(
self.table, primary_key="key", shards=self.shards,
replicas=self.replicas).run()
self.table, primary_key="key", shards=len(self.rr.servers),
replicas=min(3, len(self.rr.servers))).run()
def start(self):
pass
@ -182,6 +183,11 @@ class TroughDedupDb(object):
logger = logging.getLogger("warcprox.dedup.TroughDedupDb")
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_trough_db_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.svcreg = doublethink.ServiceRegistry(self.rr)
self.options = options
def start(self):
@ -191,28 +197,21 @@ class TroughDedupDb(object):
pass
def _write_url(self, bucket):
import doublethink
segment_id = 'warcprox-trough-%s' % bucket
rr = doublethink.Rethinker(
servers=['localhost'], db='trough_configuration')
services = doublethink.ServiceRegistry(rr)
master_node = services.unique_service('trough-sync-master')
master_node = self.svcreg.unique_service('trough-sync-master')
response = requests.post(master_node['url'], segment_id)
response.raise_for_status()
write_url = response.text.strip()
return write_url
def _read_url(self, bucket):
import doublethink
import rethinkdb as r
segment_id = 'warcprox-trough-%s' % bucket
rr = doublethink.Rethinker(
servers=['localhost'], db='trough_configuration')
reql = rr.table('services').get_all(segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
lambda svc: r.now().sub(
svc['last_heartbeat']).lt(svc['ttl'])
).order_by('load')
reql = self.rr.table('services').get_all(
segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
lambda svc: r.now().sub(
svc['last_heartbeat']).lt(svc['ttl'])
).order_by('load')
logging.debug('querying rethinkdb: %r', reql)
results = reql.run()
if results:

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
# vim: set fileencoding=utf-8:
'''
warcprox/main.py - entrypoint for warcprox executable, parses command line
arguments, initializes components, starts controller, handles signals
@ -42,6 +43,7 @@ import warcprox
import doublethink
import cryptography.hazmat.backends.openssl
import importlib
import doublethink
class BetterArgumentDefaultsHelpFormatter(
argparse.ArgumentDefaultsHelpFormatter,
@ -96,8 +98,18 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
default=False, help='write digests in Base32 instead of hex')
arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
action='append', help='only record requests with the given http method(s) (can be used more than once)')
arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
group = arg_parser.add_mutually_exclusive_group()
group.add_argument(
'--stats-db-file', dest='stats_db_file',
default='./warcprox.sqlite', help=(
'persistent statistics database file; empty string or '
'/dev/null disables statistics tracking'))
group.add_argument(
'--rethinkdb-stats-url', dest='rethinkdb_stats_url', help=(
'rethinkdb stats table url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_stats_table'))
arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
@ -106,17 +118,25 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
group = arg_parser.add_mutually_exclusive_group()
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
group.add_argument('--trough', help='use trough for deduplication 🐷 🐷 🐷 🐷', action='store_true')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
group.add_argument(
'--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=(
'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_dedup_table'))
group.add_argument(
'--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=(
'rethinkdb big table url (table will be populated with '
'various capture information and is suitable for use as '
'index for playback), e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/captures'))
group.add_argument(
'--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=(
'🐷   url pointing to trough configuration rethinkdb database, '
'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
'/trough_configuration'))
arg_parser.add_argument(
'--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
default='captures', help=argparse.SUPPRESS)
'--rethinkdb-services-url', dest='rethinkdb_services_url', help=(
'rethinkdb service registry table url; if provided, warcprox '
'will create and heartbeat entry for itself'))
arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
default=500, help=argparse.SUPPRESS)
arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
@ -178,30 +198,23 @@ def init_controller(args):
exit(1)
listeners = []
if args.trough:
if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
elif args.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
listeners.append(dedup_db)
elif args.rethinkdb_servers:
rr = doublethink.Rethinker(
args.rethinkdb_servers.split(","), args.rethinkdb_db)
if args.rethinkdb_big_table:
captures_db = warcprox.bigtable.RethinkCaptures(
rr, table=args.rethinkdb_big_table_name, options=options)
dedup_db = warcprox.bigtable.RethinkCapturesDedup(
captures_db, options=options)
listeners.append(captures_db)
else:
dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
listeners.append(dedup_db)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
if dedup_db:
listeners.append(dedup_db)
if args.rethinkdb_servers:
stats_db = warcprox.stats.RethinkStatsDb(rr, options=options)
if args.rethinkdb_stats_url:
stats_db = warcprox.stats.RethinkStatsDb(options=options)
listeners.append(stats_db)
elif args.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
@ -252,8 +265,11 @@ def init_controller(args):
listeners=listeners, options=options)
for i in range(int(proxy.max_threads ** 0.5))]
if args.rethinkdb_servers:
svcreg = doublethink.ServiceRegistry(rr)
if args.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
else:
svcreg = None
@ -303,8 +319,7 @@ def main(argv=sys.argv):
loglevel = logging.INFO
logging.basicConfig(
stream=sys.stdout, level=loglevel,
format=(
stream=sys.stdout, level=loglevel, format=(
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
@ -318,6 +333,7 @@ def ensure_rethinkdb_tables():
tables. So it's a good idea to use this utility at an early step when
spinning up a cluster.
'''
raise Exception('adjust my args')
arg_parser = argparse.ArgumentParser(
prog=os.path.basename(sys.argv[0]),
formatter_class=BetterArgumentDefaultsHelpFormatter)

View File

@ -32,6 +32,7 @@ import datetime
import urlcanon
import sqlite3
import copy
import doublethink
def _empty_bucket(bucket):
return {
@ -189,11 +190,12 @@ class RethinkStatsDb(StatsDb):
"""Updates database in batch every 2.0 seconds"""
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()):
self.rr = rethinker
self.table = table
self.shards = shards or 1 # 1 shard by default because it's probably a small table
self.replicas = replicas or min(3, len(self.rr.servers))
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.table = parsed.table
self.replicas = min(3, len(self.rr.servers))
self._ensure_db_table()
self.options = options
@ -271,10 +273,10 @@ class RethinkStatsDb(StatsDb):
if not self.table in tables:
self.logger.info(
"creating rethinkdb table %r in database %r shards=%r "
"replicas=%r", self.table, self.rr.dbname, self.shards,
"replicas=%r", self.table, self.rr.dbname, 1,
self.replicas)
self.rr.table_create(
self.table, primary_key="bucket", shards=self.shards,
self.table, primary_key="bucket", shards=1,
replicas=self.replicas).run()
def close(self):