Merge pull request #37 from nlevitt/trough-dedup

WIP: trough for deduplication initial proof-of-concept-ish code
This commit is contained in:
jkafader 2017-11-30 16:14:43 -08:00 committed by GitHub
commit e5a3dd8b3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 756 additions and 148 deletions

View File

@ -1,4 +1,4 @@
group: deprecated-2017Q2
sudo: required
language: python
python:
@ -13,8 +13,6 @@ python:
matrix:
allow_failures:
- python: pypy
- python: pypy3
- python: nightly
- python: 3.7-dev
@ -28,15 +26,38 @@ services:
before_install:
- sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778
- docker run -d --publish=28015:28015 rethinkdb
- docker network create --driver=bridge trough
- docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb
- docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed
- docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh
- cat /etc/hosts
- echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄
- echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts
- echo 127.0.0.1 hadoop | sudo tee -a /etc/hosts
- echo 127.0.0.1 trough | sudo tee -a /etc/hosts
- cat /etc/hosts
- ping -c2 trough
install:
- pip install . pytest requests warcio mock
before_script:
- pip install . pytest requests warcio
- ps ww -fHe
script:
- py.test -v tests
- py.test -v --rethinkdb-servers=localhost tests
- py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests
- py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
- py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
- py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
after_script:
- ps ww -fHe
- docker exec trough cat /tmp/trough-write.out
- docker exec trough cat /tmp/trough-write-provisioner-server.out
- docker exec trough cat /tmp/trough-write-provisioner-local.out
- docker exec trough cat /tmp/trough-sync-server.out
- docker exec trough cat /tmp/trough-sync-local.out
- docker exec trough cat /tmp/trough-read.out
notifications:
slack:

View File

@ -39,8 +39,9 @@ deps = [
'certauth==1.1.6',
'warctools',
'urlcanon>=0.1.dev16',
'doublethink>=0.2.0.dev87',
'urllib3',
'doublethink>=0.2.0.dev81',
'requests>=2.0.1',
'PySocks',
'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
]
@ -60,7 +61,7 @@ setuptools.setup(
license='GPL',
packages=['warcprox'],
install_requires=deps,
tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
tests_require=['mock', 'pytest', 'warcio'],
cmdclass = {'test': PyTest},
test_suite='warcprox.tests',
entry_points={

View File

@ -1,7 +1,7 @@
#
# Dockerfile for warcprox tests
#
# Copyright (C) 2015-2016 Internet Archive
# Copyright (C) 2015-2017 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@ -23,19 +23,19 @@ FROM phusion/baseimage
MAINTAINER Noah Levitt <nlevitt@archive.org>
# see https://github.com/stuartpb/rethinkdb-dockerfiles/blob/master/trusty/2.1.3/Dockerfile
# and https://github.com/chali/hadoop-cdh-pseudo-docker/blob/master/Dockerfile
ENV LANG=C.UTF-8
RUN apt-get update && apt-get --auto-remove -y dist-upgrade
# Add the RethinkDB repository and public key
# "RethinkDB Packaging <packaging@rethinkdb.com>" http://download.rethinkdb.com/apt/pubkey.gpg
RUN apt-key adv --keyserver pgp.mit.edu --recv-keys 1614552E5765227AEC39EFCFA7E00EF33A8F2399 \
RUN curl -s https://download.rethinkdb.com/apt/pubkey.gpg | apt-key add - \
&& echo "deb http://download.rethinkdb.com/apt xenial main" > /etc/apt/sources.list.d/rethinkdb.list \
&& apt-get update && apt-get -y install rethinkdb
RUN mkdir -vp /etc/service/rethinkdb \
&& echo "#!/bin/sh\nrethinkdb --bind 0.0.0.0 --directory /tmp/rethink-data --runuser rethinkdb --rungroup rethinkdb\n" > /etc/service/rethinkdb/run \
&& echo "#!/bin/bash\nexec rethinkdb --bind 0.0.0.0 --directory /tmp/rethink-data --runuser rethinkdb --rungroup rethinkdb\n" > /etc/service/rethinkdb/run \
&& chmod a+x /etc/service/rethinkdb/run
RUN apt-get -y install git
@ -53,6 +53,55 @@ RUN pip install virtualenv
RUN apt-get -y install tor
RUN mkdir -vp /etc/service/tor \
&& echo "#!/bin/sh\ntor\n" > /etc/service/tor/run \
&& echo "#!/bin/sh\nexec tor\n" > /etc/service/tor/run \
&& chmod a+x /etc/service/tor/run
# hadoop hdfs for trough
RUN curl -s https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/archive.key | apt-key add - \
&& . /etc/lsb-release \
&& echo "deb [arch=amd64] http://archive.cloudera.com/cdh5/ubuntu/$DISTRIB_CODENAME/amd64/cdh $DISTRIB_CODENAME-cdh5 contrib" >> /etc/apt/sources.list.d/cloudera.list
RUN apt-get update
RUN apt-get install -y openjdk-8-jdk hadoop-conf-pseudo
RUN su hdfs -c 'hdfs namenode -format'
RUN mv -v /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/core-site.xml.orig \
&& cat /etc/hadoop/conf/core-site.xml.orig | sed 's,localhost:8020,0.0.0.0:8020,' > /etc/hadoop/conf/core-site.xml
RUN mv -v /etc/hadoop/conf/hdfs-site.xml /etc/hadoop/conf/hdfs-site.xml.orig \
&& cat /etc/hadoop/conf/hdfs-site.xml.orig | sed 's,^</configuration>$, <property>\n <name>dfs.permissions.enabled</name>\n <value>false</value>\n </property>\n</configuration>,' > /etc/hadoop/conf/hdfs-site.xml
RUN echo '#!/bin/bash\nservice hadoop-hdfs-namenode start\nservice hadoop-hdfs-datanode start' > /etc/my_init.d/50_start_hdfs.sh \
&& chmod a+x /etc/my_init.d/50_start_hdfs.sh
# trough itself
RUN virtualenv -p python3 /opt/trough-ve3 \
&& . /opt/trough-ve3/bin/activate \
&& pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string \
&& pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup
RUN mkdir -vp /etc/service/trough-sync-local \
&& echo "#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nexec sync.py >>/tmp/trough-sync-local.out 2>&1" > /etc/service/trough-sync-local/run \
&& chmod a+x /etc/service/trough-sync-local/run
RUN mkdir -vp /etc/service/trough-sync-server \
&& echo '#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec sync.py --server >>/tmp/trough-sync-server.out 2>&1' > /etc/service/trough-sync-server/run \
&& chmod a+x /etc/service/trough-sync-server/run
RUN mkdir -vp /etc/service/trough-read \
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/reader.py >>/tmp/trough-read.out 2>&1' > /etc/service/trough-read/run \
&& chmod a+x /etc/service/trough-read/run
RUN mkdir -vp /etc/service/trough-write \
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/writer.py >>/tmp/trough-write.out 2>&1' > /etc/service/trough-write/run \
&& chmod a+x /etc/service/trough-write/run
RUN mkdir -vp /etc/service/trough-write-provisioner-local \
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1' > /etc/service/trough-write-provisioner-local/run \
&& chmod a+x /etc/service/trough-write-provisioner-local/run
RUN mkdir -vp /etc/service/trough-write-provisioner-server \
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1' > /etc/service/trough-write-provisioner-server/run \
&& chmod a+x /etc/service/trough-write-provisioner-server/run

View File

@ -1,39 +1,41 @@
#
# tests/conftest.py - command line options for warcprox tests
#
# Copyright (C) 2015-2016 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
# vim: set fileencoding=utf-8:
'''
tests/conftest.py - command line options for warcprox tests
Copyright (C) 2015-2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
import pytest
def pytest_addoption(parser):
parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
parser.addoption('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
@pytest.fixture(scope="module")
def rethinkdb_servers(request):
return request.config.getoption("--rethinkdb-servers")
@pytest.fixture(scope="module")
def rethinkdb_big_table(request):
return request.config.getoption("--rethinkdb-big-table")
parser.addoption(
'--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=(
'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_dedup_table'))
parser.addoption(
'--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=(
'rethinkdb big table url (table will be populated with '
'various capture information and is suitable for use as '
'index for playback), e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/captures'))
parser.addoption(
'--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=(
'🐷   url pointing to trough configuration rethinkdb database, '
'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
'/trough_configuration'))

View File

@ -5,8 +5,6 @@
# features enabled, against that instance of rethinkdb, and also run without
# rethinkdb features enabled. With python 2.7 and 3.4.
#
# tests/conftest.py - command line options for warcprox tests
#
# Copyright (C) 2015-2017 Internet Archive
#
# This program is free software; you can redistribute it and/or
@ -33,7 +31,7 @@ script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
docker build -t internetarchive/warcprox-tests $script_dir
for python in python2.7 python3
for python in python3 python2.7
do
docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests /sbin/my_init -- \
bash -x -c "cd /tmp && git clone /warcprox && cd /tmp/warcprox \
@ -42,7 +40,9 @@ do
&& source /tmp/venv/bin/activate \
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio \
&& py.test -v tests \
&& py.test -v --rethinkdb-servers=localhost tests \
&& py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests"
&& py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
&& py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \
&& py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests \
"
done

34
tests/run-trough.sh Normal file
View File

@ -0,0 +1,34 @@
#!/bin/bash
#
# this is used by .travis.yml
#
pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string
pip install git+https://github.com/internetarchive/trough.git@toward-warcprox-dedup
mkdir /etc/trough
# hello docker user-defined bridge networking
echo '
HDFS_HOST: hadoop
RETHINKDB_HOSTS:
- rethinkdb
' > /etc/trough/settings.yml
sync.py >>/tmp/trough-sync-local.out 2>&1 &
sleep 5
python -c "
import doublethink
from trough.settings import settings
rr = doublethink.Rethinker(settings['RETHINKDB_HOSTS'])
rr.db('trough_configuration').wait().run()"
sync.py --server >>/tmp/trough-sync-server.out 2>&1 &
uwsgi --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/writer.py >>/tmp/trough-write.out 2>&1 &
uwsgi --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:local >>/tmp/trough-segment-manager-local.out 2>&1 &
uwsgi --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:server >>/tmp/trough-segment-manager-server.out 2>&1 &
uwsgi --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/reader.py >>/tmp/trough-read.out 2>&1 &
wait

View File

@ -0,0 +1,109 @@
#!/usr/bin/env python
# vim: set fileencoding=utf-8:
'''
tests/test_ensure_rethinkdb_tables.py - automated tests of
ensure-rethinkdb-tables utility
Copyright (C) 2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
import warcprox.main
import pytest
import socket
import doublethink
import logging
import sys
logging.basicConfig(
stream=sys.stdout, level=warcprox.TRACE,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
def rethinkdb_is_running():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(('127.0.0.1', 28015))
return True
except:
return False
if_rethinkdb = pytest.mark.skipif(
not rethinkdb_is_running(),
reason='rethinkdb not listening at 127.0.0.1:28015')
@if_rethinkdb
def test_individual_options():
rr = doublethink.Rethinker(['127.0.0.1'])
try:
warcprox.main.ensure_rethinkdb_tables([
'warcprox-ensure-rethinkdb-tables',
'--rethinkdb-stats-url=rethinkdb://127.0.0.1/db0/stats'])
assert rr.db('db0').table_list().run() == ['stats']
finally:
rr.db_drop('db0').run()
try:
warcprox.main.ensure_rethinkdb_tables([
'warcprox-ensure-rethinkdb-tables',
'--rethinkdb-services-url=rethinkdb://127.0.0.1/db1/services'])
assert rr.db('db1').table_list().run() == ['services']
finally:
rr.db_drop('db1').run()
try:
warcprox.main.ensure_rethinkdb_tables([
'warcprox-ensure-rethinkdb-tables',
'--rethinkdb-dedup-url=rethinkdb://127.0.0.1/db2/dedup'])
assert rr.db('db2').table_list().run() == ['dedup']
finally:
rr.db_drop('db2').run()
try:
warcprox.main.ensure_rethinkdb_tables([
'warcprox-ensure-rethinkdb-tables',
'--rethinkdb-big-table-url=rethinkdb://127.0.0.1/db3/captures'])
assert rr.db('db3').table_list().run() == ['captures']
finally:
rr.db_drop('db3').run()
try:
warcprox.main.ensure_rethinkdb_tables([
'warcprox-ensure-rethinkdb-tables',
'--rethinkdb-trough-db-url=rethinkdb://127.0.0.1/db4'])
assert rr.db('db4').table_list().run() == ['services']
# ['assignment', 'lock', 'schema', 'services']
finally:
rr.db_drop('db4').run()
@if_rethinkdb
def test_combos():
rr = doublethink.Rethinker(['127.0.0.1'])
try:
warcprox.main.ensure_rethinkdb_tables([
'warcprox-ensure-rethinkdb-tables',
'--rethinkdb-stats-url=rethinkdb://127.0.0.1/db00/stats',
'--rethinkdb-trough-db-url=rethinkdb://127.0.0.1/db01',
])
assert rr.db('db00').table_list().run() == ['stats']
assert rr.db('db01').table_list().run() == ['services']
# ['assignment', 'lock', 'schema', 'services']
finally:
rr.db_drop('db00').run()
rr.db_drop('db01').run()

View File

@ -88,6 +88,7 @@ def _send(self, data):
# http_client.HTTPConnection.send = _send
logging.basicConfig(
# stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
stream=sys.stdout, level=warcprox.TRACE,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
@ -332,7 +333,7 @@ def https_daemon(request, cert):
return https_daemon
@pytest.fixture(scope="module")
def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
def warcprox_(request):
orig_dir = os.getcwd()
work_dir = tempfile.mkdtemp()
logging.info('changing to working directory %r', work_dir)
@ -345,12 +346,15 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
'--playback-port=0',
'--onion-tor-socks-proxy=localhost:9050',
'--crawl-log-dir=crawl-logs']
if rethinkdb_servers:
rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
argv.append('--rethinkdb-servers=%s' % rethinkdb_servers)
argv.append('--rethinkdb-db=%s' % rethinkdb_db)
if rethinkdb_big_table:
argv.append('--rethinkdb-big-table')
if request.config.getoption('--rethinkdb-dedup-url'):
argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url'))
# test these here only
argv.append('--rethinkdb-stats-url=rethinkdb://localhost/test0/stats')
argv.append('--rethinkdb-services-url=rethinkdb://localhost/test0/services')
elif request.config.getoption('--rethinkdb-big-table-url'):
argv.append('--rethinkdb-big-table-url=%s' % request.config.getoption('--rethinkdb-big-table-url'))
elif request.config.getoption('--rethinkdb-trough-db-url'):
argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url'))
args = warcprox.main.parse_args(argv)
warcprox_ = warcprox.main.init_controller(args)
@ -363,10 +367,22 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table):
def fin():
warcprox_.stop.set()
warcprox_thread.join()
if rethinkdb_servers:
logging.info('dropping rethinkdb database %r', rethinkdb_db)
rr = doublethink.Rethinker(rethinkdb_servers)
result = rr.db_drop(rethinkdb_db).run()
for rethinkdb_url in (
warcprox_.options.rethinkdb_big_table_url,
warcprox_.options.rethinkdb_dedup_url,
warcprox_.options.rethinkdb_services_url,
warcprox_.options.rethinkdb_stats_url):
if not rethinkdb_url:
continue
parsed = doublethink.parse_rethinkdb_url(rethinkdb_url)
rr = doublethink.Rethinker(servers=parsed.hosts)
try:
logging.info('dropping rethinkdb database %r', parsed.database)
rr.db_drop(parsed.database).run()
except Exception as e:
logging.warn(
'problem deleting rethinkdb database %r: %s',
parsed.database, e)
logging.info('deleting working directory %r', work_dir)
os.chdir(orig_dir)
shutil.rmtree(work_dir)
@ -500,6 +516,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup
assert dedup_lookup['url'] == url.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -573,6 +590,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup
assert dedup_lookup['url'] == url.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -1720,6 +1738,24 @@ def test_payload_digest(warcprox_, http_daemon):
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1
def test_trough_segment_promotion(warcprox_):
if not warcprox_.options.rethinkdb_trough_db_url:
return
cli = warcprox.trough.TroughClient(
warcprox_.options.rethinkdb_trough_db_url, 3)
promoted = []
def mock(segment_id):
promoted.append(segment_id)
cli.promote = mock
cli.register_schema('default', 'create table foo (bar varchar(100))')
cli.write('my_seg', 'insert into foo (bar) values ("boof")')
assert promoted == []
time.sleep(3)
assert promoted == ['my_seg']
promoted = []
time.sleep(3)
assert promoted == []
if __name__ == '__main__':
pytest.main()

View File

@ -39,13 +39,12 @@ class RethinkCaptures:
"""Inserts in batches every 0.5 seconds"""
logger = logging.getLogger("warcprox.bigtable.RethinkCaptures")
def __init__(
self, rr, table="captures", shards=None, replicas=None,
options=warcprox.Options()):
self.rr = rr
self.table = table
self.shards = shards or len(rr.servers)
self.replicas = replicas or min(3, len(rr.servers))
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_big_table_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.table = parsed.table
self.options = options
self._ensure_db_table()
@ -107,7 +106,9 @@ class RethinkCaptures:
self.logger.info(
"creating rethinkdb table %r in database %r",
self.table, self.rr.dbname)
self.rr.table_create(self.table, shards=self.shards, replicas=self.replicas).run()
self.rr.table_create(
self.table, shards=len(self.rr.servers),
replicas=min(3, len(self.rr.servers))).run()
self.rr.table(self.table).index_create(
"abbr_canon_surt_timestamp",
[r.row["abbr_canon_surt"], r.row["timestamp"]]).run()
@ -217,8 +218,8 @@ class RethinkCaptures:
class RethinkCapturesDedup:
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
def __init__(self, captures_db, options=warcprox.Options()):
self.captures_db = captures_db
def __init__(self, options=warcprox.Options()):
self.captures_db = RethinkCaptures(options=options)
self.options = options
def lookup(self, digest_key, bucket="__unspecified__", url=None):
@ -248,3 +249,7 @@ class RethinkCapturesDedup:
def close(self):
self.captures_db.close()
def notify(self, recorded_url, records):
self.captures_db.notify(recorded_url, records)

View File

@ -21,13 +21,15 @@ USA.
from __future__ import absolute_import
from datetime import datetime
import logging
import os
import json
from hanzo import warctools
import warcprox
import warcprox.trough
import sqlite3
import doublethink
import datetime
import urllib3
from urllib3.exceptions import HTTPError
@ -120,11 +122,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, rr, table="dedup", shards=None, replicas=None, options=warcprox.Options()):
self.rr = rr
self.table = table
self.shards = shards or len(rr.servers)
self.replicas = replicas or min(3, len(rr.servers))
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.table = parsed.table
self._ensure_db_table()
self.options = options
@ -137,12 +139,11 @@ class RethinkDedupDb:
if not self.table in tables:
self.logger.info(
"creating rethinkdb table %r in database %r shards=%r "
"replicas=%r", self.table, self.rr.dbname, self.shards,
self.replicas)
"replicas=%r", self.table, self.rr.dbname,
len(self.rr.servers), min(3, len(self.rr.servers)))
self.rr.table_create(
self.table, primary_key="key", shards=self.shards,
replicas=self.replicas).run()
self.table, primary_key="key", shards=len(self.rr.servers),
replicas=min(3, len(self.rr.servers))).run()
def start(self):
pass
@ -180,7 +181,6 @@ class RethinkDedupDb:
else:
self.save(digest_key, records[0])
class CdxServerDedup(object):
"""Query a CDX server to perform deduplication.
"""
@ -230,8 +230,8 @@ class CdxServerDedup(object):
if line:
(cdx_ts, cdx_digest) = line.split(b' ')
if cdx_digest == dkey:
dt = datetime.strptime(cdx_ts.decode('ascii'),
'%Y%m%d%H%M%S')
dt = datetime.datetime.strptime(
cdx_ts.decode('ascii'), '%Y%m%d%H%M%S')
date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
return dict(url=url, date=date)
except (HTTPError, AssertionError, ValueError) as exc:
@ -243,3 +243,62 @@ class CdxServerDedup(object):
"""Since we don't save anything to CDX server, this does not apply.
"""
pass
class TroughDedupDb(object):
'''
https://github.com/internetarchive/trough
'''
logger = logging.getLogger("warcprox.dedup.TroughDedupDb")
SCHEMA_ID = 'warcprox-dedup-v1'
SCHEMA_SQL = ('create table dedup (\n'
' digest_key varchar(100) primary key,\n'
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n') # warc record id
WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) '
'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()):
self.options = options
self._trough_cli = warcprox.trough.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60)
def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
def save(self, digest_key, response_record, bucket='__unspecified__'):
record_id = response_record.get_header(warctools.WarcRecord.ID)
url = response_record.get_header(warctools.WarcRecord.URL)
warc_date = response_record.get_header(warctools.WarcRecord.DATE)
self._trough_cli.write(
bucket, self.WRITE_SQL_TMPL,
(digest_key, url, warc_date, record_id), self.SCHEMA_ID)
def lookup(self, digest_key, bucket='__unspecified__', url=None):
results = self._trough_cli.read(
bucket, 'select * from dedup where digest_key=%s;',
(digest_key,))
if results:
assert len(results) == 1 # sanity check (digest_key is primary key)
result = results[0]
result['id'] = result['id'].encode('ascii')
result['url'] = result['url'].encode('ascii')
result['date'] = result['date'].encode('ascii')
self.logger.debug(
'trough lookup of key=%r returning %r', digest_key, result)
return result
else:
return None
def notify(self, recorded_url, records):
if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta['captures-bucket'])
else:
self.save(digest_key, records[0])

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
# vim: set fileencoding=utf-8:
'''
warcprox/main.py - entrypoint for warcprox executable, parses command line
arguments, initializes components, starts controller, handles signals
@ -58,9 +59,9 @@ class BetterArgumentDefaultsHelpFormatter(
if isinstance(action, argparse._StoreConstAction):
return action.help
else:
return super()._get_help_string(action)
return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action)
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
def _build_arg_parser(prog):
arg_parser = argparse.ArgumentParser(prog=prog,
description='warcprox - WARC writing MITM HTTP/S proxy',
formatter_class=BetterArgumentDefaultsHelpFormatter)
@ -98,8 +99,18 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
default=False, help='write digests in Base32 instead of hex')
arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
action='append', help='only record requests with the given http method(s) (can be used more than once)')
arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
group = arg_parser.add_mutually_exclusive_group()
group.add_argument(
'--stats-db-file', dest='stats_db_file',
default='./warcprox.sqlite', help=(
'persistent statistics database file; empty string or '
'/dev/null disables statistics tracking'))
group.add_argument(
'--rethinkdb-stats-url', dest='rethinkdb_stats_url', help=(
'rethinkdb stats table url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_stats_table'))
arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
@ -108,18 +119,27 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
group = arg_parser.add_mutually_exclusive_group()
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
group.add_argument(
'--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=(
'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_dedup_table'))
group.add_argument(
'--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=(
'rethinkdb big table url (table will be populated with '
'various capture information and is suitable for use as '
'index for playback), e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/captures'))
group.add_argument(
'--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=(
'🐷   url pointing to trough configuration rethinkdb database, '
'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
'/trough_configuration'))
group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup',
help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search')
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument(
'--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
default='captures', help=argparse.SUPPRESS)
'--rethinkdb-services-url', dest='rethinkdb_services_url', help=(
'rethinkdb service registry table url; if provided, warcprox '
'will create and heartbeat entry for itself'))
arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
default=500, help=argparse.SUPPRESS)
arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
@ -189,30 +209,25 @@ def init_controller(args):
exit(1)
listeners = []
if args.rethinkdb_servers:
rr = doublethink.Rethinker(
args.rethinkdb_servers.split(","), args.rethinkdb_db)
if args.rethinkdb_big_table:
captures_db = warcprox.bigtable.RethinkCaptures(
rr, table=args.rethinkdb_big_table_name, options=options)
dedup_db = warcprox.bigtable.RethinkCapturesDedup(
captures_db, options=options)
listeners.append(captures_db)
else:
dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
listeners.append(dedup_db)
if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
elif args.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
elif args.cdxserver_dedup:
dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
listeners.append(dedup_db)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
if dedup_db:
listeners.append(dedup_db)
if args.rethinkdb_servers:
stats_db = warcprox.stats.RethinkStatsDb(rr, options=options)
if args.rethinkdb_stats_url:
stats_db = warcprox.stats.RethinkStatsDb(options=options)
listeners.append(stats_db)
elif args.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
@ -269,8 +284,11 @@ def init_controller(args):
listeners=listeners, options=options)
for i in range(num_writer_threads)]
if args.rethinkdb_servers:
svcreg = doublethink.ServiceRegistry(rr)
if args.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
else:
svcreg = None
@ -280,7 +298,7 @@ def init_controller(args):
return controller
def parse_args(argv=sys.argv):
def parse_args(argv):
'''
Parses command line arguments with argparse.
'''
@ -288,11 +306,11 @@ def parse_args(argv=sys.argv):
args = arg_parser.parse_args(args=argv[1:])
return args
def main(argv=sys.argv):
def main(argv=None):
'''
Main method, entry point of warcprox command.
'''
args = parse_args(argv)
args = parse_args(argv or sys.argv)
if args.trace:
loglevel = warcprox.TRACE
@ -304,8 +322,7 @@ def main(argv=sys.argv):
loglevel = logging.INFO
logging.basicConfig(
stream=sys.stdout, level=loglevel,
format=(
stream=sys.stdout, level=loglevel, format=(
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
@ -324,7 +341,7 @@ def main(argv=sys.argv):
controller.run_until_shutdown()
def ensure_rethinkdb_tables():
def ensure_rethinkdb_tables(argv=None):
'''
Creates rethinkdb tables if they don't already exist. Warcprox normally
creates the tables it needs on demand at startup, but if multiple instances
@ -332,40 +349,74 @@ def ensure_rethinkdb_tables():
tables. So it's a good idea to use this utility at an early step when
spinning up a cluster.
'''
argv = argv or sys.argv
arg_parser = argparse.ArgumentParser(
prog=os.path.basename(sys.argv[0]),
prog=os.path.basename(argv[0]),
formatter_class=BetterArgumentDefaultsHelpFormatter)
arg_parser.add_argument(
'--rethinkdb-servers', dest='rethinkdb_servers', default='localhost',
help='rethinkdb servers e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
'--rethinkdb-stats-url', dest='rethinkdb_stats_url', help=(
'rethinkdb stats table url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_stats_table'))
group = arg_parser.add_mutually_exclusive_group()
group.add_argument(
'--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=(
'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/my_dedup_table'))
group.add_argument(
'--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=(
'rethinkdb big table url (table will be populated with '
'various capture information and is suitable for use as '
'index for playback), e.g. rethinkdb://db0.foo.org,'
'db1.foo.org:38015/my_warcprox_db/captures'))
group.add_argument(
'--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=(
'🐷   url pointing to trough configuration rethinkdb database, '
'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
'/trough_configuration'))
arg_parser.add_argument(
'--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
help='rethinkdb database name')
'--rethinkdb-services-url', dest='rethinkdb_services_url', help=(
'rethinkdb service registry table url; if provided, warcprox '
'will create and heartbeat entry for itself'))
arg_parser.add_argument(
'-q', '--quiet', dest='log_level',
action='store_const', default=logging.INFO, const=logging.WARN)
arg_parser.add_argument(
'-v', '--verbose', dest='log_level',
action='store_const', default=logging.INFO, const=logging.DEBUG)
args = arg_parser.parse_args(args=sys.argv[1:])
args = arg_parser.parse_args(args=argv[1:])
logging.basicConfig(
stream=sys.stdout, level=args.log_level,
format=(
stream=sys.stdout, level=args.log_level, format=(
'%(asctime)s %(levelname)s %(name)s.%(funcName)s'
'(%(filename)s:%(lineno)d) %(message)s'))
rr = doublethink.Rethinker(
args.rethinkdb_servers.split(','), args.rethinkdb_db)
options = warcprox.Options(**vars(args))
# services table
doublethink.ServiceRegistry(rr)
did_something = False
if args.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
did_something = True
if args.rethinkdb_stats_url:
stats_db = warcprox.stats.RethinkStatsDb(options=options)
did_something = True
if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
did_something = True
if args.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
did_something = True
if args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
logging.warn(
'trough it responsible for creating most of the rethinkdb '
'tables that it uses')
did_something = True
# stats table
warcprox.stats.RethinkStatsDb(rr)
# captures table
warcprox.bigtable.RethinkCaptures(rr)
if not did_something:
logging.error('nothing to do, no --rethinkdb-* options supplied')
if __name__ == '__main__':
main()

View File

@ -32,6 +32,7 @@ import datetime
import urlcanon
import sqlite3
import copy
import doublethink
def _empty_bucket(bucket):
return {
@ -190,11 +191,12 @@ class RethinkStatsDb(StatsDb):
"""Updates database in batch every 2.0 seconds"""
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()):
self.rr = rethinker
self.table = table
self.shards = shards or 1 # 1 shard by default because it's probably a small table
self.replicas = replicas or min(3, len(self.rr.servers))
def __init__(self, options=warcprox.Options()):
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.table = parsed.table
self.replicas = min(3, len(self.rr.servers))
self._ensure_db_table()
self.options = options
@ -272,10 +274,10 @@ class RethinkStatsDb(StatsDb):
if not self.table in tables:
self.logger.info(
"creating rethinkdb table %r in database %r shards=%r "
"replicas=%r", self.table, self.rr.dbname, self.shards,
"replicas=%r", self.table, self.rr.dbname, 1,
self.replicas)
self.rr.table_create(
self.table, primary_key="bucket", shards=self.shards,
self.table, primary_key="bucket", shards=1,
replicas=self.replicas).run()
def close(self):

239
warcprox/trough.py Normal file
View File

@ -0,0 +1,239 @@
'''
warcprox/trough.py - trough client code
Copyright (C) 2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
from __future__ import absolute_import
import logging
import os
import json
import requests
import doublethink
import rethinkdb as r
import datetime
import threading
import time
class TroughClient(object):
    '''
    Client for trough, the distributed sqlite service.

    Handles provisioning of segment "write urls", discovery of segment "read
    urls", safe formatting of sql literal values, running sql reads/writes
    over http, schema registration, and (optionally) periodic "promotion"
    (push to hdfs) of segments that have received writes.
    '''
    logger = logging.getLogger("warcprox.trough.TroughClient")

    def __init__(self, rethinkdb_trough_db_url, promotion_interval=None):
        '''
        TroughClient constructor.

        Args:
            rethinkdb_trough_db_url: url with schema rethinkdb:// pointing to
                trough configuration database
            promotion_interval: if specified, `TroughClient` will spawn a
                thread that "promotes" (pushes to hdfs) "dirty" trough
                segments (segments that have received writes) periodically,
                sleeping for `promotion_interval` seconds between cycles
                (default None)
        '''
        parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
        self.rr = doublethink.Rethinker(
                servers=parsed.hosts, db=parsed.database)
        self.svcreg = doublethink.ServiceRegistry(self.rr)
        self._write_url_cache = {}  # segment_id -> write url
        self._read_url_cache = {}   # segment_id -> read url
        # segments written to since the last promotion cycle
        self._dirty_segments = set()
        self._dirty_segments_lock = threading.RLock()

        self.promotion_interval = promotion_interval
        self._promoter_thread = None
        if promotion_interval:
            self._promoter_thread = threading.Thread(
                    target=self._promotrix, name='TroughClient-promoter')
            self._promoter_thread.setDaemon(True)
            self._promoter_thread.start()

    def _promotrix(self):
        '''
        Body of the promoter thread: every `self.promotion_interval` seconds,
        promotes all segments written to since the previous cycle. Never
        returns; errors are logged and the loop continues.
        '''
        while True:
            time.sleep(self.promotion_interval)
            try:
                with self._dirty_segments_lock:
                    dirty_segments = list(self._dirty_segments)
                    self._dirty_segments.clear()
                logging.info(
                        'promoting %s trough segments', len(dirty_segments))
                for segment in dirty_segments:
                    try:
                        self.promote(segment)
                    except Exception:
                        logging.error(
                                'problem promoting segment %s', segment,
                                exc_info=True)
            except Exception:
                logging.error(
                        'caught exception doing segment promotion',
                        exc_info=True)

    def promote(self, segment_id):
        '''
        Asks the trough sync master to promote (push to hdfs) `segment_id`.

        Raises:
            Exception: if the sync master responds with a non-200 status
        '''
        url = os.path.join(self.segment_manager_url(), 'promote')
        payload_dict = {'segment': segment_id}
        response = requests.post(url, json=payload_dict)
        if response.status_code != 200:
            raise Exception(
                    'Received %s: %r in response to POST %s with data %s' % (
                        response.status_code, response.text, url,
                        json.dumps(payload_dict)))

    @staticmethod
    def sql_value(x):
        '''
        Renders python value `x` as a literal for interpolation into a sqlite
        statement.

        Returns 'null' for None, `datetime('...')` for datetime, 0/1 for
        bool, a single-quoted (quote-escaped) literal for str/bytes, and the
        value itself for int/float.

        Raises:
            Exception: if `x` is of a type this method doesn't handle
        '''
        if x is None:
            return 'null'
        elif isinstance(x, datetime.datetime):
            return 'datetime(%r)' % x.isoformat()
        elif isinstance(x, bool):
            # must check bool before int/float: bool is a subclass of int
            return int(x)
        elif isinstance(x, (str, bytes)):
            # in sqlite string literals the only character needing escape is
            # the single quote, which is escaped by doubling it; repr() is
            # NOT suitable here (it backslash-escapes, or switches to double
            # quotes, for strings containing single quotes)
            s = x.decode('utf-8') if isinstance(x, bytes) else x
            return "'" + s.replace("'", "''") + "'"
        elif isinstance(x, (int, float)):
            return x
        else:
            raise Exception(
                    "don't know how to make an sql value from %r (%r)" % (
                        x, type(x)))

    def segment_manager_url(self):
        '''
        Returns the url of the "trough-sync-master" node from the service
        registry. Asserts that such a node exists.
        '''
        master_node = self.svcreg.unique_service('trough-sync-master')
        assert master_node
        return master_node['url']

    def write_url_nocache(self, segment_id, schema_id='default'):
        '''
        Provisions `segment_id` with the sync master (idempotent on the
        trough side) and returns its write url, bypassing the cache.

        Raises:
            Exception: if the sync master responds with a non-200 status
        '''
        provision_url = os.path.join(self.segment_manager_url(), 'provision')
        payload_dict = {'segment': segment_id, 'schema': schema_id}
        response = requests.post(provision_url, json=payload_dict)
        if response.status_code != 200:
            raise Exception(
                    'Received %s: %r in response to POST %s with data %s' % (
                        response.status_code, response.text, provision_url,
                        json.dumps(payload_dict)))
        result_dict = response.json()
        # assert result_dict['schema'] == schema_id # previously provisioned?
        return result_dict['write_url']

    def read_url_nocache(self, segment_id):
        '''
        Looks up a read url for `segment_id` in the trough services table,
        bypassing the cache. Picks the least-loaded healthy (heartbeat within
        ttl) 'trough-read' node. Returns None if none is found.
        '''
        reql = self.rr.table('services').get_all(
                segment_id, index='segment').filter(
                        {'role':'trough-read'}).filter(
                                lambda svc: r.now().sub(
                                    svc['last_heartbeat']).lt(svc['ttl'])
                                ).order_by('load')
        self.logger.debug('querying rethinkdb: %r', reql)
        results = reql.run()
        if results:
            return results[0]['url']
        else:
            return None

    def write_url(self, segment_id, schema_id='default'):
        '''
        Returns the write url for `segment_id`, consulting the cache first
        and provisioning the segment on a cache miss.
        '''
        if not segment_id in self._write_url_cache:
            self._write_url_cache[segment_id] = self.write_url_nocache(
                    segment_id, schema_id)
            self.logger.info(
                    'segment %r write url is %r', segment_id,
                    self._write_url_cache[segment_id])
        return self._write_url_cache[segment_id]

    def read_url(self, segment_id):
        '''
        Returns the read url for `segment_id` (or None), consulting the cache
        first. A cached None is treated as a miss and retried.
        '''
        if not self._read_url_cache.get(segment_id):
            self._read_url_cache[segment_id] = self.read_url_nocache(segment_id)
            self.logger.info(
                    'segment %r read url is %r', segment_id,
                    self._read_url_cache[segment_id])
        return self._read_url_cache[segment_id]

    def write(self, segment_id, sql_tmpl, values=(), schema_id='default'):
        '''
        Posts `sql_tmpl % values` (values rendered with `sql_value`) to the
        write url of `segment_id`, marking the segment dirty on success.
        Errors are logged, not raised; the cached write url is dropped so a
        fresh one is provisioned next time.
        '''
        write_url = self.write_url(segment_id, schema_id)
        sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
        try:
            response = requests.post(write_url, sql)
            with self._dirty_segments_lock:
                self._dirty_segments.add(segment_id)
        except Exception:
            del self._write_url_cache[segment_id]
            self.logger.error(
                    'problem with trough write url %r', write_url,
                    exc_info=True)
            return
        if response.status_code != 200:
            del self._write_url_cache[segment_id]
            self.logger.warning(
                    'unexpected response %r %r %r from %r to sql=%r',
                    response.status_code, response.reason, response.text,
                    write_url, sql)
            return
        self.logger.debug('posted %r to %s', sql, write_url)

    def read(self, segment_id, sql_tmpl, values=()):
        '''
        Posts `sql_tmpl % values` (values rendered with `sql_value`) to the
        read url of `segment_id` and returns the json-decoded results, or
        None if no read url is available or the query fails (in which case
        the cached read url is dropped).
        '''
        read_url = self.read_url(segment_id)
        if not read_url:
            return None
        sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
        try:
            response = requests.post(read_url, sql)
        except Exception:
            del self._read_url_cache[segment_id]
            self.logger.error(
                    'problem with trough read url %r', read_url, exc_info=True)
            return None
        if response.status_code != 200:
            del self._read_url_cache[segment_id]
            self.logger.warning(
                    'unexpected response %r %r %r from %r to sql=%r',
                    response.status_code, response.reason, response.text,
                    read_url, sql)
            return None
        self.logger.trace(
                'got %r from posting query %r to %r', response.text, sql,
                read_url)
        results = json.loads(response.text)
        return results

    def schema_exists(self, schema_id):
        '''
        Returns True if `schema_id` is registered with the sync master, False
        if it responds 404; raises for any other error status.
        '''
        url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
        response = requests.get(url)
        if response.status_code == 200:
            return True
        elif response.status_code == 404:
            return False
        else:
            response.raise_for_status()

    def register_schema(self, schema_id, sql):
        '''
        Registers (or replaces) schema `schema_id` with the sync master,
        PUTting the sql text to the schema endpoint.

        Raises:
            Exception: if the sync master responds with a status other than
                201 or 204
        '''
        url = os.path.join(
                self.segment_manager_url(), 'schema', schema_id, 'sql')
        response = requests.put(url, sql)
        if response.status_code not in (201, 204):
            raise Exception(
                    'Received %s: %r in response to PUT %r with data %r' % (
                        response.status_code, response.text, url, sql))