Merge branch 'trough-dedup' into qa

* trough-dedup:
  update travis-ci trough deployment
  on error from trough read or write url, delete read/write url from cache, so next request will retrieve a fresh, hopefully working, url (n.b. not covered by automated tests at this point)
  cache trough read and write urls
  update trough dedup to use new segment manager api to register schema sql
  it finally works! another travis tweak though
  can we edit /etc/hosts in travis-ci?
  ugh fix docker command line arg
  docker container for trough needs a hostname that works from outside the container (since it registers itself in the service registry)
  trough logs are inside the docker container now
  need docker to publish the rethinkdb port for --rethinkdb-dedup-url and --rethinkdb-big-table-url tests
  apparently you can't use docker run options --rm and --detach together
  in travis-ci, run trough in another docker container, so that its version of python can be independent of the one used to run the warcprox tests
  remove some debugging from .travis.yml and importantly, deactivate the trough virtualenv before installing warcprox and running tests (otherwise it uses the wrong version of python)
  missed an ampersand
  bangin (is the problem that we didn't start trough-read?
  banging
  more banging on travis-ci
  cryptography 2.1.1 seems to be the problem
  banging on travis-ci
  first attempt to run trough on travis-ci
  get all the tests to pass with ./tests/run-tests.sh
  install and run trough in docker container for testing
  change rethinkdb-related command line options to use "rethinkdb urls" (parser just added to doublethink) to reduce the proliferation of rethinkdb options, and add --rethinkdb-trough-db-url option
  trough for deduplication initial proof-of-concept-ish code
Noah Levitt 2017-11-07 17:12:28 -08:00
commit 7ef2133628
11 changed files with 471 additions and 124 deletions
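Two of the commit subjects above describe the trough url caching strategy: read and write urls are cached per bucket, and an error response drops the cached url so the next request retrieves a fresh, hopefully working, one. A minimal sketch of that invalidate-on-error pattern (class and function names here are illustrative, not warcprox's actual API):

```python
class UrlCache:
    '''Cache per-bucket urls; drop an entry on error so the next
    request fetches a fresh, hopefully working, url.'''
    def __init__(self, fetch):
        self._fetch = fetch   # callable: bucket -> url
        self._cache = {}

    def get(self, bucket):
        # populate lazily; subsequent calls hit the cache
        if bucket not in self._cache:
            self._cache[bucket] = self._fetch(bucket)
        return self._cache[bucket]

    def invalidate(self, bucket):
        # called when a read or write against the cached url fails
        self._cache.pop(bucket, None)

fetches = []
def fetch_url(bucket):
    # stand-in for asking the trough segment manager for a url
    fetches.append(bucket)
    return 'http://trough-node:6444/%s' % bucket

cache = UrlCache(fetch_url)
assert cache.get('b1') == cache.get('b1')  # second call served from cache
cache.invalidate('b1')                     # e.g. after a failed POST
cache.get('b1')                            # next call refetches
print(len(fetches))  # -> 2
```

As the commit message notes, this invalidation path is not covered by automated tests at this point in the branch.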

--- a/.travis.yml
+++ b/.travis.yml
@@ -1,4 +1,4 @@
-group: deprecated-2017Q2
+sudo: required
 language: python
 python:
@@ -27,15 +27,38 @@ services:
 before_install:
 - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778
-- docker run -d --publish=28015:28015 rethinkdb
+- docker network create --driver=bridge trough
+- docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb
+- docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed
+- docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh
+- cat /etc/hosts
+- echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄
+- echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts
+- echo 127.0.0.1 hadoop | sudo tee -a /etc/hosts
+- echo 127.0.0.1 trough | sudo tee -a /etc/hosts
+- cat /etc/hosts
+- ping -c2 trough
+install:
+- pip install . pytest requests warcio
 before_script:
-- pip install . pytest requests warcio
+- ps ww -fHe
 script:
 - py.test -v tests
-- py.test -v --rethinkdb-servers=localhost tests
-- py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests
+- py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
+- py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
+- py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
+after_script:
+- ps ww -fHe
+- docker exec trough cat /tmp/trough-write.out
+- docker exec trough cat /tmp/trough-write-provisioner-server.out
+- docker exec trough cat /tmp/trough-write-provisioner-local.out
+- docker exec trough cat /tmp/trough-sync-server.out
+- docker exec trough cat /tmp/trough-sync-local.out
+- docker exec trough cat /tmp/trough-read.out
 notifications:
   slack:

--- a/setup.py
+++ b/setup.py
@@ -39,8 +39,8 @@ deps = [
     'certauth==1.1.6',
     'warctools',
     'urlcanon>=0.1.dev16',
+    'doublethink>=0.2.0.dev87',
     'urllib3',
-    'doublethink>=0.2.0.dev81',
     'PySocks',
     'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
 ]

--- a/tests/Dockerfile
+++ b/tests/Dockerfile
@@ -1,7 +1,7 @@
 #
 # Dockerfile for warcprox tests
 #
-# Copyright (C) 2015-2016 Internet Archive
+# Copyright (C) 2015-2017 Internet Archive
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -23,19 +23,19 @@ FROM phusion/baseimage
 MAINTAINER Noah Levitt <nlevitt@archive.org>
 
 # see https://github.com/stuartpb/rethinkdb-dockerfiles/blob/master/trusty/2.1.3/Dockerfile
+# and https://github.com/chali/hadoop-cdh-pseudo-docker/blob/master/Dockerfile
 
 ENV LANG=C.UTF-8
 
 RUN apt-get update && apt-get --auto-remove -y dist-upgrade
 
 # Add the RethinkDB repository and public key
-# "RethinkDB Packaging <packaging@rethinkdb.com>" http://download.rethinkdb.com/apt/pubkey.gpg
-RUN apt-key adv --keyserver pgp.mit.edu --recv-keys 1614552E5765227AEC39EFCFA7E00EF33A8F2399 \
+RUN curl -s https://download.rethinkdb.com/apt/pubkey.gpg | apt-key add - \
    && echo "deb http://download.rethinkdb.com/apt xenial main" > /etc/apt/sources.list.d/rethinkdb.list \
    && apt-get update && apt-get -y install rethinkdb
 
 RUN mkdir -vp /etc/service/rethinkdb \
-    && echo "#!/bin/sh\nrethinkdb --bind 0.0.0.0 --directory /tmp/rethink-data --runuser rethinkdb --rungroup rethinkdb\n" > /etc/service/rethinkdb/run \
+    && echo "#!/bin/bash\nexec rethinkdb --bind 0.0.0.0 --directory /tmp/rethink-data --runuser rethinkdb --rungroup rethinkdb\n" > /etc/service/rethinkdb/run \
    && chmod a+x /etc/service/rethinkdb/run
 
 RUN apt-get -y install git
@@ -53,6 +53,55 @@ RUN pip install virtualenv
 RUN apt-get -y install tor
 RUN mkdir -vp /etc/service/tor \
-    && echo "#!/bin/sh\ntor\n" > /etc/service/tor/run \
+    && echo "#!/bin/sh\nexec tor\n" > /etc/service/tor/run \
    && chmod a+x /etc/service/tor/run
+
+# hadoop hdfs for trough
+RUN curl -s https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/archive.key | apt-key add - \
+    && . /etc/lsb-release \
+    && echo "deb [arch=amd64] http://archive.cloudera.com/cdh5/ubuntu/$DISTRIB_CODENAME/amd64/cdh $DISTRIB_CODENAME-cdh5 contrib" >> /etc/apt/sources.list.d/cloudera.list
+RUN apt-get update
+RUN apt-get install -y openjdk-8-jdk hadoop-conf-pseudo
+RUN su hdfs -c 'hdfs namenode -format'
+RUN mv -v /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/core-site.xml.orig \
+    && cat /etc/hadoop/conf/core-site.xml.orig | sed 's,localhost:8020,0.0.0.0:8020,' > /etc/hadoop/conf/core-site.xml
+RUN mv -v /etc/hadoop/conf/hdfs-site.xml /etc/hadoop/conf/hdfs-site.xml.orig \
+    && cat /etc/hadoop/conf/hdfs-site.xml.orig | sed 's,^</configuration>$, <property>\n <name>dfs.permissions.enabled</name>\n <value>false</value>\n </property>\n</configuration>,' > /etc/hadoop/conf/hdfs-site.xml
+RUN echo '#!/bin/bash\nservice hadoop-hdfs-namenode start\nservice hadoop-hdfs-datanode start' > /etc/my_init.d/50_start_hdfs.sh \
+    && chmod a+x /etc/my_init.d/50_start_hdfs.sh
+
+# trough itself
+RUN virtualenv -p python3 /opt/trough-ve3 \
+    && . /opt/trough-ve3/bin/activate \
+    && pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string \
+    && pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup
+
+RUN mkdir -vp /etc/service/trough-sync-local \
+    && echo "#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nexec sync.py >>/tmp/trough-sync-local.out 2>&1" > /etc/service/trough-sync-local/run \
+    && chmod a+x /etc/service/trough-sync-local/run
+
+RUN mkdir -vp /etc/service/trough-sync-server \
+    && echo '#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec sync.py --server >>/tmp/trough-sync-server.out 2>&1' > /etc/service/trough-sync-server/run \
+    && chmod a+x /etc/service/trough-sync-server/run
+
+RUN mkdir -vp /etc/service/trough-read \
+    && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/reader.py >>/tmp/trough-read.out 2>&1' > /etc/service/trough-read/run \
+    && chmod a+x /etc/service/trough-read/run
+
+RUN mkdir -vp /etc/service/trough-write \
+    && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/writer.py >>/tmp/trough-write.out 2>&1' > /etc/service/trough-write/run \
+    && chmod a+x /etc/service/trough-write/run
+
+RUN mkdir -vp /etc/service/trough-write-provisioner-local \
+    && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1' > /etc/service/trough-write-provisioner-local/run \
+    && chmod a+x /etc/service/trough-write-provisioner-local/run
+
+RUN mkdir -vp /etc/service/trough-write-provisioner-server \
+    && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1' > /etc/service/trough-write-provisioner-server/run \
+    && chmod a+x /etc/service/trough-write-provisioner-server/run

--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,39 +1,41 @@
-#
-# tests/conftest.py - command line options for warcprox tests
-#
-# Copyright (C) 2015-2016 Internet Archive
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
+# vim: set fileencoding=utf-8:
+'''
+tests/conftest.py - command line options for warcprox tests
+
+Copyright (C) 2015-2017 Internet Archive
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+USA.
+'''
 
 import pytest
 
 def pytest_addoption(parser):
-    parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers',
-            help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
-    parser.addoption('--rethinkdb-big-table',
-            dest='rethinkdb_big_table', action='store_true', default=False,
-            help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
-
-@pytest.fixture(scope="module")
-def rethinkdb_servers(request):
-    return request.config.getoption("--rethinkdb-servers")
-
-@pytest.fixture(scope="module")
-def rethinkdb_big_table(request):
-    return request.config.getoption("--rethinkdb-big-table")
+    parser.addoption(
+            '--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=(
+                'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,'
+                'db1.foo.org:38015/my_warcprox_db/my_dedup_table'))
+    parser.addoption(
+            '--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=(
+                'rethinkdb big table url (table will be populated with '
+                'various capture information and is suitable for use as '
+                'index for playback), e.g. rethinkdb://db0.foo.org,'
+                'db1.foo.org:38015/my_warcprox_db/captures'))
+    parser.addoption(
+            '--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=(
+                '🐷   url pointing to trough configuration rethinkdb database, '
+                'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
+                '/trough_configuration'))

--- a/tests/run-tests.sh
+++ b/tests/run-tests.sh
@@ -5,8 +5,6 @@
 # features enabled, against that instance of rethinkdb, and also run without
 # rethinkdb features enabled. With python 2.7 and 3.4.
 #
-# tests/conftest.py - command line options for warcprox tests
-#
 # Copyright (C) 2015-2017 Internet Archive
 #
 # This program is free software; you can redistribute it and/or
@@ -33,7 +31,7 @@ script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
 docker build -t internetarchive/warcprox-tests $script_dir
 
-for python in python2.7 python3
+for python in python3 python2.7
 do
     docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests /sbin/my_init -- \
         bash -x -c "cd /tmp && git clone /warcprox && cd /tmp/warcprox \
@@ -42,7 +40,9 @@ do
             && source /tmp/venv/bin/activate \
             && pip --log-file /tmp/pip.log install . pytest mock requests warcio \
             && py.test -v tests \
-            && py.test -v --rethinkdb-servers=localhost tests \
-            && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests"
+            && py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
+            && py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \
+            && py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests \
+            "
 done

--- /dev/null
+++ b/tests/run-trough.sh (new file, 34 lines)
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# this is used by .travis.yml
+#
+
+pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string
+pip install git+https://github.com/internetarchive/trough.git@toward-warcprox-dedup
+
+mkdir /etc/trough
+
+# hello docker user-defined bridge networking
+echo '
+HDFS_HOST: hadoop
+RETHINKDB_HOSTS:
+- rethinkdb
+' > /etc/trough/settings.yml
+
+sync.py >>/tmp/trough-sync-local.out 2>&1 &
+
+sleep 5
+python -c "
+import doublethink
+from trough.settings import settings
+rr = doublethink.Rethinker(settings['RETHINKDB_HOSTS'])
+rr.db('trough_configuration').wait().run()"
+
+sync.py --server >>/tmp/trough-sync-server.out 2>&1 &
+uwsgi --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/writer.py >>/tmp/trough-write.out 2>&1 &
+uwsgi --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:local >>/tmp/trough-segment-manager-local.out 2>&1 &
+uwsgi --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:server >>/tmp/trough-segment-manager-server.out 2>&1 &
+uwsgi --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/reader.py >>/tmp/trough-read.out 2>&1 &
+
+wait

--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -84,7 +84,8 @@ def _send(self, data):
 # http_client.HTTPConnection.send = _send
 
 logging.basicConfig(
-        stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
+        # stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
+        stream=sys.stdout, level=warcprox.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
@@ -242,7 +243,7 @@ def https_daemon(request, cert):
     return https_daemon
 
 @pytest.fixture(scope="module")
-def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
+def warcprox_(request):
     orig_dir = os.getcwd()
     work_dir = tempfile.mkdtemp()
     logging.info('changing to working directory %r', work_dir)
@@ -255,12 +256,15 @@ def warcprox_(request):
             '--playback-port=0',
             '--onion-tor-socks-proxy=localhost:9050',
             '--crawl-log-dir=crawl-logs']
-    if rethinkdb_servers:
-        rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
-        argv.append('--rethinkdb-servers=%s' % rethinkdb_servers)
-        argv.append('--rethinkdb-db=%s' % rethinkdb_db)
-    if rethinkdb_big_table:
-        argv.append('--rethinkdb-big-table')
+    if request.config.getoption('--rethinkdb-dedup-url'):
+        argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url'))
+        # test these here only
+        argv.append('--rethinkdb-stats-url=rethinkdb://localhost/test0/stats')
+        argv.append('--rethinkdb-services-url=rethinkdb://localhost/test0/services')
+    elif request.config.getoption('--rethinkdb-big-table-url'):
+        argv.append('--rethinkdb-big-table-url=%s' % request.config.getoption('--rethinkdb-big-table-url'))
+    elif request.config.getoption('--rethinkdb-trough-db-url'):
+        argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url'))
 
     args = warcprox.main.parse_args(argv)
     warcprox_ = warcprox.main.init_controller(args)
@@ -273,10 +277,22 @@ def warcprox_(request):
     def fin():
         warcprox_.stop.set()
         warcprox_thread.join()
-        if rethinkdb_servers:
-            logging.info('dropping rethinkdb database %r', rethinkdb_db)
-            rr = doublethink.Rethinker(rethinkdb_servers)
-            result = rr.db_drop(rethinkdb_db).run()
+        for rethinkdb_url in (
+                warcprox_.options.rethinkdb_big_table_url,
+                warcprox_.options.rethinkdb_dedup_url,
+                warcprox_.options.rethinkdb_services_url,
+                warcprox_.options.rethinkdb_stats_url):
+            if not rethinkdb_url:
+                continue
+            parsed = doublethink.parse_rethinkdb_url(rethinkdb_url)
+            rr = doublethink.Rethinker(servers=parsed.hosts)
+            try:
+                logging.info('dropping rethinkdb database %r', parsed.database)
+                rr.db_drop(parsed.database).run()
+            except Exception as e:
+                logging.warn(
+                        'problem deleting rethinkdb database %r: %s',
+                        parsed.database, e)
         logging.info('deleting working directory %r', work_dir)
         os.chdir(orig_dir)
         shutil.rmtree(work_dir)
@@ -410,6 +426,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
     # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
     dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
             b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
+    assert dedup_lookup
     assert dedup_lookup['url'] == url.encode('ascii')
     assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
     assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@@ -483,6 +500,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies)
     # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
     dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
             b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
+    assert dedup_lookup
     assert dedup_lookup['url'] == url.encode('ascii')
     assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
     assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])

--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -39,13 +39,12 @@ class RethinkCaptures:
     """Inserts in batches every 0.5 seconds"""
     logger = logging.getLogger("warcprox.bigtable.RethinkCaptures")
 
-    def __init__(
-            self, rr, table="captures", shards=None, replicas=None,
-            options=warcprox.Options()):
-        self.rr = rr
-        self.table = table
-        self.shards = shards or len(rr.servers)
-        self.replicas = replicas or min(3, len(rr.servers))
+    def __init__(self, options=warcprox.Options()):
+        parsed = doublethink.parse_rethinkdb_url(
+                options.rethinkdb_big_table_url)
+        self.rr = doublethink.Rethinker(
+                servers=parsed.hosts, db=parsed.database)
+        self.table = parsed.table
         self.options = options
         self._ensure_db_table()
@@ -107,7 +106,9 @@ class RethinkCaptures:
             self.logger.info(
                     "creating rethinkdb table %r in database %r",
                     self.table, self.rr.dbname)
-            self.rr.table_create(self.table, shards=self.shards, replicas=self.replicas).run()
+            self.rr.table_create(
+                    self.table, shards=len(self.rr.servers),
+                    replicas=min(3, len(self.rr.servers))).run()
             self.rr.table(self.table).index_create(
                     "abbr_canon_surt_timestamp",
                     [r.row["abbr_canon_surt"], r.row["timestamp"]]).run()
@@ -216,8 +217,8 @@ class RethinkCaptures:
 class RethinkCapturesDedup:
     logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
 
-    def __init__(self, captures_db, options=warcprox.Options()):
-        self.captures_db = captures_db
+    def __init__(self, options=warcprox.Options()):
+        self.captures_db = RethinkCaptures(options=options)
         self.options = options
 
     def lookup(self, digest_key, bucket="__unspecified__", url=None):
@@ -247,3 +248,7 @@ class RethinkCapturesDedup:
     def close(self):
         self.captures_db.close()
+
+    def notify(self, recorded_url, records):
+        self.captures_db.notify(recorded_url, records)

View File

@ -21,13 +21,16 @@ USA.
from __future__ import absolute_import from __future__ import absolute_import
from datetime import datetime
import logging import logging
import os import os
import json import json
from hanzo import warctools from hanzo import warctools
import warcprox import warcprox
import sqlite3 import sqlite3
import requests
import doublethink
import rethinkdb as r
import datetime
import urllib3 import urllib3
from urllib3.exceptions import HTTPError from urllib3.exceptions import HTTPError
@ -121,11 +124,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
class RethinkDedupDb: class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, rr, table="dedup", shards=None, replicas=None, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
self.rr = rr parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url)
self.table = table self.rr = doublethink.Rethinker(
self.shards = shards or len(rr.servers) servers=parsed.hosts, db=parsed.database)
self.replicas = replicas or min(3, len(rr.servers)) self.table = parsed.table
self._ensure_db_table() self._ensure_db_table()
self.options = options self.options = options
@ -138,12 +141,11 @@ class RethinkDedupDb:
if not self.table in tables: if not self.table in tables:
self.logger.info( self.logger.info(
"creating rethinkdb table %r in database %r shards=%r " "creating rethinkdb table %r in database %r shards=%r "
"replicas=%r", self.table, self.rr.dbname, self.shards, "replicas=%r", self.table, self.rr.dbname,
self.replicas) len(self.rr.servers), min(3, len(self.rr.servers)))
self.rr.table_create( self.rr.table_create(
self.table, primary_key="key", shards=self.shards, self.table, primary_key="key", shards=len(self.rr.servers),
replicas=self.replicas).run() replicas=min(3, len(self.rr.servers))).run()
def start(self): def start(self):
pass pass
@ -181,7 +183,6 @@ class RethinkDedupDb:
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
class CdxServerDedup(object): class CdxServerDedup(object):
"""Query a CDX server to perform deduplication. """Query a CDX server to perform deduplication.
""" """
@ -231,8 +232,8 @@ class CdxServerDedup(object):
if line: if line:
(cdx_ts, cdx_digest) = line.split(b' ') (cdx_ts, cdx_digest) = line.split(b' ')
if cdx_digest == dkey: if cdx_digest == dkey:
dt = datetime.strptime(cdx_ts.decode('ascii'), dt = datetime.datetime.strptime(
'%Y%m%d%H%M%S') cdx_ts.decode('ascii'), '%Y%m%d%H%M%S')
date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
return dict(url=url, date=date) return dict(url=url, date=date)
except (HTTPError, AssertionError, ValueError) as exc: except (HTTPError, AssertionError, ValueError) as exc:
@ -244,3 +245,197 @@ class CdxServerDedup(object):
"""Since we don't save anything to CDX server, this does not apply. """Since we don't save anything to CDX server, this does not apply.
""" """
pass pass
class TroughClient(object):
logger = logging.getLogger("warcprox.dedup.TroughClient")
def __init__(self, rethinkdb_trough_db_url):
parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.svcreg = doublethink.ServiceRegistry(self.rr)
def segment_manager_url(self):
master_node = self.svcreg.unique_service('trough-sync-master')
assert master_node
return master_node['url']
def write_url(self, segment_id, schema_id='default'):
provision_url = os.path.join(self.segment_manager_url(), 'provision')
payload_dict = {'segment': segment_id, 'schema': schema_id}
response = requests.post(provision_url, json=payload_dict)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
response.status_code, response.text, provision_url,
json.dumps(payload_dict)))
result_dict = response.json()
# assert result_dict['schema'] == schema_id # previously provisioned?
return result_dict['write_url']
def read_url(self, segment_id):
reql = self.rr.table('services').get_all(
segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
lambda svc: r.now().sub(
svc['last_heartbeat']).lt(svc['ttl'])
).order_by('load')
logging.debug('querying rethinkdb: %r', reql)
results = reql.run()
if results:
return results[0]['url']
else:
return None
def schema_exists(self, schema_id):
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
response = requests.get(url)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
response.raise_for_status()
def register_schema(self, schema_id, sql):
url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id)
response = requests.put(url, sql)
if response.status_code not in (201, 204):
raise Exception(
'Received %s: %r in response to PUT %r with data %r' % (
response.status_code, response.text, sql, url))
class TroughDedupDb(object):
'''
https://github.com/internetarchive/trough
'''
logger = logging.getLogger("warcprox.dedup.TroughDedupDb")
SCHEMA_ID = 'warcprox-dedup-v1'
SCHEMA_SQL = ('create table dedup (\n'
' digest_key varchar(100) primary key,\n'
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n') # warc record id
def __init__(self, options=warcprox.Options()):
self.options = options
self._trough_cli = TroughClient(options.rethinkdb_trough_db_url)
self._write_url_cache = {}
self._read_url_cache = {}
def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
    def _write_url(self, bucket):
        if bucket not in self._write_url_cache:
            segment_id = 'warcprox-trough-%s' % bucket
            self._write_url_cache[bucket] = self._trough_cli.write_url(
                    segment_id, self.SCHEMA_ID)
            logging.info(
                    'bucket %r write url is %r', bucket,
                    self._write_url_cache[bucket])
        return self._write_url_cache[bucket]

    def _read_url(self, bucket):
        if not self._read_url_cache.get(bucket):
            segment_id = 'warcprox-trough-%s' % bucket
            self._read_url_cache[bucket] = self._trough_cli.read_url(segment_id)
            logging.info(
                    'bucket %r read url is %r', bucket,
                    self._read_url_cache[bucket])
        return self._read_url_cache[bucket]
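The two helpers above implement a small cache-and-invalidate-on-error pattern: a trough read/write url is fetched once per bucket, and save()/lookup() evict the cached entry whenever a request against it fails, so the next call retrieves a fresh, hopefully working, url. A minimal standalone sketch of that pattern (stub fetcher, not the real trough client):

```python
class UrlCache:
    '''Caches one url per bucket; evict on error so the next get()
    fetches a fresh one.'''
    def __init__(self, fetch):
        self.fetch = fetch  # callable: bucket -> url
        self.cache = {}

    def get(self, bucket):
        if bucket not in self.cache:
            self.cache[bucket] = self.fetch(bucket)
        return self.cache[bucket]

    def invalidate(self, bucket):
        # called after a request to the cached url fails
        self.cache.pop(bucket, None)

calls = []
cache = UrlCache(lambda bucket: calls.append(bucket) or 'http://trough/%s' % bucket)
cache.get('b1')
cache.get('b1')        # served from cache, no second fetch
cache.invalidate('b1')
cache.get('b1')        # fetched again after eviction
assert calls == ['b1', 'b1']
```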
    def sql_value(self, x):
        if x is None:
            return 'null'
        elif isinstance(x, datetime.datetime):
            return 'datetime(%r)' % x.isoformat()
        elif isinstance(x, bool):
            return int(x)
        elif isinstance(x, str) or isinstance(x, bytes):
            # py3: repr(u'abc') => 'abc'
            #      repr(b'abc') => b'abc'
            # py2: repr(u'abc') => u'abc'
            #      repr(b'abc') => 'abc'
            # repr gives us a prefix we don't want in different situations
            # depending on whether this is py2 or py3; chop it off either way
            r = repr(x)
            if r[:1] == "'":
                return r
            else:
                return r[1:]
        else:
            raise Exception("don't know how to make an sql value from %r" % x)
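For reference, this is what the escaping above produces for each accepted type; a quick standalone check, reimplementing the method as a plain function (example values are hypothetical):

```python
import datetime

def sql_value(x):
    # same logic as TroughDedupDb.sql_value above
    if x is None:
        return 'null'
    elif isinstance(x, datetime.datetime):
        return 'datetime(%r)' % x.isoformat()
    elif isinstance(x, bool):
        return int(x)
    elif isinstance(x, (str, bytes)):
        r = repr(x)  # chop off the py2/py3 u''/b'' prefix if present
        return r if r[:1] == "'" else r[1:]
    else:
        raise Exception("don't know how to make an sql value from %r" % x)

assert sql_value(None) == 'null'
assert sql_value(True) == 1
assert sql_value('abc') == "'abc'"
assert sql_value(b'abc') == "'abc'"
assert sql_value(datetime.datetime(2017, 11, 7, 17, 12, 28)) == \
        "datetime('2017-11-07T17:12:28')"
```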
    def save(self, digest_key, response_record, bucket='__unspecified__'):
        write_url = self._write_url(bucket)
        record_id = response_record.get_header(warctools.WarcRecord.ID)
        url = response_record.get_header(warctools.WarcRecord.URL)
        warc_date = response_record.get_header(warctools.WarcRecord.DATE)
        sql = ('insert into dedup (digest_key, url, date, id) '
               'values (%s, %s, %s, %s);') % (
                       self.sql_value(digest_key), self.sql_value(url),
                       self.sql_value(warc_date), self.sql_value(record_id))
        try:
            response = requests.post(write_url, sql)
        except Exception:
            logging.error(
                    'problem with trough write url %r', write_url,
                    exc_info=True)
            # drop the cached url so the next request retrieves a fresh one
            del self._write_url_cache[bucket]
            return
        if response.status_code != 200:
            del self._write_url_cache[bucket]
            logging.warning(
                    'unexpected response %r %r %r to sql=%r',
                    response.status_code, response.reason, response.text, sql)
        else:
            logging.trace('posted %r to %s', sql, write_url)
    def lookup(self, digest_key, bucket='__unspecified__', url=None):
        read_url = self._read_url(bucket)
        if not read_url:
            return None
        sql = 'select * from dedup where digest_key=%s;' % (
                self.sql_value(digest_key))
        try:
            response = requests.post(read_url, sql)
        except Exception:
            logging.error(
                    'problem with trough read url %r', read_url, exc_info=True)
            # drop the cached url so the next request retrieves a fresh one
            del self._read_url_cache[bucket]
            return None
        if response.status_code != 200:
            del self._read_url_cache[bucket]
            logging.warning(
                    'unexpected response %r %r %r to sql=%r',
                    response.status_code, response.reason, response.text, sql)
            return None
        logging.debug('got %r from query %r', response.text, sql)
        results = json.loads(response.text)
        assert len(results) <= 1  # sanity check (digest_key is primary key)
        if results:
            result = results[0]
            result['id'] = result['id'].encode('ascii')
            result['url'] = result['url'].encode('ascii')
            result['date'] = result['date'].encode('ascii')
            self.logger.debug(
                    'trough lookup of key=%r returning %r', digest_key, result)
            return result
        else:
            return None
    def notify(self, recorded_url, records):
        if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
                and recorded_url.response_recorder.payload_size() > 0):
            digest_key = warcprox.digest_str(
                    recorded_url.response_recorder.payload_digest,
                    self.options.base32)
            if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
                self.save(
                        digest_key, records[0],
                        bucket=recorded_url.warcprox_meta['captures-bucket'])
            else:
                self.save(digest_key, records[0])
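Since trough segments are sqlite databases under the hood, the schema and the literal SQL that save() and lookup() compose can be exercised against an in-memory sqlite3 database. The digest, url, and record id below are hypothetical example values, written as sql_value() would escape them:

```python
import sqlite3

schema_sql = ('create table dedup (\n'
              '    digest_key varchar(100) primary key,\n'
              '    url varchar(2100) not null,\n'
              '    date datetime not null,\n'
              '    id varchar(100));\n')

conn = sqlite3.connect(':memory:')
conn.executescript(schema_sql)
# a literal insert statement as composed by save()
conn.execute(
        "insert into dedup (digest_key, url, date, id) values ("
        "'sha1:b3lprr2u3tssoj6b2t225cqf2bllchps', 'http://example.com/', "
        "datetime('2017-11-07T17:12:28'), '<urn:uuid:1234>');")
# and the query composed by lookup()
row = conn.execute(
        "select * from dedup where digest_key="
        "'sha1:b3lprr2u3tssoj6b2t225cqf2bllchps';").fetchone()
assert row[1] == 'http://example.com/'
```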

--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+# vim: set fileencoding=utf-8:
 '''
 warcprox/main.py - entrypoint for warcprox executable, parses command line
 arguments, initializes components, starts controller, handles signals
@@ -42,6 +43,7 @@ import warcprox
 import doublethink
 import cryptography.hazmat.backends.openssl
 import importlib
+import doublethink
 class BetterArgumentDefaultsHelpFormatter(
         argparse.ArgumentDefaultsHelpFormatter,
@@ -58,7 +60,7 @@ class BetterArgumentDefaultsHelpFormatter(
         if isinstance(action, argparse._StoreConstAction):
             return action.help
         else:
-            return super()._get_help_string(action)
+            return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action)
 def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     arg_parser = argparse.ArgumentParser(prog=prog,
@@ -98,8 +100,18 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             default=False, help='write digests in Base32 instead of hex')
     arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
             action='append', help='only record requests with the given http method(s) (can be used more than once)')
-    arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
-            default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
+    group = arg_parser.add_mutually_exclusive_group()
+    group.add_argument(
+            '--stats-db-file', dest='stats_db_file',
+            default='./warcprox.sqlite', help=(
+                'persistent statistics database file; empty string or '
+                '/dev/null disables statistics tracking'))
+    group.add_argument(
+            '--rethinkdb-stats-url', dest='rethinkdb_stats_url', help=(
+                'rethinkdb stats table url, e.g. rethinkdb://db0.foo.org,'
+                'db1.foo.org:38015/my_warcprox_db/my_stats_table'))
     arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
             type=int, default=None, help='port to listen on for instant playback')
     arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
@@ -108,18 +120,27 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     group = arg_parser.add_mutually_exclusive_group()
     group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
             default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+    group.add_argument(
+            '--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=(
+                'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,'
+                'db1.foo.org:38015/my_warcprox_db/my_dedup_table'))
+    group.add_argument(
+            '--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=(
+                'rethinkdb big table url (table will be populated with '
+                'various capture information and is suitable for use as '
+                'index for playback), e.g. rethinkdb://db0.foo.org,'
+                'db1.foo.org:38015/my_warcprox_db/captures'))
+    group.add_argument(
+            '--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=(
+                '🐷   url pointing to trough configuration rethinkdb database, '
+                'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
+                '/trough_configuration'))
     group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup',
             help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search')
-    group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
-            help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
-    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
-            help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
-    arg_parser.add_argument('--rethinkdb-big-table',
-            dest='rethinkdb_big_table', action='store_true', default=False,
-            help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
     arg_parser.add_argument(
-            '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
-            default='captures', help=argparse.SUPPRESS)
+            '--rethinkdb-services-url', dest='rethinkdb_services_url', help=(
+                'rethinkdb service registry table url; if provided, warcprox '
+                'will create and heartbeat entry for itself'))
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
             default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
@@ -186,30 +207,25 @@ def init_controller(args):
         exit(1)
     listeners = []
-    if args.rethinkdb_servers:
-        rr = doublethink.Rethinker(
-                args.rethinkdb_servers.split(","), args.rethinkdb_db)
-        if args.rethinkdb_big_table:
-            captures_db = warcprox.bigtable.RethinkCaptures(
-                    rr, table=args.rethinkdb_big_table_name, options=options)
-            dedup_db = warcprox.bigtable.RethinkCapturesDedup(
-                    captures_db, options=options)
-            listeners.append(captures_db)
-        else:
-            dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
-        listeners.append(dedup_db)
+    if args.rethinkdb_dedup_url:
+        dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
+    elif args.rethinkdb_big_table_url:
+        dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
+    elif args.rethinkdb_trough_db_url:
+        dedup_db = warcprox.dedup.TroughDedupDb(options)
     elif args.cdxserver_dedup:
         dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
-        listeners.append(dedup_db)
     elif args.dedup_db_file in (None, '', '/dev/null'):
         logging.info('deduplication disabled')
         dedup_db = None
     else:
         dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
+    if dedup_db:
         listeners.append(dedup_db)
-    if args.rethinkdb_servers:
-        stats_db = warcprox.stats.RethinkStatsDb(rr, options=options)
+    if args.rethinkdb_stats_url:
+        stats_db = warcprox.stats.RethinkStatsDb(options=options)
         listeners.append(stats_db)
     elif args.stats_db_file in (None, '', '/dev/null'):
         logging.info('statistics tracking disabled')
@@ -263,8 +279,11 @@ def init_controller(args):
                 listeners=listeners, options=options)
             for i in range(int(proxy.max_threads ** 0.5))]
-    if args.rethinkdb_servers:
-        svcreg = doublethink.ServiceRegistry(rr)
+    if args.rethinkdb_services_url:
+        parsed = doublethink.parse_rethinkdb_url(
+                options.rethinkdb_services_url)
+        rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
+        svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
     else:
         svcreg = None
@@ -298,8 +317,7 @@ def main(argv=sys.argv):
         loglevel = logging.INFO
     logging.basicConfig(
-            stream=sys.stdout, level=loglevel,
-            format=(
+            stream=sys.stdout, level=loglevel, format=(
                 '%(asctime)s %(process)d %(levelname)s %(threadName)s '
                 '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
@@ -326,6 +344,7 @@ def ensure_rethinkdb_tables():
     tables. So it's a good idea to use this utility at an early step when
     spinning up a cluster.
     '''
+    raise Exception('adjust my args')
     arg_parser = argparse.ArgumentParser(
             prog=os.path.basename(sys.argv[0]),
             formatter_class=BetterArgumentDefaultsHelpFormatter)

--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -32,6 +32,7 @@ import datetime
 import urlcanon
 import sqlite3
 import copy
+import doublethink
 def _empty_bucket(bucket):
     return {
@@ -189,11 +190,12 @@ class RethinkStatsDb(StatsDb):
     """Updates database in batch every 2.0 seconds"""
     logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
-    def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()):
-        self.rr = rethinker
-        self.table = table
-        self.shards = shards or 1  # 1 shard by default because it's probably a small table
-        self.replicas = replicas or min(3, len(self.rr.servers))
+    def __init__(self, options=warcprox.Options()):
+        parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url)
+        self.rr = doublethink.Rethinker(
+                servers=parsed.hosts, db=parsed.database)
+        self.table = parsed.table
+        self.replicas = min(3, len(self.rr.servers))
         self._ensure_db_table()
         self.options = options
@@ -271,10 +273,10 @@ class RethinkStatsDb(StatsDb):
         if not self.table in tables:
             self.logger.info(
                     "creating rethinkdb table %r in database %r shards=%r "
-                    "replicas=%r", self.table, self.rr.dbname, self.shards,
+                    "replicas=%r", self.table, self.rr.dbname, 1,
                     self.replicas)
             self.rr.table_create(
-                    self.table, primary_key="bucket", shards=self.shards,
+                    self.table, primary_key="bucket", shards=1,
                     replicas=self.replicas).run()
     def close(self):
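The new --rethinkdb-*-url options all lean on doublethink's rethinkdb url parser. A rough standalone sketch of the decomposition it performs (a hypothetical reimplementation for illustration, not the doublethink API):

```python
from collections import namedtuple

ParsedRethinkDbUrl = namedtuple(
        'ParsedRethinkDbUrl', ['hosts', 'database', 'table'])

def parse_rethinkdb_url(url):
    # rethinkdb://host1[:port],host2[:port]/database[/table]
    prefix = 'rethinkdb://'
    assert url.startswith(prefix)
    netloc, _, path = url[len(prefix):].partition('/')
    parts = path.split('/') if path else []
    return ParsedRethinkDbUrl(
            hosts=netloc.split(','),
            database=parts[0] if parts else None,
            table=parts[1] if len(parts) > 1 else None)

parsed = parse_rethinkdb_url(
        'rethinkdb://db0.foo.org,db1.foo.org:38015/my_warcprox_db/my_stats_table')
assert parsed.hosts == ['db0.foo.org', 'db1.foo.org:38015']
assert parsed.database == 'my_warcprox_db'
assert parsed.table == 'my_stats_table'
```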