From 4eda89f23264bcd0ab8040a1e408bf71c4825710 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 3 Oct 2017 12:41:04 -0700 Subject: [PATCH 01/35] trough for deduplication initial proof-of-concept-ish code --- warcprox/dedup.py | 105 ++++++++++++++++++++++++++++++++++++++++++++++ warcprox/main.py | 6 ++- 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index fd1ada4..41439c2 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -27,6 +27,7 @@ import json from hanzo import warctools import warcprox import sqlite3 +import requests class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -174,3 +175,107 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) +class TroughDedupDb(object): + ''' + https://github.com/jkafader/trough + ''' + logger = logging.getLogger("warcprox.dedup.TroughDedupDb") + + def __init__(self, options=warcprox.Options()): + self.options = options + + def start(self): + pass + + def stop(self): + pass + + def _write_url(self, bucket): + import doublethink + segment_id = 'warcprox-trough-%s' % bucket + rr = doublethink.Rethinker( + servers=['localhost'], db='trough_configuration') + services = doublethink.ServiceRegistry(rr) + master_node = services.unique_service('trough-sync-master') + response = requests.post(master_node['url'], segment_id) + response.raise_for_status() + write_url = response.text.strip() + return write_url + + def _read_url(self, bucket): + import doublethink + import rethinkdb as r + segment_id = 'warcprox-trough-%s' % bucket + rr = doublethink.Rethinker( + servers=['localhost'], db='trough_configuration') + reql = rr.table('services').get_all(segment_id, index='segment').filter( + {'role':'trough-read'}).filter( + lambda svc: r.now().sub( + svc['last_heartbeat']).lt(svc['ttl']) + ).order_by('load') + logging.debug('querying rethinkdb: %r', reql) + results = reql.run() + if results: + return results[0]['url'] + else: + return None + + def save(self, digest_key, response_record, bucket='__unspecified__'): + write_url = self._write_url(bucket) + record_id = response_record.get_header(warctools.WarcRecord.ID).decode('ascii') + url = response_record.get_header(warctools.WarcRecord.URL).decode('ascii') + warc_date = response_record.get_header(warctools.WarcRecord.DATE).decode('ascii') + + # XXX create table statement here is a temporary hack, + # see https://webarchive.jira.com/browse/AITFIVE-1465 + sql = ('create table if not exists dedup (\n' + ' digest_key varchar(100) primary key,\n' + ' url varchar(2100) not null,\n' + ' date datetime not null,\n' + ' id varchar(100));\n' # warc record id + 'insert into dedup (digest_key, url, date, id) ' + 'values (%r, %r, %r, %r);') % ( + digest_key.decode('ascii'), url, warc_date, record_id) + response = requests.post(write_url, sql) + if response.status_code != 200: + logging.warn( + 'unexpected response %r %r %r to sql=%r', + response.status_code, response.reason, response.text, sql) + + def lookup(self, digest_key, bucket='__unspecified__'): + read_url = self._read_url(bucket) + if not read_url: + return None + sql = 'select * from dedup where digest_key=%r;' % digest_key.decode('ascii') + response = requests.post(read_url, sql) + if response.status_code != 200: + logging.warn( + 'unexpected response %r %r %r to sql=%r', + response.status_code, response.reason, response.text, sql) + return None + logging.debug('got %r from query %r', response.text, sql) + results = json.loads(response.text) + assert len(results) <= 1 # sanity check (digest_key is primary key) + if results: + result = results[0] + result['id'] = result['id'].encode('ascii') + result['url'] = result['url'].encode('ascii') + result['date'] = result['date'].encode('ascii') + self.logger.debug( + 'trough lookup of key=%r returning %r', digest_key, result) + return result + else: + return None + + def notify(self, recorded_url, records): + if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + and recorded_url.response_recorder.payload_size() > 0): + digest_key = warcprox.digest_str( + recorded_url.response_recorder.payload_digest, + self.options.base32) + if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: + self.save( + digest_key, records[0], + bucket=recorded_url.warcprox_meta['captures-bucket']) + else: + self.save(digest_key, records[0]) diff --git a/warcprox/main.py b/warcprox/main.py index 7b7314b..f4ac391 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -108,6 +108,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') + group.add_argument('--trough', help='use trough for deduplication 🐷 🐷 🐷 🐷', action='store_true') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') arg_parser.add_argument('--rethinkdb-big-table', @@ -177,7 +178,10 @@ def init_controller(args): exit(1) listeners = [] - if args.rethinkdb_servers: + if args.trough: + dedup_db = warcprox.dedup.TroughDedupDb(options) + listeners.append(dedup_db) + elif args.rethinkdb_servers: rr = doublethink.Rethinker( args.rethinkdb_servers.split(","), args.rethinkdb_db) if args.rethinkdb_big_table: From d177b3b80da66319be0d7c0c817160cc06faf25b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 11 Oct 2017 12:06:19 -0700 Subject: [PATCH 02/35] change rethinkdb-related command line options to use "rethinkdb urls" (parser just added to doublethink) to reduce the proliferation of rethinkdb options, and add --rethinkdb-trough-db-url option --- .travis.yml | 4 +-- setup.py | 2 +- tests/conftest.py | 70 ++++++++++++++++++------------------ tests/run-tests.sh | 6 ++-- tests/test_warcprox.py | 37 +++++++++++++------ warcprox/bigtable.py | 25 +++++++------ warcprox/dedup.py | 47 ++++++++++++------------- warcprox/main.py | 80 +++++++++++++++++++++++++----------------- warcprox/stats.py | 16 +++++---- 9 files changed, 164 insertions(+), 123 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5f7d8b3..a1848b7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,8 +34,8 @@ before_script: script: - py.test -v tests -- py.test -v --rethinkdb-servers=localhost tests -- py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests +- py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests +- py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests notifications: slack: diff --git a/setup.py b/setup.py index 89f37dc..47852c9 100755 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ deps = [ 'certauth==1.1.6', 'warctools', 'urlcanon>=0.1.dev16', - 'doublethink>=0.2.0.dev81', + 'doublethink>=0.2.0.dev87', 'PySocks', ] try: diff --git a/tests/conftest.py b/tests/conftest.py index 27d4141..d130491 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,39 +1,41 @@ -# -# tests/conftest.py - command line options for warcprox tests -# -# Copyright (C) 2015-2016 Internet Archive -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# +# vim: set fileencoding=utf-8: +''' +tests/conftest.py - command line options for warcprox tests + +Copyright (C) 2015-2017 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +''' import pytest def pytest_addoption(parser): - parser.addoption('--rethinkdb-servers', dest='rethinkdb_servers', - help='rethink db servers for dedup, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') - parser.addoption('--rethinkdb-big-table', - dest='rethinkdb_big_table', action='store_true', default=False, - help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') - -@pytest.fixture(scope="module") -def rethinkdb_servers(request): - return request.config.getoption("--rethinkdb-servers") - -@pytest.fixture(scope="module") -def rethinkdb_big_table(request): - return request.config.getoption("--rethinkdb-big-table") - + parser.addoption( + '--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=( + 'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_dedup_table')) + parser.addoption( + '--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=( + 'rethinkdb big table url (table will be populated with ' + 'various capture information and is suitable for use as ' + 'index for playback), e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/captures')) + parser.addoption( + '--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=( + '🐷   url pointing to trough configuration rethinkdb database, ' + 'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015' + '/trough_configuration')) diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 334cfc2..747f042 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -42,7 +42,9 @@ do && source /tmp/venv/bin/activate \ && pip --log-file /tmp/pip.log install . pytest requests warcio \ && py.test -v tests \ - && py.test -v --rethinkdb-servers=localhost tests \ - && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests" + && py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \ + && py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \ + && py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/test3 tests \ + " done diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index b24a5c8..53b6f71 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -242,7 +242,7 @@ def https_daemon(request, cert): return https_daemon @pytest.fixture(scope="module") -def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): +def warcprox_(request): orig_dir = os.getcwd() work_dir = tempfile.mkdtemp() logging.info('changing to working directory %r', work_dir) @@ -254,12 +254,15 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): '--port=0', '--playback-port=0', '--onion-tor-socks-proxy=localhost:9050'] - if rethinkdb_servers: - rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - argv.append('--rethinkdb-servers=%s' % rethinkdb_servers) - argv.append('--rethinkdb-db=%s' % rethinkdb_db) - if rethinkdb_big_table: - argv.append('--rethinkdb-big-table') + if request.config.getoption('--rethinkdb-dedup-url'): + argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) + # test these here only + argv.append('--rethinkdb-stats-url=rethinkdb://localhost/test0/stats') + argv.append('--rethinkdb-services-url=rethinkdb://localhost/test0/services') + elif request.config.getoption('--rethinkdb-big-table-url'): + argv.append('--rethinkdb-big-table-url=%s' % request.config.getoption('--rethinkdb-big-table-url')) + elif request.config.getoption('--rethinkdb-trough-db-url'): + argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url')) args = warcprox.main.parse_args(argv) warcprox_ = warcprox.main.init_controller(args) @@ -272,10 +275,22 @@ def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): def fin(): warcprox_.stop.set() warcprox_thread.join() - if rethinkdb_servers: - logging.info('dropping rethinkdb database %r', rethinkdb_db) - rr = doublethink.Rethinker(rethinkdb_servers) - result = rr.db_drop(rethinkdb_db).run() + for rethinkdb_url in ( + warcprox_.options.rethinkdb_big_table_url, + warcprox_.options.rethinkdb_dedup_url, + warcprox_.options.rethinkdb_services_url, + warcprox_.options.rethinkdb_stats_url): + if not rethinkdb_url: + continue + parsed = doublethink.parse_rethinkdb_url(rethinkdb_url) + rr = doublethink.Rethinker(servers=parsed.hosts) + try: + logging.info('dropping rethinkdb database %r', parsed.database) + rr.db_drop(parsed.database).run() + except Exception as e: + logging.warn( + 'problem deleting rethinkdb database %r: %s', + parsed.database, e) logging.info('deleting working directory %r', work_dir) os.chdir(orig_dir) shutil.rmtree(work_dir) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 387d05c..d2147b8 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -39,13 +39,12 @@ class RethinkCaptures: """Inserts in batches every 0.5 seconds""" logger = logging.getLogger("warcprox.bigtable.RethinkCaptures") - def __init__( - self, rr, table="captures", shards=None, replicas=None, - options=warcprox.Options()): - self.rr = rr - self.table = table - self.shards = shards or len(rr.servers) - self.replicas = replicas or min(3, len(rr.servers)) + def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_big_table_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.table = parsed.table self.options = options self._ensure_db_table() @@ -107,7 +106,9 @@ class RethinkCaptures: self.logger.info( "creating rethinkdb table %r in database %r", self.table, self.rr.dbname) - self.rr.table_create(self.table, shards=self.shards, replicas=self.replicas).run() + self.rr.table_create( + self.table, shards=len(self.rr.servers), + replicas=min(3, len(self.rr.servers))).run() self.rr.table(self.table).index_create( "abbr_canon_surt_timestamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run() @@ -216,8 +217,8 @@ class RethinkCaptures: class RethinkCapturesDedup: logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") - def __init__(self, captures_db, options=warcprox.Options()): - self.captures_db = captures_db + def __init__(self, options=warcprox.Options()): + self.captures_db = RethinkCaptures(options=options) self.options = options def lookup(self, digest_key, bucket="__unspecified__"): @@ -247,3 +248,7 @@ class RethinkCapturesDedup: def close(self): self.captures_db.close() + + def notify(self, recorded_url, records): + self.captures_db.notify(recorded_url, records) + diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 41439c2..0a45b7c 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -28,6 +28,8 @@ from hanzo import warctools import warcprox import sqlite3 import requests +import doublethink +import rethinkdb as r class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -115,11 +117,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") - def __init__(self, rr, table="dedup", shards=None, replicas=None, options=warcprox.Options()): - self.rr = rr - self.table = table - self.shards = shards or len(rr.servers) - self.replicas = replicas or min(3, len(rr.servers)) + def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.table = parsed.table self._ensure_db_table() self.options = options @@ -132,12 +134,11 @@ class RethinkDedupDb: if not self.table in tables: self.logger.info( "creating rethinkdb table %r in database %r shards=%r " - "replicas=%r", self.table, self.rr.dbname, self.shards, - self.replicas) + "replicas=%r", self.table, self.rr.dbname, + len(self.rr.servers), min(3, len(self.rr.servers))) self.rr.table_create( - self.table, primary_key="key", shards=self.shards, - replicas=self.replicas).run() - + self.table, primary_key="key", shards=len(self.rr.servers), + replicas=min(3, len(self.rr.servers))).run() def start(self): pass @@ -182,6 +183,11 @@ class TroughDedupDb(object): logger = logging.getLogger("warcprox.dedup.TroughDedupDb") def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_trough_db_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.svcreg = doublethink.ServiceRegistry(self.rr) self.options = options def start(self): @@ -191,28 +197,21 @@ class TroughDedupDb(object): pass def _write_url(self, bucket): - import doublethink segment_id = 'warcprox-trough-%s' % bucket - rr = doublethink.Rethinker( - servers=['localhost'], db='trough_configuration') - services = doublethink.ServiceRegistry(rr) - master_node = services.unique_service('trough-sync-master') + master_node = self.svcreg.unique_service('trough-sync-master') response = requests.post(master_node['url'], segment_id) response.raise_for_status() write_url = response.text.strip() return write_url def _read_url(self, bucket): - import doublethink - import rethinkdb as r segment_id = 'warcprox-trough-%s' % bucket - rr = doublethink.Rethinker( - servers=['localhost'], db='trough_configuration') - reql = rr.table('services').get_all(segment_id, index='segment').filter( - {'role':'trough-read'}).filter( - lambda svc: r.now().sub( - svc['last_heartbeat']).lt(svc['ttl']) - ).order_by('load') + reql = self.rr.table('services').get_all( + segment_id, index='segment').filter( + {'role':'trough-read'}).filter( + lambda svc: r.now().sub( + svc['last_heartbeat']).lt(svc['ttl']) + ).order_by('load') logging.debug('querying rethinkdb: %r', reql) results = reql.run() if results: diff --git a/warcprox/main.py b/warcprox/main.py index f4ac391..b7f3ec6 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# vim: set fileencoding=utf-8: ''' warcprox/main.py - entrypoint for warcprox executable, parses command line arguments, initializes components, starts controller, handles signals @@ -42,6 +43,7 @@ import warcprox import doublethink import cryptography.hazmat.backends.openssl import importlib +import doublethink class BetterArgumentDefaultsHelpFormatter( argparse.ArgumentDefaultsHelpFormatter, @@ -96,8 +98,18 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default=False, help='write digests in Base32 instead of hex') arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD', action='append', help='only record requests with the given http method(s) (can be used more than once)') - arg_parser.add_argument('--stats-db-file', dest='stats_db_file', - default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') + + group = arg_parser.add_mutually_exclusive_group() + group.add_argument( + '--stats-db-file', dest='stats_db_file', + default='./warcprox.sqlite', help=( + 'persistent statistics database file; empty string or ' + '/dev/null disables statistics tracking')) + group.add_argument( + '--rethinkdb-stats-url', dest='rethinkdb_stats_url', help=( + 'rethinkdb stats table url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_stats_table')) + arg_parser.add_argument('-P', '--playback-port', dest='playback_port', type=int, default=None, help='port to listen on for instant playback') arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', @@ -106,17 +118,25 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') - group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', - help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') - group.add_argument('--trough', help='use trough for deduplication 🐷 🐷 🐷 🐷', action='store_true') - arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', - help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') - arg_parser.add_argument('--rethinkdb-big-table', - dest='rethinkdb_big_table', action='store_true', default=False, - help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') + group.add_argument( + '--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=( + 'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_dedup_table')) + group.add_argument( + '--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=( + 'rethinkdb big table url (table will be populated with ' + 'various capture information and is suitable for use as ' + 'index for playback), e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/captures')) + group.add_argument( + '--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=( + '🐷   url pointing to trough configuration rethinkdb database, ' + 'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015' + '/trough_configuration')) arg_parser.add_argument( - '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name', - default='captures', help=argparse.SUPPRESS) + '--rethinkdb-services-url', dest='rethinkdb_services_url', help=( + 'rethinkdb service registry table url; if provided, warcprox ' + 'will create and heartbeat entry for itself')) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, @@ -178,30 +198,23 @@ def init_controller(args): exit(1) listeners = [] - if args.trough: + + if args.rethinkdb_dedup_url: + dedup_db = warcprox.dedup.RethinkDedupDb(options=options) + elif args.rethinkdb_big_table_url: + dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options) + elif args.rethinkdb_trough_db_url: dedup_db = warcprox.dedup.TroughDedupDb(options) - listeners.append(dedup_db) - elif args.rethinkdb_servers: - rr = doublethink.Rethinker( - args.rethinkdb_servers.split(","), args.rethinkdb_db) - if args.rethinkdb_big_table: - captures_db = warcprox.bigtable.RethinkCaptures( - rr, table=args.rethinkdb_big_table_name, options=options) - dedup_db = warcprox.bigtable.RethinkCapturesDedup( - captures_db, options=options) - listeners.append(captures_db) - else: - dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options) - listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None else: dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options) + if dedup_db: listeners.append(dedup_db) - if args.rethinkdb_servers: - stats_db = warcprox.stats.RethinkStatsDb(rr, options=options) + if args.rethinkdb_stats_url: + stats_db = warcprox.stats.RethinkStatsDb(options=options) listeners.append(stats_db) elif args.stats_db_file in (None, '', '/dev/null'): logging.info('statistics tracking disabled') @@ -252,8 +265,11 @@ def init_controller(args): listeners=listeners, options=options) for i in range(int(proxy.max_threads ** 0.5))] - if args.rethinkdb_servers: - svcreg = doublethink.ServiceRegistry(rr) + if args.rethinkdb_services_url: + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_services_url) + rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database) + svcreg = doublethink.ServiceRegistry(rr, table=parsed.table) else: svcreg = None @@ -303,8 +319,7 @@ def main(argv=sys.argv): loglevel = logging.INFO logging.basicConfig( - stream=sys.stdout, level=loglevel, - format=( + stream=sys.stdout, level=loglevel, format=( '%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) @@ -318,6 +333,7 @@ def ensure_rethinkdb_tables(): tables. So it's a good idea to use this utility at an early step when spinning up a cluster. ''' + raise Exception('adjust my args') arg_parser = argparse.ArgumentParser( prog=os.path.basename(sys.argv[0]), formatter_class=BetterArgumentDefaultsHelpFormatter) diff --git a/warcprox/stats.py b/warcprox/stats.py index 55693a2..5a87461 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -32,6 +32,7 @@ import datetime import urlcanon import sqlite3 import copy +import doublethink def _empty_bucket(bucket): return { @@ -189,11 +190,12 @@ class RethinkStatsDb(StatsDb): """Updates database in batch every 2.0 seconds""" logger = logging.getLogger("warcprox.stats.RethinkStatsDb") - def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()): - self.rr = rethinker - self.table = table - self.shards = shards or 1 # 1 shard by default because it's probably a small table - self.replicas = replicas or min(3, len(self.rr.servers)) + def __init__(self, options=warcprox.Options()): + parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.table = parsed.table + self.replicas = min(3, len(self.rr.servers)) self._ensure_db_table() self.options = options @@ -271,10 +273,10 @@ class RethinkStatsDb(StatsDb): if not self.table in tables: self.logger.info( "creating rethinkdb table %r in database %r shards=%r " - "replicas=%r", self.table, self.rr.dbname, self.shards, + "replicas=%r", self.table, self.rr.dbname, 1, self.replicas) self.rr.table_create( - self.table, primary_key="bucket", shards=self.shards, + self.table, primary_key="bucket", shards=1, replicas=self.replicas).run() def close(self): From 369dc5c124fac8fba5fb532e51205a68010f4f71 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 11 Oct 2017 17:28:47 -0700 Subject: [PATCH 03/35] install and run trough in docker container for testing --- tests/Dockerfile | 59 ++++++++++++++++++++++++++++++++++++++++++---- tests/run-tests.sh | 4 +--- 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/tests/Dockerfile b/tests/Dockerfile index 04c6d72..2bb46b0 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -1,7 +1,7 @@ # # Dockerfile for warcprox tests # -# Copyright (C) 2015-2016 Internet Archive +# Copyright (C) 2015-2017 Internet Archive # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -23,19 +23,19 @@ FROM phusion/baseimage MAINTAINER Noah Levitt # see https://github.com/stuartpb/rethinkdb-dockerfiles/blob/master/trusty/2.1.3/Dockerfile +# and https://github.com/chali/hadoop-cdh-pseudo-docker/blob/master/Dockerfile ENV LANG=C.UTF-8 RUN apt-get update && apt-get --auto-remove -y dist-upgrade # Add the RethinkDB repository and public key -# "RethinkDB Packaging " http://download.rethinkdb.com/apt/pubkey.gpg -RUN apt-key adv --keyserver pgp.mit.edu --recv-keys 1614552E5765227AEC39EFCFA7E00EF33A8F2399 \ +RUN curl -s https://download.rethinkdb.com/apt/pubkey.gpg | apt-key add - \ && echo "deb http://download.rethinkdb.com/apt xenial main" > /etc/apt/sources.list.d/rethinkdb.list \ && apt-get update && apt-get -y install rethinkdb RUN mkdir -vp /etc/service/rethinkdb \ - && echo "#!/bin/sh\nrethinkdb --bind 0.0.0.0 --directory /tmp/rethink-data --runuser rethinkdb --rungroup rethinkdb\n" > /etc/service/rethinkdb/run \ + && echo "#!/bin/bash\nexec rethinkdb --bind 0.0.0.0 --directory /tmp/rethink-data --runuser rethinkdb --rungroup rethinkdb\n" > /etc/service/rethinkdb/run \ && chmod a+x /etc/service/rethinkdb/run RUN apt-get -y install git @@ -53,6 +53,55 @@ RUN pip install virtualenv RUN apt-get -y install tor RUN mkdir -vp /etc/service/tor \ - && echo "#!/bin/sh\ntor\n" > /etc/service/tor/run \ + && echo "#!/bin/sh\nexec tor\n" > /etc/service/tor/run \ && chmod a+x /etc/service/tor/run +# hadoop hdfs for trough +RUN curl -s https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/archive.key | apt-key add - \ + && . /etc/lsb-release \ + && echo "deb [arch=amd64] http://archive.cloudera.com/cdh5/ubuntu/$DISTRIB_CODENAME/amd64/cdh $DISTRIB_CODENAME-cdh5 contrib" >> /etc/apt/sources.list.d/cloudera.list + +RUN apt-get update +RUN apt-get install -y openjdk-8-jdk hadoop-conf-pseudo + +RUN su hdfs -c 'hdfs namenode -format' + +RUN mv -v /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/core-site.xml.orig \ + && cat /etc/hadoop/conf/core-site.xml.orig | sed 's,localhost:8020,0.0.0.0:8020,' > /etc/hadoop/conf/core-site.xml + +RUN mv -v /etc/hadoop/conf/hdfs-site.xml /etc/hadoop/conf/hdfs-site.xml.orig \ + && cat /etc/hadoop/conf/hdfs-site.xml.orig | sed 's,^$, \n dfs.permissions.enabled\n false\n \n,' > /etc/hadoop/conf/hdfs-site.xml + +RUN echo '#!/bin/bash\nservice hadoop-hdfs-namenode start\nservice hadoop-hdfs-datanode start' > /etc/my_init.d/50_start_hdfs.sh \ + && chmod a+x /etc/my_init.d/50_start_hdfs.sh + +# trough itself +RUN virtualenv -p python3 /opt/trough-ve3 \ + && . /opt/trough-ve3/bin/activate \ + && pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string \ + && pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup + +RUN mkdir -vp /etc/service/trough-sync-local \ + && echo "#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nexec sync.py >>/tmp/trough-sync-local.out 2>&1" > /etc/service/trough-sync-local/run \ + && chmod a+x /etc/service/trough-sync-local/run + +RUN mkdir -vp /etc/service/trough-sync-server \ + && echo '#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec sync.py --server >>/tmp/trough-sync-server.out 2>&1' > /etc/service/trough-sync-server/run \ + && chmod a+x /etc/service/trough-sync-server/run + +RUN mkdir -vp /etc/service/trough-read \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/reader.py >>/tmp/trough-read.out 2>&1' > /etc/service/trough-read/run \ + && chmod a+x /etc/service/trough-read/run + +RUN mkdir -vp /etc/service/trough-write \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/writer.py >>/tmp/trough-write.out 2>&1' > /etc/service/trough-write/run \ + && chmod a+x /etc/service/trough-write/run + +RUN mkdir -vp /etc/service/trough-write-provisioner-local \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1' > /etc/service/trough-write-provisioner-local/run \ + && chmod a+x /etc/service/trough-write-provisioner-local/run + +RUN mkdir -vp /etc/service/trough-write-provisioner-server \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1' > /etc/service/trough-write-provisioner-server/run \ + && chmod a+x /etc/service/trough-write-provisioner-server/run + diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 747f042..68d77a4 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -5,8 +5,6 @@ # features enabled, against that instance of rethinkdb, and also run without # rethinkdb features enabled. With python 2.7 and 3.4. # -# tests/conftest.py - command line options for warcprox tests -# # Copyright (C) 2015-2017 Internet Archive # # This program is free software; you can redistribute it and/or @@ -44,7 +42,7 @@ do && py.test -v tests \ && py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \ && py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \ - && py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/test3 tests \ + && py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests \ " done From 828a2c3dcf12aae498c1210dba3296e55e87795d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 13 Oct 2017 15:54:05 -0700 Subject: [PATCH 04/35] get all the tests to pass with ./tests/run-tests.sh --- .travis.yml | 1 + tests/Dockerfile | 10 +++++----- tests/run-tests.sh | 2 +- warcprox/dedup.py | 37 +++++++++++++++++++++++++++++++------ warcprox/main.py | 2 +- 5 files changed, 39 insertions(+), 13 deletions(-) diff --git a/.travis.yml b/.travis.yml index a1848b7..a9e844f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,6 +36,7 @@ script: - py.test -v tests - py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests - py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests +- py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests notifications: slack: diff --git a/tests/Dockerfile b/tests/Dockerfile index 2bb46b0..5e380d8 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -86,22 +86,22 @@ RUN mkdir -vp /etc/service/trough-sync-local \ && chmod a+x /etc/service/trough-sync-local/run RUN mkdir -vp /etc/service/trough-sync-server \ - && echo '#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec sync.py --server >>/tmp/trough-sync-server.out 2>&1' > /etc/service/trough-sync-server/run \ + && echo '#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec sync.py --server >>/tmp/trough-sync-server.out 2>&1' > /etc/service/trough-sync-server/run \ && chmod a+x /etc/service/trough-sync-server/run RUN mkdir -vp /etc/service/trough-read \ - && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/reader.py >>/tmp/trough-read.out 2>&1' > /etc/service/trough-read/run \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/reader.py >>/tmp/trough-read.out 2>&1' > /etc/service/trough-read/run \ && chmod a+x /etc/service/trough-read/run RUN mkdir -vp /etc/service/trough-write \ - && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/writer.py >>/tmp/trough-write.out 2>&1' > /etc/service/trough-write/run \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/writer.py >>/tmp/trough-write.out 2>&1' > /etc/service/trough-write/run \ && chmod a+x /etc/service/trough-write/run RUN mkdir -vp /etc/service/trough-write-provisioner-local \ - && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1' > /etc/service/trough-write-provisioner-local/run \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1' > /etc/service/trough-write-provisioner-local/run \ && chmod a+x /etc/service/trough-write-provisioner-local/run RUN mkdir -vp /etc/service/trough-write-provisioner-server \ - && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 1\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1' > /etc/service/trough-write-provisioner-server/run \ + && echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1' > /etc/service/trough-write-provisioner-server/run \ && chmod a+x /etc/service/trough-write-provisioner-server/run diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 68d77a4..f962ca8 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -31,7 +31,7 @@ script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" docker build -t internetarchive/warcprox-tests $script_dir -for python in python2.7 python3 +for python in python3 python2.7 do docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests /sbin/my_init -- \ bash -x -c "cd /tmp && git clone /warcprox && cd /tmp/warcprox \ diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 0a45b7c..21c89f8 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -30,6 +30,7 @@ import sqlite3 import requests import doublethink import rethinkdb as r +import datetime class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -219,11 +220,33 @@ class TroughDedupDb(object): else: return None + def sql_value(self, x): + if x is None: + return 'null' + elif isinstance(x, datetime.datetime): + return 'datetime(%r)' % x.isoformat() + elif isinstance(x, bool): + return int(x) + elif isinstance(x, str) or isinstance(x, bytes): + # py3: repr(u'abc') => 'abc' + # repr(b'abc') => b'abc' + # py2: repr(u'abc') => u'abc' + # repr(b'abc') => 'abc' + # Repr gives us a prefix we don't want in different situations + # depending on whether this is py2 or py3. Chop it off either way. + r = repr(x) + if r[:1] == "'": + return r + else: + return r[1:] + else: + raise Exception("don't know how to make an sql value from %r" % x) + def save(self, digest_key, response_record, bucket='__unspecified__'): write_url = self._write_url(bucket) - record_id = response_record.get_header(warctools.WarcRecord.ID).decode('ascii') - url = response_record.get_header(warctools.WarcRecord.URL).decode('ascii') - warc_date = response_record.get_header(warctools.WarcRecord.DATE).decode('ascii') + record_id = response_record.get_header(warctools.WarcRecord.ID) + url = response_record.get_header(warctools.WarcRecord.URL) + warc_date = response_record.get_header(warctools.WarcRecord.DATE) # XXX create table statement here is a temporary hack, # see https://webarchive.jira.com/browse/AITFIVE-1465 @@ -233,8 +256,9 @@ class TroughDedupDb(object): ' date datetime not null,\n' ' id varchar(100));\n' # warc record id 'insert into dedup (digest_key, url, date, id) ' - 'values (%r, %r, %r, %r);') % ( - digest_key.decode('ascii'), url, warc_date, record_id) + 'values (%s, %s, %s, %s);') % ( + self.sql_value(digest_key), self.sql_value(url), + self.sql_value(warc_date), self.sql_value(record_id)) response = requests.post(write_url, sql) if response.status_code != 200: logging.warn( @@ -245,7 +269,8 @@ class TroughDedupDb(object): read_url = self._read_url(bucket) if not read_url: return None - sql = 'select * from dedup where digest_key=%r;' % digest_key.decode('ascii') + sql = 'select * from dedup where digest_key=%s;' % ( + self.sql_value(digest_key)) response = requests.post(read_url, sql) if response.status_code != 200: logging.warn( diff --git a/warcprox/main.py b/warcprox/main.py index b7f3ec6..c8c0ae8 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -60,7 +60,7 @@ class BetterArgumentDefaultsHelpFormatter( if isinstance(action, argparse._StoreConstAction): return action.help else: - return super()._get_help_string(action) + return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action) def _build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser = argparse.ArgumentParser(prog=prog, From 892960d41af3b409eaad503bf5ea283d883b7d50 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 13 Oct 2017 16:26:33 -0700 Subject: [PATCH 05/35] first attempt to run trough on travis-ci --- .travis.yml | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index a9e844f..7d86248 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,9 +28,23 @@ services: before_install: - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker run -d --publish=28015:28015 rethinkdb +- docker run -d --publish=8020:8020 --publish=50070:50070 --publish=50010:50010 --publish=50020:50020 --publish=50075:50075 chalimartines/cdh5-pseudo-distributed +- virtualenv -p python3 /opt/trough-ve3 +- . /opt/trough-ve3/bin/activate +- pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string +- pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup +- 'sync.py >>/tmp/trough-sync-local.out 2>&1 &' +- sleep 5 ; 'python -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"' +- 'sync.py --server >>/tmp/trough-sync-server.out 2>&1 &' +- 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' +- 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' +- 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' + +install: +- pip install . pytest requests warcio before_script: -- pip install . pytest requests warcio +- ps -fHe script: - py.test -v tests @@ -38,6 +52,9 @@ script: - py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests - py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests +after_script: +- 'cat /tmp/*.out' + notifications: slack: secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo= From 166aaab3e57e5f2818fa42f1512ee8ff7688604e Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 13 Oct 2017 16:40:08 -0700 Subject: [PATCH 06/35] banging on travis-ci --- .travis.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 7d86248..a0c13b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,19 +26,26 @@ services: - docker before_install: +- ps -fHe - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker run -d --publish=28015:28015 rethinkdb - docker run -d --publish=8020:8020 --publish=50070:50070 --publish=50010:50010 --publish=50020:50020 --publish=50075:50075 chalimartines/cdh5-pseudo-distributed +- ps -fHe - virtualenv -p python3 /opt/trough-ve3 - . /opt/trough-ve3/bin/activate +- 'env | sort' - pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string - pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup - 'sync.py >>/tmp/trough-sync-local.out 2>&1 &' -- sleep 5 ; 'python -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"' +- date +- sleep 5 +- date +- python -c "import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings['RETHINKDB_HOSTS']) ; rr.db('trough_configuration').wait().run()" - 'sync.py --server >>/tmp/trough-sync-server.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' +- ps -fHe install: - pip install . pytest requests warcio From 0e78140d479253886055c28a2f5bd5a47a3032ae Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 13 Oct 2017 16:52:08 -0700 Subject: [PATCH 07/35] cryptography 2.1.1 seems to be the problem --- .travis.yml | 8 ++++---- setup.py | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index a0c13b6..c75c6f9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,11 +26,11 @@ services: - docker before_install: -- ps -fHe +- ps ww -fHe - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker run -d --publish=28015:28015 rethinkdb - docker run -d --publish=8020:8020 --publish=50070:50070 --publish=50010:50010 --publish=50020:50020 --publish=50075:50075 chalimartines/cdh5-pseudo-distributed -- ps -fHe +- ps ww -fHe - virtualenv -p python3 /opt/trough-ve3 - . /opt/trough-ve3/bin/activate - 'env | sort' @@ -45,13 +45,13 @@ before_install: - 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' -- ps -fHe +- ps ww -fHe install: - pip install . pytest requests warcio before_script: -- ps -fHe +- ps ww -fHe script: - py.test -v tests diff --git a/setup.py b/setup.py index 47852c9..aeb9455 100755 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ deps = [ 'urlcanon>=0.1.dev16', 'doublethink>=0.2.0.dev87', 'PySocks', + 'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu ] try: import concurrent.futures From ddc88cda0f19dab34d48877a221f437d17f67743 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 16 Oct 2017 16:05:23 -0700 Subject: [PATCH 08/35] more banging on travis-ci --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index c75c6f9..b499a59 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,6 +26,11 @@ services: - docker before_install: +- hostname -s +- hostname -f +- ping -c3 `hostname -s` +- ping -c3 `hostname -f` +- ping -c3 localhost - ps ww -fHe - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker run -d --publish=28015:28015 rethinkdb From 994eda70a8ad8a00811402f889795d7b4a6ab3a4 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 17 Oct 2017 14:33:36 -0700 Subject: [PATCH 09/35] banging --- .travis.yml | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b499a59..69b105a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,7 +50,14 @@ before_install: - 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' +- sleep 3 - ps ww -fHe +- cat /tmp/trough-write.out +- cat /tmp/trough-write-provisioner-server.out +- cat /tmp/trough-write-provisioner-local.out +- cat /tmp/trough-sync-server.out +- cat /tmp/trough-sync-local.out +- cat /tmp/trough-read.out install: - pip install . pytest requests warcio @@ -65,7 +72,12 @@ script: - py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests after_script: -- 'cat /tmp/*.out' +- cat /tmp/trough-write.out +- cat /tmp/trough-write-provisioner-server.out +- cat /tmp/trough-write-provisioner-local.out +- cat /tmp/trough-sync-server.out +- cat /tmp/trough-sync-local.out +- cat /tmp/trough-read.out notifications: slack: From 73d4a19c0adafc73c23ee6a4177638e47cafce25 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 17 Oct 2017 14:42:54 -0700 Subject: [PATCH 10/35] bangin (is the problem that we didn't start trough-read? --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 69b105a..486d62b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,6 +50,7 @@ before_install: - 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' +- 'uwsgi --venv=/opt/trough-ve3 --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/reader.py >>/tmp/trough-read.out 2>&1' - sleep 3 - ps ww -fHe - cat /tmp/trough-write.out From 4c4f8ead09369186a77c117828ac965347ce2e90 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 17 Oct 2017 14:58:46 -0700 Subject: [PATCH 11/35] missed an ampersand --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 486d62b..214daff 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,7 +50,7 @@ before_install: - 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' -- 'uwsgi --venv=/opt/trough-ve3 --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/reader.py >>/tmp/trough-read.out 2>&1' +- 'uwsgi --venv=/opt/trough-ve3 --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/reader.py >>/tmp/trough-read.out 2>&1 &' - sleep 3 - ps ww -fHe - cat /tmp/trough-write.out From d4b39f3fcc5d6f2afbdf2abd170e6e1ef0f45277 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Oct 2017 09:45:06 -0700 Subject: [PATCH 12/35] remove some debugging from .travis.yml and importantly, deactivate the trough virtualenv before installing warcprox and running tests (otherwise it uses the wrong version of python) --- .travis.yml | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/.travis.yml b/.travis.yml index 214daff..70245ef 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,39 +26,22 @@ services: - docker before_install: -- hostname -s -- hostname -f -- ping -c3 `hostname -s` -- ping -c3 `hostname -f` -- ping -c3 localhost -- ps ww -fHe - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker run -d --publish=28015:28015 rethinkdb - docker run -d --publish=8020:8020 --publish=50070:50070 --publish=50010:50010 --publish=50020:50020 --publish=50075:50075 chalimartines/cdh5-pseudo-distributed -- ps ww -fHe - virtualenv -p python3 /opt/trough-ve3 - . /opt/trough-ve3/bin/activate -- 'env | sort' - pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string - pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup - 'sync.py >>/tmp/trough-sync-local.out 2>&1 &' -- date - sleep 5 -- date - python -c "import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings['RETHINKDB_HOSTS']) ; rr.db('trough_configuration').wait().run()" - 'sync.py --server >>/tmp/trough-sync-server.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' - 'uwsgi --venv=/opt/trough-ve3 --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/reader.py >>/tmp/trough-read.out 2>&1 &' -- sleep 3 -- ps ww -fHe -- cat /tmp/trough-write.out -- cat /tmp/trough-write-provisioner-server.out -- cat /tmp/trough-write-provisioner-local.out -- cat /tmp/trough-sync-server.out -- cat /tmp/trough-sync-local.out -- cat /tmp/trough-read.out +- deactivate install: - pip install . pytest requests warcio @@ -73,6 +56,7 @@ script: - py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests after_script: +- ps ww -fHe - cat /tmp/trough-write.out - cat /tmp/trough-write-provisioner-server.out - cat /tmp/trough-write-provisioner-local.out From a64a12289ef5642b2a6d00ebc3fa181046dbef8d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Oct 2017 15:21:53 -0700 Subject: [PATCH 13/35] in travis-ci, run trough in another docker container, so that its version of python can be independent of the one used to run the warcprox tests --- .travis.yml | 19 ++++--------------- tests/run-trough.sh | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 15 deletions(-) create mode 100644 tests/run-trough.sh diff --git a/.travis.yml b/.travis.yml index 70245ef..a2e00f8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,21 +27,10 @@ services: before_install: - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 -- docker run -d --publish=28015:28015 rethinkdb -- docker run -d --publish=8020:8020 --publish=50070:50070 --publish=50010:50010 --publish=50020:50020 --publish=50075:50075 chalimartines/cdh5-pseudo-distributed -- virtualenv -p python3 /opt/trough-ve3 -- . /opt/trough-ve3/bin/activate -- pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string -- pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup -- 'sync.py >>/tmp/trough-sync-local.out 2>&1 &' -- sleep 5 -- python -c "import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings['RETHINKDB_HOSTS']) ; rr.db('trough_configuration').wait().run()" -- 'sync.py --server >>/tmp/trough-sync-server.out 2>&1 &' -- 'uwsgi --venv=/opt/trough-ve3 --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/writer.py >>/tmp/trough-write.out 2>&1 &' -- 'uwsgi --venv=/opt/trough-ve3 --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 &' -- 'uwsgi --venv=/opt/trough-ve3 --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 &' -- 'uwsgi --venv=/opt/trough-ve3 --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /opt/trough-ve3/bin/reader.py >>/tmp/trough-read.out 2>&1 &' -- deactivate +- docker network create --driver=bridge trough +- docker run --rm --detach --network=trough --name=rethinkdb rethinkdb +- docker run --rm --detach --network=trough --name=hadoop chalimartines/cdh5-pseudo-distributed +- docker run --rm --detach --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh install: - pip install . pytest requests warcio diff --git a/tests/run-trough.sh b/tests/run-trough.sh new file mode 100644 index 0000000..d5fbcb7 --- /dev/null +++ b/tests/run-trough.sh @@ -0,0 +1,33 @@ +#!/bin/bash +# +# this is used by .travis.yml +# + +pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string +pip install git+https://github.com/nlevitt/trough.git@9ae5f477a28f22 + +mkdir /etc/trough + +# hello docker user-defined bridge networking +echo ' +HDFS_HOST: hadoop +RETHINKDB_HOSTS: +- rethinkdb +' > /etc/trough/settings.yml + +sync.py >>/tmp/trough-sync-local.out 2>&1 & + +sleep 5 +python -c " +import doublethink +from trough.settings import settings +rr = doublethink.Rethinker(settings['RETHINKDB_HOSTS']) +rr.db('trough_configuration').wait().run()" + +sync.py --server >>/tmp/trough-sync-server.out 2>&1 & +uwsgi --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/writer.py >>/tmp/trough-write.out 2>&1 & +uwsgi --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 & +uwsgi --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 & +uwsgi --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/reader.py >>/tmp/trough-read.out 2>&1 & + +wait From 1b172f37e9c48050cf33e90f55e135d949576064 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Oct 2017 15:28:18 -0700 Subject: [PATCH 14/35] apparently you can't use docker run options --rm and --detach together --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index a2e00f8..612f04a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,9 +28,9 @@ services: before_install: - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker network create --driver=bridge trough -- docker run --rm --detach --network=trough --name=rethinkdb rethinkdb -- docker run --rm --detach --network=trough --name=hadoop chalimartines/cdh5-pseudo-distributed -- docker run --rm --detach --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh +- docker run --detach --network=trough --name=rethinkdb rethinkdb +- docker run --detach --network=trough --name=hadoop chalimartines/cdh5-pseudo-distributed +- docker run --detach --network=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh install: - pip install . pytest requests warcio From 158c4513110355a1833fc58b2d225ecbc2b7f470 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Oct 2017 15:47:24 -0700 Subject: [PATCH 15/35] need docker to publish the rethinkdb port for --rethinkdb-dedup-url and --rethinkdb-big-table-url tests --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 612f04a..475a8cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,7 +28,7 @@ services: before_install: - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker network create --driver=bridge trough -- docker run --detach --network=trough --name=rethinkdb rethinkdb +- docker run --detach --network=trough --name=rethinkdb --publish=28015:28015 rethinkdb - docker run --detach --network=trough --name=hadoop chalimartines/cdh5-pseudo-distributed - docker run --detach --network=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh From 7b5fe4475eee0d5086af9cac101a445bc5be82b0 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 18 Oct 2017 17:38:27 -0700 Subject: [PATCH 16/35] trough logs are inside the docker container now --- .travis.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 475a8cc..0ba9de8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,12 +46,12 @@ script: after_script: - ps ww -fHe -- cat /tmp/trough-write.out -- cat /tmp/trough-write-provisioner-server.out -- cat /tmp/trough-write-provisioner-local.out -- cat /tmp/trough-sync-server.out -- cat /tmp/trough-sync-local.out -- cat /tmp/trough-read.out +- docker exec trough cat /tmp/trough-write.out +- docker exec trough cat /tmp/trough-write-provisioner-server.out +- docker exec trough cat /tmp/trough-write-provisioner-local.out +- docker exec trough cat /tmp/trough-sync-server.out +- docker exec trough cat /tmp/trough-sync-local.out +- docker exec trough cat /tmp/trough-read.out notifications: slack: From 81497088e45242d21ea025d681bd787bec8d03b3 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 19 Oct 2017 10:20:51 -0700 Subject: [PATCH 17/35] docker container for trough needs a hostname that works from outside the container (since it registers itself in the service registry) --- .travis.yml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 0ba9de8..121d563 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,11 +26,16 @@ services: - docker before_install: +- cat /etc/hosts - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker network create --driver=bridge trough -- docker run --detach --network=trough --name=rethinkdb --publish=28015:28015 rethinkdb -- docker run --detach --network=trough --name=hadoop chalimartines/cdh5-pseudo-distributed -- docker run --detach --network=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh +- docker run --detach --network=trough --host=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb +- docker run --detach --network=trough --host=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed +- docker run --detach --network=trough --host=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh +- echo 127.0.0.1 rethinkdb >> /etc/hosts +- echo 127.0.0.1 hadoop >> /etc/hosts +- echo 127.0.0.1 trough >> /etc/hosts +- ping -c2 trough install: - pip install . pytest requests warcio From 7b1d2d8c5d58dfb488eb7b26ffb9bea6c973f984 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 19 Oct 2017 10:44:53 -0700 Subject: [PATCH 18/35] ugh fix docker command line arg --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 121d563..df0b016 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,9 +29,9 @@ before_install: - cat /etc/hosts - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker network create --driver=bridge trough -- docker run --detach --network=trough --host=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb -- docker run --detach --network=trough --host=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed -- docker run --detach --network=trough --host=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh +- docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb +- docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed +- docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh - echo 127.0.0.1 rethinkdb >> /etc/hosts - echo 127.0.0.1 hadoop >> /etc/hosts - echo 127.0.0.1 trough >> /etc/hosts From 0a16c0ad845e4e41d6d54b7bed5a8405854586e5 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 19 Oct 2017 10:54:47 -0700 Subject: [PATCH 19/35] can we edit /etc/hosts in travis-ci? --- .travis.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index df0b016..e157b37 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,7 @@ group: deprecated-2017Q2 +sudo: required + language: python python: - 3.6 @@ -32,9 +34,10 @@ before_install: - docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb - docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed - docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh -- echo 127.0.0.1 rethinkdb >> /etc/hosts -- echo 127.0.0.1 hadoop >> /etc/hosts -- echo 127.0.0.1 trough >> /etc/hosts +- echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts +- echo 127.0.0.1 hadoop | sudo tee -a /etc/hosts +- echo 127.0.0.1 trough | sudo tee -a /etc/hosts +- cat /etc/hosts - ping -c2 trough install: From dfecfc2e45da81980b837709d8630554a0d3a292 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 19 Oct 2017 11:10:58 -0700 Subject: [PATCH 20/35] it finally works! another travis tweak though --- .travis.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index e157b37..20b15f2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,3 @@ -group: deprecated-2017Q2 - sudo: required language: python @@ -28,12 +26,13 @@ services: - docker before_install: -- cat /etc/hosts - sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778 - docker network create --driver=bridge trough - docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb - docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed - docker run --detach --network=trough --hostname=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3 bash /run-trough.sh +- cat /etc/hosts +- echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄 - echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts - echo 127.0.0.1 hadoop | sudo tee -a /etc/hosts - echo 127.0.0.1 trough | sudo tee -a /etc/hosts From ab99fe52b9c1865cb871ef9d04fd33fefde147d0 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 3 Nov 2017 12:39:26 -0700 Subject: [PATCH 21/35] update trough dedup to use new segment manager api to register schema sql --- warcprox/dedup.py | 100 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 31 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 6c76ab0..4206f26 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -246,36 +246,35 @@ class CdxServerDedup(object): """ pass -class TroughDedupDb(object): - ''' - https://github.com/jkafader/trough - ''' - logger = logging.getLogger("warcprox.dedup.TroughDedupDb") +class TroughClient(object): + logger = logging.getLogger("warcprox.dedup.TroughClient") - def __init__(self, options=warcprox.Options()): - parsed = doublethink.parse_rethinkdb_url( - options.rethinkdb_trough_db_url) + def __init__(self, rethinkdb_trough_db_url): + parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.svcreg = doublethink.ServiceRegistry(self.rr) - self.options = options - def start(self): - pass - - def stop(self): - pass - - def _write_url(self, bucket): - segment_id = 'warcprox-trough-%s' % bucket + def segment_manager_url(self): + # XXX cache until expired (check last_heartbeat and ttl) master_node = self.svcreg.unique_service('trough-sync-master') - response = requests.post(master_node['url'], segment_id) - response.raise_for_status() - write_url = response.text.strip() - return write_url + assert master_node + return master_node['url'] - def _read_url(self, bucket): - segment_id = 'warcprox-trough-%s' % bucket + def write_url(self, segment_id, schema_id='default'): + provision_url = os.path.join(self.segment_manager_url(), 'provision') + payload_dict = {'segment': segment_id, 'schema': schema_id} + response = requests.post(provision_url, json=payload_dict) + if response.status_code != 200: + raise Exception( + 'Received %s: %r in response to POST %s with data %s' % ( + response.status_code, response.text, provision_url, + json.dumps(payload_dict))) + result_dict = response.json() + # assert result_dict['schema'] == schema_id # previously provisioned? + return result_dict['write_url'] + + def read_url(self, segment_id): reql = self.rr.table('services').get_all( segment_id, index='segment').filter( {'role':'trough-read'}).filter( @@ -289,6 +288,52 @@ class TroughDedupDb(object): else: return None + def schema_exists(self, schema_id): + url = os.path.join(self.segment_manager_url(), 'schema', schema_id) + response = requests.get(url) + if response.status_code == 200: + return True + elif response.status_code == 404: + return False + else: + response.raise_for_status() + + def register_schema(self, schema_id, sql): + url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) + response = requests.put(url, sql) + if response.status_code not in (201, 204): + raise Exception( + 'Received %s: %r in response to PUT %r with data %r' % ( + response.status_code, response.text, sql, url)) + +class TroughDedupDb(object): + ''' + https://github.com/internetarchive/trough + ''' + logger = logging.getLogger("warcprox.dedup.TroughDedupDb") + + SCHEMA_ID = 'warcprox-dedup-v1' + SCHEMA_SQL = ('create table dedup (\n' + ' digest_key varchar(100) primary key,\n' + ' url varchar(2100) not null,\n' + ' date datetime not null,\n' + ' id varchar(100));\n') # warc record id + + def __init__(self, options=warcprox.Options()): + self.options = options + self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) + + def start(self): + self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) + + def _write_url(self, bucket): + segment_id = 'warcprox-trough-%s' % bucket + return self._trough_cli.write_url(segment_id, self.SCHEMA_ID) + + def _read_url(self, bucket): + segment_id = 'warcprox-trough-%s' % bucket + return self._trough_cli.read_url(segment_id) + def sql_value(self, x): if x is None: return 'null' @@ -317,14 +362,7 @@ class TroughDedupDb(object): url = response_record.get_header(warctools.WarcRecord.URL) warc_date = response_record.get_header(warctools.WarcRecord.DATE) - # XXX create table statement here is a temporary hack, - # see https://webarchive.jira.com/browse/AITFIVE-1465 - sql = ('create table if not exists dedup (\n' - ' digest_key varchar(100) primary key,\n' - ' url varchar(2100) not null,\n' - ' date datetime not null,\n' - ' id varchar(100));\n' # warc record id - 'insert into dedup (digest_key, url, date, id) ' + sql = ('insert into dedup (digest_key, url, date, id) ' 'values (%s, %s, %s, %s);') % ( self.sql_value(digest_key), self.sql_value(url), self.sql_value(warc_date), self.sql_value(record_id)) From 147b097a53e7b3079f4862bfa3d6d28f977f5dd5 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 3 Nov 2017 13:48:00 -0700 Subject: [PATCH 22/35] cache trough read and write urls --- tests/test_warcprox.py | 5 ++++- warcprox/dedup.py | 23 +++++++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 95c8aba..abc373a 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -84,7 +84,8 @@ def _send(self, data): # http_client.HTTPConnection.send = _send logging.basicConfig( - stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE, + # stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE, + stream=sys.stdout, level=warcprox.TRACE, format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) @@ -424,6 +425,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + assert dedup_lookup assert dedup_lookup['url'] == url.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) @@ -497,6 +499,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') + assert dedup_lookup assert dedup_lookup['url'] == url.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 4206f26..78a8214 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -322,17 +322,30 @@ class TroughDedupDb(object): def __init__(self, options=warcprox.Options()): self.options = options self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) + self._write_url_cache = {} + self._read_url_cache = {} def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) def _write_url(self, bucket): - segment_id = 'warcprox-trough-%s' % bucket - return self._trough_cli.write_url(segment_id, self.SCHEMA_ID) + if not bucket in self._write_url_cache: + segment_id = 'warcprox-trough-%s' % bucket + self._write_url_cache[bucket] = self._trough_cli.write_url( + segment_id, self.SCHEMA_ID) + logging.info( + 'bucket %r write url is %r', bucket, + self._write_url_cache[bucket]) + return self._write_url_cache[bucket] def _read_url(self, bucket): - segment_id = 'warcprox-trough-%s' % bucket - return self._trough_cli.read_url(segment_id) + if not self._read_url_cache.get(bucket): + segment_id = 'warcprox-trough-%s' % bucket + self._read_url_cache[bucket] = self._trough_cli.read_url(segment_id) + logging.info( + 'bucket %r read url is %r', bucket, + self._read_url_cache[bucket]) + return self._read_url_cache[bucket] def sql_value(self, x): if x is None: @@ -371,6 +384,8 @@ class TroughDedupDb(object): logging.warn( 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) + else: + logging.trace('posted %r to %s', sql, write_url) def lookup(self, digest_key, bucket='__unspecified__', url=None): read_url = self._read_url(bucket) From 3dbfc06e6870de8ff3fc9f1146d6c7edab49e650 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 3 Nov 2017 14:16:09 -0700 Subject: [PATCH 23/35] on error from trough read or write url, delete read/write url from cache, so next request will retrieve a fresh, hopefully working, url (n.b. not covered by automated tests at this point) --- warcprox/dedup.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 78a8214..8388344 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -256,7 +256,6 @@ class TroughClient(object): self.svcreg = doublethink.ServiceRegistry(self.rr) def segment_manager_url(self): - # XXX cache until expired (check last_heartbeat and ttl) master_node = self.svcreg.unique_service('trough-sync-master') assert master_node return master_node['url'] @@ -379,8 +378,16 @@ class TroughDedupDb(object): 'values (%s, %s, %s, %s);') % ( self.sql_value(digest_key), self.sql_value(url), self.sql_value(warc_date), self.sql_value(record_id)) - response = requests.post(write_url, sql) + try: + response = requests.post(write_url, sql) + except: + logging.error( + 'problem with trough write url %r', write_url, + exc_info=True) + del self._write_url_cache[bucket] + return if response.status_code != 200: + del self._write_url_cache[bucket] logging.warn( 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) @@ -393,8 +400,15 @@ class TroughDedupDb(object): return None sql = 'select * from dedup where digest_key=%s;' % ( self.sql_value(digest_key)) - response = requests.post(read_url, sql) + try: + response = requests.post(read_url, sql) + except: + logging.error( + 'problem with trough read url %r', read_url, exc_info=True) + del self._read_url_cache[bucket] + return None if response.status_code != 200: + del self._read_url_cache[bucket] logging.warn( 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) From ba7497525ae2824920f453a98b20888feafe5e36 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 3 Nov 2017 14:21:39 -0700 Subject: [PATCH 24/35] update travis-ci trough deployment --- tests/run-trough.sh | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/run-trough.sh b/tests/run-trough.sh index d5fbcb7..c2319a0 100644 --- a/tests/run-trough.sh +++ b/tests/run-trough.sh @@ -4,7 +4,7 @@ # pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string -pip install git+https://github.com/nlevitt/trough.git@9ae5f477a28f22 +pip install git+https://github.com/internetarchive/trough.git@toward-warcprox-dedup mkdir /etc/trough @@ -26,8 +26,9 @@ rr.db('trough_configuration').wait().run()" sync.py --server >>/tmp/trough-sync-server.out 2>&1 & uwsgi --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/writer.py >>/tmp/trough-write.out 2>&1 & -uwsgi --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1 & -uwsgi --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1 & +uwsgi --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:local >>/tmp/trough-segment-manager-local.out 2>&1 & +uwsgi --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:server >>/tmp/trough-segment-manager-server.out 2>&1 & uwsgi --http :6444 --master --processes=2 --harakiri=3200 --socket-timeout=3200 --max-requests=50000 --vacuum --die-on-term --wsgi-file /usr/local/bin/reader.py >>/tmp/trough-read.out 2>&1 & wait + From db39c4c10a49445a8db609ba0b92c316c36cbea5 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 8 Nov 2017 13:26:59 -0800 Subject: [PATCH 25/35] we depend on the requests library now in the main code, for trough dedup :-\ --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0e6b5f0..dae6b94 100755 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ deps = [ 'urlcanon>=0.1.dev16', 'doublethink>=0.2.0.dev87', 'urllib3', + 'requests>=2.0.1', 'PySocks', 'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu ] @@ -60,7 +61,7 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, - tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 + tests_require=['mock', 'pytest', 'warcio'], cmdclass = {'test': PyTest}, test_suite='warcprox.tests', entry_points={ From cdd747f48e1e26b9426bce2c37e032fcc002ac55 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 10 Nov 2017 13:37:09 -0800 Subject: [PATCH 26/35] eh, don't prefix sqlite filenames with 'warcprox-trough-'; logging tweaks --- warcprox/dedup.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 78d35db..cb2cc2d 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -280,7 +280,7 @@ class TroughClient(object): lambda svc: r.now().sub( svc['last_heartbeat']).lt(svc['ttl']) ).order_by('load') - logging.debug('querying rethinkdb: %r', reql) + self.logger.debug('querying rethinkdb: %r', reql) results = reql.run() if results: return results[0]['url'] @@ -329,20 +329,18 @@ class TroughDedupDb(object): def _write_url(self, bucket): if not bucket in self._write_url_cache: - segment_id = 'warcprox-trough-%s' % bucket self._write_url_cache[bucket] = self._trough_cli.write_url( - segment_id, self.SCHEMA_ID) - logging.info( - 'bucket %r write url is %r', bucket, + bucket, self.SCHEMA_ID) + self.logger.info( + 'trough dedup bucket %r write url is %r', bucket, self._write_url_cache[bucket]) return self._write_url_cache[bucket] def _read_url(self, bucket): if not self._read_url_cache.get(bucket): - segment_id = 'warcprox-trough-%s' % bucket - self._read_url_cache[bucket] = self._trough_cli.read_url(segment_id) - logging.info( - 'bucket %r read url is %r', bucket, + self._read_url_cache[bucket] = self._trough_cli.read_url(bucket) + self.logger.info( + 'trough dedup bucket %r read url is %r', bucket, self._read_url_cache[bucket]) return self._read_url_cache[bucket] @@ -381,18 +379,18 @@ class TroughDedupDb(object): try: response = requests.post(write_url, sql) except: - logging.error( + self.logger.error( 'problem with trough write url %r', write_url, exc_info=True) del self._write_url_cache[bucket] return if response.status_code != 200: del self._write_url_cache[bucket] - logging.warn( + self.logger.warn( 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) else: - logging.trace('posted %r to %s', sql, write_url) + self.logger.debug('posted %r to %s', sql, write_url) def lookup(self, digest_key, bucket='__unspecified__', url=None): read_url = self._read_url(bucket) @@ -403,17 +401,17 @@ class TroughDedupDb(object): try: response = requests.post(read_url, sql) except: - logging.error( + self.logger.error( 'problem with trough read url %r', read_url, exc_info=True) del self._read_url_cache[bucket] return None if response.status_code != 200: del self._read_url_cache[bucket] - logging.warn( + self.logger.warn( 'unexpected response %r %r %r to sql=%r', response.status_code, response.reason, response.text, sql) return None - logging.debug('got %r from query %r', response.text, sql) + self.logger.trace('got %r from query %r', response.text, sql) results = json.loads(response.text) assert len(results) <= 1 # sanity check (digest_key is primary key) if results: From 43c36cae102387ecbe4b96f18efdae9590b3cb38 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:27:31 -0800 Subject: [PATCH 27/35] update payload_digest reference in trough dedup for changes in commit 3a0f6e0947 --- warcprox/dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 842c9d3..9cb0bc5 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -428,7 +428,7 @@ class TroughDedupDb(object): if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str( - recorded_url.response_recorder.payload_digest, + recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: self.save( From 895683e062d487813536e9bc0d4804b6cabaf539 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:45:49 -0800 Subject: [PATCH 28/35] more cleanly separate trough client code from the rest of TroughDedup --- warcprox/dedup.py | 188 +++++++++++++++++++++++++--------------------- 1 file changed, 103 insertions(+), 85 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 9cb0bc5..5c56752 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -253,13 +253,42 @@ class TroughClient(object): self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.svcreg = doublethink.ServiceRegistry(self.rr) + self._write_url_cache = {} + self._read_url_cache = {} + + @staticmethod + def sql_value(x): + if x is None: + return 'null' + elif isinstance(x, datetime.datetime): + return 'datetime(%r)' % x.isoformat() + elif isinstance(x, bool): + return int(x) + elif isinstance(x, str) or isinstance(x, bytes): + # py3: repr(u'abc') => 'abc' + # repr(b'abc') => b'abc' + # py2: repr(u'abc') => u'abc' + # repr(b'abc') => 'abc' + # Repr gives us a prefix we don't want in different situations + # depending on whether this is py2 or py3. Chop it off either way. + r = repr(x) + if r[:1] == "'": + return r + else: + return r[1:] + elif isinstance(x, (int, float)): + return x + else: + raise Exception( + "don't know how to make an sql value from %r (%r)" % ( + x, type(x))) def segment_manager_url(self): master_node = self.svcreg.unique_service('trough-sync-master') assert master_node return master_node['url'] - def write_url(self, segment_id, schema_id='default'): + def write_url_nocache(self, segment_id, schema_id='default'): provision_url = os.path.join(self.segment_manager_url(), 'provision') payload_dict = {'segment': segment_id, 'schema': schema_id} response = requests.post(provision_url, json=payload_dict) @@ -272,7 +301,7 @@ class TroughClient(object): # assert result_dict['schema'] == schema_id # previously provisioned? return result_dict['write_url'] - def read_url(self, segment_id): + def read_url_nocache(self, segment_id): reql = self.rr.table('services').get_all( segment_id, index='segment').filter( {'role':'trough-read'}).filter( @@ -286,6 +315,69 @@ class TroughClient(object): else: return None + def write_url(self, segment_id, schema_id='default'): + if not segment_id in self._write_url_cache: + self._write_url_cache[segment_id] = self.write_url_nocache( + segment_id, schema_id) + self.logger.info( + 'segment %r write url is %r', segment_id, + self._write_url_cache[segment_id]) + return self._write_url_cache[segment_id] + + def read_url(self, segment_id): + if not self._read_url_cache.get(segment_id): + self._read_url_cache[segment_id] = self.read_url_nocache(segment_id) + self.logger.info( + 'segment %r read url is %r', segment_id, + self._read_url_cache[segment_id]) + return self._read_url_cache[segment_id] + + def write(self, segment_id, sql_tmpl, values, schema_id='default'): + write_url = self.write_url(segment_id, schema_id) + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + + try: + response = requests.post(write_url, sql) + except: + del self._write_url_cache[segment_id] + self.logger.error( + 'problem with trough write url %r', write_url, + exc_info=True) + return + if response.status_code != 200: + del self._write_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + write_url, sql) + return + self.logger.debug('posted %r to %s', sql, write_url) + + def read(self, segment_id, sql_tmpl, values): + read_url = self.read_url(segment_id) + if not read_url: + return None + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + try: + response = requests.post(read_url, sql) + except: + del self._read_url_cache[segment_id] + self.logger.error( + 'problem with trough read url %r', read_url, exc_info=True) + return None + if response.status_code != 200: + del self._read_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + read_url, sql) + return None + self.logger.trace( + 'got %r from posting query %r to %r', response.text, sql, + read_url) + results = json.loads(response.text) + return results + def schema_exists(self, schema_id): url = os.path.join(self.segment_manager_url(), 'schema', schema_id) response = requests.get(url) @@ -316,104 +408,30 @@ class TroughDedupDb(object): ' url varchar(2100) not null,\n' ' date datetime not null,\n' ' id varchar(100));\n') # warc record id + WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) ' + 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): self.options = options self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) - self._write_url_cache = {} - self._read_url_cache = {} def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) - def _write_url(self, bucket): - if not bucket in self._write_url_cache: - self._write_url_cache[bucket] = self._trough_cli.write_url( - bucket, self.SCHEMA_ID) - self.logger.info( - 'trough dedup bucket %r write url is %r', bucket, - self._write_url_cache[bucket]) - return self._write_url_cache[bucket] - - def _read_url(self, bucket): - if not self._read_url_cache.get(bucket): - self._read_url_cache[bucket] = self._trough_cli.read_url(bucket) - self.logger.info( - 'trough dedup bucket %r read url is %r', bucket, - self._read_url_cache[bucket]) - return self._read_url_cache[bucket] - - def sql_value(self, x): - if x is None: - return 'null' - elif isinstance(x, datetime.datetime): - return 'datetime(%r)' % x.isoformat() - elif isinstance(x, bool): - return int(x) - elif isinstance(x, str) or isinstance(x, bytes): - # py3: repr(u'abc') => 'abc' - # repr(b'abc') => b'abc' - # py2: repr(u'abc') => u'abc' - # repr(b'abc') => 'abc' - # Repr gives us a prefix we don't want in different situations - # depending on whether this is py2 or py3. Chop it off either way. - r = repr(x) - if r[:1] == "'": - return r - else: - return r[1:] - else: - raise Exception("don't know how to make an sql value from %r" % x) - def save(self, digest_key, response_record, bucket='__unspecified__'): - write_url = self._write_url(bucket) record_id = response_record.get_header(warctools.WarcRecord.ID) url = response_record.get_header(warctools.WarcRecord.URL) warc_date = response_record.get_header(warctools.WarcRecord.DATE) - - sql = ('insert into dedup (digest_key, url, date, id) ' - 'values (%s, %s, %s, %s);') % ( - self.sql_value(digest_key), self.sql_value(url), - self.sql_value(warc_date), self.sql_value(record_id)) - try: - response = requests.post(write_url, sql) - except: - self.logger.error( - 'problem with trough write url %r', write_url, - exc_info=True) - del self._write_url_cache[bucket] - return - if response.status_code != 200: - del self._write_url_cache[bucket] - self.logger.warn( - 'unexpected response %r %r %r to sql=%r', - response.status_code, response.reason, response.text, sql) - else: - self.logger.debug('posted %r to %s', sql, write_url) + self._trough_cli.write( + bucket, self.WRITE_SQL_TMPL, + (digest_key, url, warc_date, record_id), self.SCHEMA_ID) def lookup(self, digest_key, bucket='__unspecified__', url=None): - read_url = self._read_url(bucket) - if not read_url: - return None - sql = 'select * from dedup where digest_key=%s;' % ( - self.sql_value(digest_key)) - try: - response = requests.post(read_url, sql) - except: - self.logger.error( - 'problem with trough read url %r', read_url, exc_info=True) - del self._read_url_cache[bucket] - return None - if response.status_code != 200: - del self._read_url_cache[bucket] - self.logger.warn( - 'unexpected response %r %r %r to sql=%r', - response.status_code, response.reason, response.text, sql) - return None - self.logger.trace('got %r from query %r', response.text, sql) - results = json.loads(response.text) - assert len(results) <= 1 # sanity check (digest_key is primary key) + results = self._trough_cli.read( + bucket, 'select * from dedup where digest_key=%s;', + (digest_key,)) if results: + assert len(results) == 1 # sanity check (digest_key is primary key) result = results[0] result['id'] = result['id'].encode('ascii') result['url'] = result['url'].encode('ascii') From 46797a5dce7000c7ce7495d1ddbd078c9f5f1378 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:52:29 -0800 Subject: [PATCH 29/35] pypy and pypy3 are passing at the moment, so why not :) --- .travis.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 565ba13..b8a91e5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,8 +13,6 @@ python: matrix: allow_failures: - - python: pypy - - python: pypy3 - python: nightly - python: 3.7-dev From d7aea40b054e45ecbd682ee501bb54e2dbd9e9e1 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 12:52:45 -0800 Subject: [PATCH 30/35] move trough client into separate module --- warcprox/dedup.py | 157 +------------------------------------- warcprox/trough.py | 182 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 154 deletions(-) create mode 100644 warcprox/trough.py diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 5c56752..2364d41 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -26,10 +26,9 @@ import os import json from hanzo import warctools import warcprox +import warcprox.trough import sqlite3 -import requests import doublethink -import rethinkdb as r import datetime import urllib3 from urllib3.exceptions import HTTPError @@ -245,157 +244,6 @@ class CdxServerDedup(object): """ pass -class TroughClient(object): - logger = logging.getLogger("warcprox.dedup.TroughClient") - - def __init__(self, rethinkdb_trough_db_url): - parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) - self.rr = doublethink.Rethinker( - servers=parsed.hosts, db=parsed.database) - self.svcreg = doublethink.ServiceRegistry(self.rr) - self._write_url_cache = {} - self._read_url_cache = {} - - @staticmethod - def sql_value(x): - if x is None: - return 'null' - elif isinstance(x, datetime.datetime): - return 'datetime(%r)' % x.isoformat() - elif isinstance(x, bool): - return int(x) - elif isinstance(x, str) or isinstance(x, bytes): - # py3: repr(u'abc') => 'abc' - # repr(b'abc') => b'abc' - # py2: repr(u'abc') => u'abc' - # repr(b'abc') => 'abc' - # Repr gives us a prefix we don't want in different situations - # depending on whether this is py2 or py3. Chop it off either way. - r = repr(x) - if r[:1] == "'": - return r - else: - return r[1:] - elif isinstance(x, (int, float)): - return x - else: - raise Exception( - "don't know how to make an sql value from %r (%r)" % ( - x, type(x))) - - def segment_manager_url(self): - master_node = self.svcreg.unique_service('trough-sync-master') - assert master_node - return master_node['url'] - - def write_url_nocache(self, segment_id, schema_id='default'): - provision_url = os.path.join(self.segment_manager_url(), 'provision') - payload_dict = {'segment': segment_id, 'schema': schema_id} - response = requests.post(provision_url, json=payload_dict) - if response.status_code != 200: - raise Exception( - 'Received %s: %r in response to POST %s with data %s' % ( - response.status_code, response.text, provision_url, - json.dumps(payload_dict))) - result_dict = response.json() - # assert result_dict['schema'] == schema_id # previously provisioned? - return result_dict['write_url'] - - def read_url_nocache(self, segment_id): - reql = self.rr.table('services').get_all( - segment_id, index='segment').filter( - {'role':'trough-read'}).filter( - lambda svc: r.now().sub( - svc['last_heartbeat']).lt(svc['ttl']) - ).order_by('load') - self.logger.debug('querying rethinkdb: %r', reql) - results = reql.run() - if results: - return results[0]['url'] - else: - return None - - def write_url(self, segment_id, schema_id='default'): - if not segment_id in self._write_url_cache: - self._write_url_cache[segment_id] = self.write_url_nocache( - segment_id, schema_id) - self.logger.info( - 'segment %r write url is %r', segment_id, - self._write_url_cache[segment_id]) - return self._write_url_cache[segment_id] - - def read_url(self, segment_id): - if not self._read_url_cache.get(segment_id): - self._read_url_cache[segment_id] = self.read_url_nocache(segment_id) - self.logger.info( - 'segment %r read url is %r', segment_id, - self._read_url_cache[segment_id]) - return self._read_url_cache[segment_id] - - def write(self, segment_id, sql_tmpl, values, schema_id='default'): - write_url = self.write_url(segment_id, schema_id) - sql = sql_tmpl % tuple(self.sql_value(v) for v in values) - - try: - response = requests.post(write_url, sql) - except: - del self._write_url_cache[segment_id] - self.logger.error( - 'problem with trough write url %r', write_url, - exc_info=True) - return - if response.status_code != 200: - del self._write_url_cache[segment_id] - self.logger.warn( - 'unexpected response %r %r %r from %r to sql=%r', - response.status_code, response.reason, response.text, - write_url, sql) - return - self.logger.debug('posted %r to %s', sql, write_url) - - def read(self, segment_id, sql_tmpl, values): - read_url = self.read_url(segment_id) - if not read_url: - return None - sql = sql_tmpl % tuple(self.sql_value(v) for v in values) - try: - response = requests.post(read_url, sql) - except: - del self._read_url_cache[segment_id] - self.logger.error( - 'problem with trough read url %r', read_url, exc_info=True) - return None - if response.status_code != 200: - del self._read_url_cache[segment_id] - self.logger.warn( - 'unexpected response %r %r %r from %r to sql=%r', - response.status_code, response.reason, response.text, - read_url, sql) - return None - self.logger.trace( - 'got %r from posting query %r to %r', response.text, sql, - read_url) - results = json.loads(response.text) - return results - - def schema_exists(self, schema_id): - url = os.path.join(self.segment_manager_url(), 'schema', schema_id) - response = requests.get(url) - if response.status_code == 200: - return True - elif response.status_code == 404: - return False - else: - response.raise_for_status() - - def register_schema(self, schema_id, sql): - url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) - response = requests.put(url, sql) - if response.status_code not in (201, 204): - raise Exception( - 'Received %s: %r in response to PUT %r with data %r' % ( - response.status_code, response.text, sql, url)) - class TroughDedupDb(object): ''' https://github.com/internetarchive/trough @@ -413,7 +261,8 @@ class TroughDedupDb(object): def __init__(self, options=warcprox.Options()): self.options = options - self._trough_cli = TroughClient(options.rethinkdb_trough_db_url) + self._trough_cli = warcprox.trough.TroughClient( + options.rethinkdb_trough_db_url) def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) diff --git a/warcprox/trough.py b/warcprox/trough.py new file mode 100644 index 0000000..ec3a032 --- /dev/null +++ b/warcprox/trough.py @@ -0,0 +1,182 @@ +''' +warcprox/trough.py - trough client code + +Copyright (C) 2013-2017 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +''' + +from __future__ import absolute_import + +import logging +import os +import json +import requests +import doublethink +import rethinkdb as r +import datetime + +class TroughClient(object): + logger = logging.getLogger("warcprox.trough.TroughClient") + + def __init__(self, rethinkdb_trough_db_url): + parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) + self.rr = doublethink.Rethinker( + servers=parsed.hosts, db=parsed.database) + self.svcreg = doublethink.ServiceRegistry(self.rr) + self._write_url_cache = {} + self._read_url_cache = {} + + @staticmethod + def sql_value(x): + if x is None: + return 'null' + elif isinstance(x, datetime.datetime): + return 'datetime(%r)' % x.isoformat() + elif isinstance(x, bool): + return int(x) + elif isinstance(x, str) or isinstance(x, bytes): + # py3: repr(u'abc') => 'abc' + # repr(b'abc') => b'abc' + # py2: repr(u'abc') => u'abc' + # repr(b'abc') => 'abc' + # Repr gives us a prefix we don't want in different situations + # depending on whether this is py2 or py3. Chop it off either way. + r = repr(x) + if r[:1] == "'": + return r + else: + return r[1:] + elif isinstance(x, (int, float)): + return x + else: + raise Exception( + "don't know how to make an sql value from %r (%r)" % ( + x, type(x))) + + def segment_manager_url(self): + master_node = self.svcreg.unique_service('trough-sync-master') + assert master_node + return master_node['url'] + + def write_url_nocache(self, segment_id, schema_id='default'): + provision_url = os.path.join(self.segment_manager_url(), 'provision') + payload_dict = {'segment': segment_id, 'schema': schema_id} + response = requests.post(provision_url, json=payload_dict) + if response.status_code != 200: + raise Exception( + 'Received %s: %r in response to POST %s with data %s' % ( + response.status_code, response.text, provision_url, + json.dumps(payload_dict))) + result_dict = response.json() + # assert result_dict['schema'] == schema_id # previously provisioned? + return result_dict['write_url'] + + def read_url_nocache(self, segment_id): + reql = self.rr.table('services').get_all( + segment_id, index='segment').filter( + {'role':'trough-read'}).filter( + lambda svc: r.now().sub( + svc['last_heartbeat']).lt(svc['ttl']) + ).order_by('load') + self.logger.debug('querying rethinkdb: %r', reql) + results = reql.run() + if results: + return results[0]['url'] + else: + return None + + def write_url(self, segment_id, schema_id='default'): + if not segment_id in self._write_url_cache: + self._write_url_cache[segment_id] = self.write_url_nocache( + segment_id, schema_id) + self.logger.info( + 'segment %r write url is %r', segment_id, + self._write_url_cache[segment_id]) + return self._write_url_cache[segment_id] + + def read_url(self, segment_id): + if not self._read_url_cache.get(segment_id): + self._read_url_cache[segment_id] = self.read_url_nocache(segment_id) + self.logger.info( + 'segment %r read url is %r', segment_id, + self._read_url_cache[segment_id]) + return self._read_url_cache[segment_id] + + def write(self, segment_id, sql_tmpl, values, schema_id='default'): + write_url = self.write_url(segment_id, schema_id) + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + + try: + response = requests.post(write_url, sql) + except: + del self._write_url_cache[segment_id] + self.logger.error( + 'problem with trough write url %r', write_url, + exc_info=True) + return + if response.status_code != 200: + del self._write_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + write_url, sql) + return + self.logger.debug('posted %r to %s', sql, write_url) + + def read(self, segment_id, sql_tmpl, values): + read_url = self.read_url(segment_id) + if not read_url: + return None + sql = sql_tmpl % tuple(self.sql_value(v) for v in values) + try: + response = requests.post(read_url, sql) + except: + del self._read_url_cache[segment_id] + self.logger.error( + 'problem with trough read url %r', read_url, exc_info=True) + return None + if response.status_code != 200: + del self._read_url_cache[segment_id] + self.logger.warn( + 'unexpected response %r %r %r from %r to sql=%r', + response.status_code, response.reason, response.text, + read_url, sql) + return None + self.logger.trace( + 'got %r from posting query %r to %r', response.text, sql, + read_url) + results = json.loads(response.text) + return results + + def schema_exists(self, schema_id): + url = os.path.join(self.segment_manager_url(), 'schema', schema_id) + response = requests.get(url) + if response.status_code == 200: + return True + elif response.status_code == 404: + return False + else: + response.raise_for_status() + + def register_schema(self, schema_id, sql): + url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) + response = requests.put(url, sql) + if response.status_code not in (201, 204): + raise Exception( + 'Received %s: %r in response to PUT %r with data %r' % ( + response.status_code, response.text, sql, url)) + From f5351a43dfd18ea6e6296f726c0b2673a7bc5a54 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 14:22:17 -0800 Subject: [PATCH 31/35] automatic segment promotion every hour --- tests/test_warcprox.py | 18 ++++++++++++ warcprox/dedup.py | 2 +- warcprox/trough.py | 66 ++++++++++++++++++++++++++++++++++++++---- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 5587d8f..e8c140b 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1721,6 +1721,24 @@ def test_payload_digest(warcprox_, http_daemon): req, prox_rec_res = mitm.do_GET() assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1 +def test_trough_segment_promotion(warcprox_): + if not warcprox_.options.rethinkdb_trough_db_url: + return + cli = warcprox.trough.TroughClient( + warcprox_.options.rethinkdb_trough_db_url, 3) + promoted = [] + def mock(segment_id): + promoted.append(segment_id) + cli.promote = mock + cli.register_schema('default', 'create table foo (bar varchar(100))') + cli.write('my_seg', 'insert into foo (bar) values ("boof")') + assert promoted == [] + time.sleep(3) + assert promoted == ['my_seg'] + promoted = [] + time.sleep(3) + assert promoted == [] + if __name__ == '__main__': pytest.main() diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 2364d41..d1e456d 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -262,7 +262,7 @@ class TroughDedupDb(object): def __init__(self, options=warcprox.Options()): self.options = options self._trough_cli = warcprox.trough.TroughClient( - options.rethinkdb_trough_db_url) + options.rethinkdb_trough_db_url, promotion_interval=60*60) def start(self): self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) diff --git a/warcprox/trough.py b/warcprox/trough.py index ec3a032..6cbe1dd 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -1,7 +1,7 @@ ''' warcprox/trough.py - trough client code -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2017 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -28,17 +28,69 @@ import requests import doublethink import rethinkdb as r import datetime +import threading +import time class TroughClient(object): logger = logging.getLogger("warcprox.trough.TroughClient") - def __init__(self, rethinkdb_trough_db_url): + def __init__(self, rethinkdb_trough_db_url, promotion_interval=None): + ''' + TroughClient constructor + + Args: + rethinkdb_trough_db_url: url with schema rethinkdb:// pointing to + trough configuration database + promotion_interval: if specified, `TroughClient` will spawn a + thread that "promotes" (pushed to hdfs) "dirty" trough segments + (segments that have received writes) periodically, sleeping for + `promotion_interval` seconds between cycles (default None) + ''' parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) self.svcreg = doublethink.ServiceRegistry(self.rr) self._write_url_cache = {} self._read_url_cache = {} + self._dirty_segments = set() + self._dirty_segments_lock = threading.RLock() + + self.promotion_interval = promotion_interval + self._promoter_thread = None + if promotion_interval: + self._promoter_thread = threading.Thread( + target=self._promotrix, name='TroughClient-promoter', + daemon=True) + self._promoter_thread.start() + + def _promotrix(self): + while True: + time.sleep(self.promotion_interval) + try: + with self._dirty_segments_lock: + dirty_segments = list(self._dirty_segments) + self._dirty_segments.clear() + logging.info('promoting %s trough segments') + for segment in dirty_segments: + try: + self.promote(segment) + except: + logging.error( + 'problem promoting segment %s', exc_info=True) + except: + logging.error( + 'caught exception doing segment promotion', + exc_info=True) + + def promote(self, segment_id): + url = os.path.join(self.segment_manager_url(), 'promote') + payload_dict = {'segment': segment_id} + response = requests.post(url, json=payload_dict) + if response.status_code != 200: + raise Exception( + 'Received %s: %r in response to POST %s with data %s' % ( + response.status_code, response.text, url, + json.dumps(payload_dict))) @staticmethod def sql_value(x): @@ -116,12 +168,15 @@ class TroughClient(object): self._read_url_cache[segment_id]) return self._read_url_cache[segment_id] - def write(self, segment_id, sql_tmpl, values, schema_id='default'): + def write(self, segment_id, sql_tmpl, values=(), schema_id='default'): write_url = self.write_url(segment_id, schema_id) sql = sql_tmpl % tuple(self.sql_value(v) for v in values) try: response = requests.post(write_url, sql) + if segment_id not in self._dirty_segments: + with self._dirty_segments_lock: + self._dirty_segments.add(segment_id) except: del self._write_url_cache[segment_id] self.logger.error( @@ -137,7 +192,7 @@ class TroughClient(object): return self.logger.debug('posted %r to %s', sql, write_url) - def read(self, segment_id, sql_tmpl, values): + def read(self, segment_id, sql_tmpl, values=()): read_url = self.read_url(segment_id) if not read_url: return None @@ -173,7 +228,8 @@ class TroughClient(object): response.raise_for_status() def register_schema(self, schema_id, sql): - url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id) + url = os.path.join( + self.segment_manager_url(), 'schema', schema_id, 'sql') response = requests.put(url, sql) if response.status_code not in (201, 204): raise Exception( From ef590a2fec468a2466f8471f18d200cfc6e4b7aa Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 13 Nov 2017 15:07:47 -0800 Subject: [PATCH 32/35] py2 fix --- .travis.yml | 2 +- warcprox/trough.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index b8a91e5..d712f84 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,7 +39,7 @@ before_install: - ping -c2 trough install: -- pip install . pytest requests warcio +- pip install . pytest requests warcio mock before_script: - ps ww -fHe diff --git a/warcprox/trough.py b/warcprox/trough.py index 6cbe1dd..6638b24 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -59,8 +59,8 @@ class TroughClient(object): self._promoter_thread = None if promotion_interval: self._promoter_thread = threading.Thread( - target=self._promotrix, name='TroughClient-promoter', - daemon=True) + target=self._promotrix, name='TroughClient-promoter') + self._promoter_thread.setDaemon(True) self._promoter_thread.start() def _promotrix(self): From 61a7c234e81bbb0763b2a79f50d4b9f6d0c1c090 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 28 Nov 2017 10:38:38 -0800 Subject: [PATCH 33/35] fix warcprox-ensure-rethinkdb-tables and add tests --- tests/test_ensure_rethinkdb_tables.py | 109 ++++++++++++++++++++++++++ warcprox/main.py | 80 +++++++++++++------ 2 files changed, 165 insertions(+), 24 deletions(-) create mode 100644 tests/test_ensure_rethinkdb_tables.py diff --git a/tests/test_ensure_rethinkdb_tables.py b/tests/test_ensure_rethinkdb_tables.py new file mode 100644 index 0000000..030cddb --- /dev/null +++ b/tests/test_ensure_rethinkdb_tables.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8: +''' +tests/test_ensure_rethinkdb_tables.py - automated tests of +ensure-rethinkdb-tables utility + +Copyright (C) 2017 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +''' + +import warcprox.main +import pytest +import socket +import doublethink +import logging +import sys + +logging.basicConfig( + stream=sys.stdout, level=warcprox.TRACE, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +def rethinkdb_is_running(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.connect(('127.0.0.1', 28015)) + return True + except: + return False + +if_rethinkdb = pytest.mark.skipif( + not rethinkdb_is_running(), + reason='rethinkdb not listening at 127.0.0.1:28015') + +@if_rethinkdb +def test_individual_options(): + rr = doublethink.Rethinker(['127.0.0.1']) + + try: + warcprox.main.ensure_rethinkdb_tables([ + 'warcprox-ensure-rethinkdb-tables', + '--rethinkdb-stats-url=rethinkdb://127.0.0.1/db0/stats']) + assert rr.db('db0').table_list().run() == ['stats'] + finally: + rr.db_drop('db0').run() + + try: + warcprox.main.ensure_rethinkdb_tables([ + 'warcprox-ensure-rethinkdb-tables', + '--rethinkdb-services-url=rethinkdb://127.0.0.1/db1/services']) + assert rr.db('db1').table_list().run() == ['services'] + finally: + rr.db_drop('db1').run() + + try: + warcprox.main.ensure_rethinkdb_tables([ + 'warcprox-ensure-rethinkdb-tables', + '--rethinkdb-dedup-url=rethinkdb://127.0.0.1/db2/dedup']) + assert rr.db('db2').table_list().run() == ['dedup'] + finally: + rr.db_drop('db2').run() + + try: + warcprox.main.ensure_rethinkdb_tables([ + 'warcprox-ensure-rethinkdb-tables', + '--rethinkdb-big-table-url=rethinkdb://127.0.0.1/db3/captures']) + assert rr.db('db3').table_list().run() == ['captures'] + finally: + rr.db_drop('db3').run() + + try: + warcprox.main.ensure_rethinkdb_tables([ + 'warcprox-ensure-rethinkdb-tables', + '--rethinkdb-trough-db-url=rethinkdb://127.0.0.1/db4']) + assert rr.db('db4').table_list().run() == ['services'] + # ['assignment', 'lock', 'schema', 'services'] + finally: + rr.db_drop('db4').run() + +@if_rethinkdb +def test_combos(): + rr = doublethink.Rethinker(['127.0.0.1']) + + try: + warcprox.main.ensure_rethinkdb_tables([ + 'warcprox-ensure-rethinkdb-tables', + '--rethinkdb-stats-url=rethinkdb://127.0.0.1/db00/stats', + '--rethinkdb-trough-db-url=rethinkdb://127.0.0.1/db01', + ]) + assert rr.db('db00').table_list().run() == ['stats'] + assert rr.db('db01').table_list().run() == ['services'] + # ['assignment', 'lock', 'schema', 'services'] + finally: + rr.db_drop('db00').run() + rr.db_drop('db01').run() diff --git a/warcprox/main.py b/warcprox/main.py index a2fca9c..06cd176 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -43,7 +43,6 @@ import warcprox import doublethink import cryptography.hazmat.backends.openssl import importlib -import doublethink class BetterArgumentDefaultsHelpFormatter( argparse.ArgumentDefaultsHelpFormatter, @@ -62,7 +61,7 @@ class BetterArgumentDefaultsHelpFormatter( else: return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action) -def _build_arg_parser(prog=os.path.basename(sys.argv[0])): +def _build_arg_parser(prog): arg_parser = argparse.ArgumentParser(prog=prog, description='warcprox - WARC writing MITM HTTP/S proxy', formatter_class=BetterArgumentDefaultsHelpFormatter) @@ -294,7 +293,7 @@ def init_controller(args): return controller -def parse_args(argv=sys.argv): +def parse_args(argv): ''' Parses command line arguments with argparse. ''' @@ -302,11 +301,11 @@ def parse_args(argv=sys.argv): args = arg_parser.parse_args(args=argv[1:]) return args -def main(argv=sys.argv): +def main(argv=None): ''' Main method, entry point of warcprox command. ''' - args = parse_args(argv) + args = parse_args(argv or sys.argv) if args.trace: loglevel = warcprox.TRACE @@ -337,7 +336,7 @@ def main(argv=sys.argv): controller.run_until_shutdown() -def ensure_rethinkdb_tables(): +def ensure_rethinkdb_tables(argv=None): ''' Creates rethinkdb tables if they don't already exist. Warcprox normally creates the tables it needs on demand at startup, but if multiple instances @@ -345,41 +344,74 @@ def ensure_rethinkdb_tables(): tables. So it's a good idea to use this utility at an early step when spinning up a cluster. ''' - raise Exception('adjust my args') + argv = argv or sys.argv arg_parser = argparse.ArgumentParser( - prog=os.path.basename(sys.argv[0]), + prog=os.path.basename(argv[0]), formatter_class=BetterArgumentDefaultsHelpFormatter) arg_parser.add_argument( - '--rethinkdb-servers', dest='rethinkdb_servers', default='localhost', - help='rethinkdb servers e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') + '--rethinkdb-stats-url', dest='rethinkdb_stats_url', help=( + 'rethinkdb stats table url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_stats_table')) + group = arg_parser.add_mutually_exclusive_group() + group.add_argument( + '--rethinkdb-dedup-url', dest='rethinkdb_dedup_url', help=( + 'rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/my_dedup_table')) + group.add_argument( + '--rethinkdb-big-table-url', dest='rethinkdb_big_table_url', help=( + 'rethinkdb big table url (table will be populated with ' + 'various capture information and is suitable for use as ' + 'index for playback), e.g. rethinkdb://db0.foo.org,' + 'db1.foo.org:38015/my_warcprox_db/captures')) + group.add_argument( + '--rethinkdb-trough-db-url', dest='rethinkdb_trough_db_url', help=( + '🐷   url pointing to trough configuration rethinkdb database, ' + 'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015' + '/trough_configuration')) arg_parser.add_argument( - '--rethinkdb-db', dest='rethinkdb_db', default='warcprox', - help='rethinkdb database name') + '--rethinkdb-services-url', dest='rethinkdb_services_url', help=( + 'rethinkdb service registry table url; if provided, warcprox ' + 'will create and heartbeat entry for itself')) arg_parser.add_argument( '-q', '--quiet', dest='log_level', action='store_const', default=logging.INFO, const=logging.WARN) arg_parser.add_argument( '-v', '--verbose', dest='log_level', action='store_const', default=logging.INFO, const=logging.DEBUG) - args = arg_parser.parse_args(args=sys.argv[1:]) + args = arg_parser.parse_args(args=argv[1:]) logging.basicConfig( - stream=sys.stdout, level=args.log_level, - format=( + stream=sys.stdout, level=args.log_level, format=( '%(asctime)s %(levelname)s %(name)s.%(funcName)s' '(%(filename)s:%(lineno)d) %(message)s')) - rr = doublethink.Rethinker( - args.rethinkdb_servers.split(','), args.rethinkdb_db) + options = warcprox.Options(**vars(args)) - # services table - doublethink.ServiceRegistry(rr) + did_something = False + if args.rethinkdb_services_url: + parsed = doublethink.parse_rethinkdb_url( + options.rethinkdb_services_url) + rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database) + svcreg = doublethink.ServiceRegistry(rr, table=parsed.table) + did_something = True + if args.rethinkdb_stats_url: + stats_db = warcprox.stats.RethinkStatsDb(options=options) + did_something = True + if args.rethinkdb_dedup_url: + dedup_db = warcprox.dedup.RethinkDedupDb(options=options) + did_something = True + if args.rethinkdb_big_table_url: + dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options) + did_something = True + if args.rethinkdb_trough_db_url: + dedup_db = warcprox.dedup.TroughDedupDb(options) + logging.warn( + 'trough it responsible for creating most of the rethinkdb ' + 'tables that it uses') + did_something = True - # stats table - warcprox.stats.RethinkStatsDb(rr) - - # captures table - warcprox.bigtable.RethinkCaptures(rr) + if not did_something: + logging.error('nothing to do, no --rethinkdb-* options supplied') if __name__ == '__main__': main() From c5f33bda7abd38b20101deaee079d9ece509b37a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 30 Nov 2017 12:55:39 -0800 Subject: [PATCH 34/35] trough dedup - handle case of no warc records written --- warcprox/dedup.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index d1e456d..f21e1df 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -292,11 +292,10 @@ class TroughDedupDb(object): return None def notify(self, recorded_url, records): - if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + if (records and records[0].type == b'response' and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str( - recorded_url.payload_digest, - self.options.base32) + recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: self.save( digest_key, records[0], From 9d0367b96bc7a20789c957c0a4bf6f37e08aa076 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 30 Nov 2017 16:08:20 -0800 Subject: [PATCH 35/35] fix logging --- warcprox/trough.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/warcprox/trough.py b/warcprox/trough.py index 6638b24..1edcc54 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -70,7 +70,8 @@ class TroughClient(object): with self._dirty_segments_lock: dirty_segments = list(self._dirty_segments) self._dirty_segments.clear() - logging.info('promoting %s trough segments') + logging.info( + 'promoting %s trough segments', len(dirty_segments)) for segment in dirty_segments: try: self.promote(segment)