diff --git a/setup.py b/setup.py index 42d6a3e..a7b06b9 100755 --- a/setup.py +++ b/setup.py @@ -41,7 +41,7 @@ deps = [ 'warctools', 'kafka-python>=1.0.1', 'surt>=0.3b4', - 'rethinkstuff', + 'doublethink>=0.2.0.dev69', 'PySocks', ] try: @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.1b1.dev52', + version='2.1b1.dev53', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index e1bb957..0ba3c8b 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -36,7 +36,7 @@ import requests import re import json import random -import rethinkstuff +import doublethink from hanzo import warctools import warnings import pprint @@ -245,15 +245,15 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table): servers = rethinkdb_servers.split(",") if rethinkdb_big_table: db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - r = rethinkstuff.Rethinker(servers, db) - captures_db = warcprox.bigtable.RethinkCaptures(r) + rr = doublethink.Rethinker(servers, db) + captures_db = warcprox.bigtable.RethinkCaptures(rr) captures_db.start() def fin(): if captures_db: captures_db.close() # logging.info('dropping rethinkdb database {}'.format(db)) - # result = captures_db.r.db_drop(db).run() + # result = captures_db.rr.db_drop(db).run() # logging.info("result=%s", result) request.addfinalizer(fin) @@ -268,15 +268,15 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db): else: servers = rethinkdb_servers.split(",") db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - r = rethinkstuff.Rethinker(servers, db) - ddb = warcprox.dedup.RethinkDedupDb(r) + rr = doublethink.Rethinker(servers, db) + ddb = warcprox.dedup.RethinkDedupDb(rr) def fin(): if rethinkdb_servers: ddb.close() if not captures_db: logging.info('dropping rethinkdb database {}'.format(db)) - result = ddb.r.db_drop(db).run() + result = ddb.rr.db_drop(db).run() logging.info("result=%s", result) request.addfinalizer(fin) @@ -305,8 +305,8 @@ def stats_db(request, rethinkdb_servers): if rethinkdb_servers: servers = rethinkdb_servers.split(",") db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - r = rethinkstuff.Rethinker(servers, db) - sdb = warcprox.stats.RethinkStatsDb(r) + rr = doublethink.Rethinker(servers, db) + sdb = warcprox.stats.RethinkStatsDb(rr) sdb.start() else: f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False) @@ -318,7 +318,7 @@ def stats_db(request, rethinkdb_servers): sdb.close() if rethinkdb_servers: logging.info('dropping rethinkdb database {}'.format(db)) - result = sdb.r.db_drop(db).run() + result = sdb.rr.db_drop(db).run() logging.info("result=%s", result) else: logging.info('deleting file {}'.format(stats_db_file)) @@ -332,15 +332,15 @@ def service_registry(request, rethinkdb_servers): if rethinkdb_servers: servers = rethinkdb_servers.split(",") db = 'warcprox_test_services_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - r = rethinkstuff.Rethinker(servers, db) + rr = doublethink.Rethinker(servers, db) def fin(): logging.info('dropping rethinkdb database {}'.format(db)) - result = r.db_drop(db).run() + result = rr.db_drop(db).run() logging.info("result=%s", result) request.addfinalizer(fin) - return rethinkstuff.ServiceRegistry(r) + return doublethink.ServiceRegistry(rr) else: return None @@ -1265,7 +1265,7 @@ def test_dedup_ok_flag( # inspect what's in rethinkdb more closely rethink_captures = warcprox_.warc_writer_thread.dedup_db.captures_db - results_iter = rethink_captures.r.table(rethink_captures.table).get_all( + results_iter = rethink_captures.rr.table(rethink_captures.table).get_all( ['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response', 'test_dedup_ok_flag'], index='sha1_warc_type').order_by( 'timestamp').run() diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 67638d4..e32bdf7 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -34,19 +34,20 @@ import os import hashlib import threading import datetime -import rethinkstuff +import doublethink +import rethinkdb as r class RethinkCaptures: """Inserts in batches every 0.5 seconds""" logger = logging.getLogger("warcprox.bigtable.RethinkCaptures") def __init__( - self, r, table="captures", shards=None, replicas=None, + self, rr, table="captures", shards=None, replicas=None, options=warcprox.Options()): - self.r = r + self.rr = rr self.table = table - self.shards = shards or len(r.servers) - self.replicas = replicas or min(3, len(r.servers)) + self.shards = shards or len(rr.servers) + self.replicas = replicas or min(3, len(rr.servers)) self.options = options self._ensure_db_table() @@ -64,7 +65,7 @@ class RethinkCaptures: try: with self._batch_lock: if len(self._batch) > 0: - result = self.r.table(self.table).insert( + result = self.rr.table(self.table).insert( self._batch, conflict="replace").run() if (result["inserted"] + result["replaced"] + result["unchanged"] != len(self._batch)): @@ -99,16 +100,22 @@ class RethinkCaptures: self.logger.info("finished") def _ensure_db_table(self): - dbs = self.r.db_list().run() - if not self.r.dbname in dbs: - self.logger.info("creating rethinkdb database %s", repr(self.r.dbname)) - self.r.db_create(self.r.dbname).run() - tables = self.r.table_list().run() + dbs = self.rr.db_list().run() + if not self.rr.dbname in dbs: + self.logger.info( + "creating rethinkdb database %s", repr(self.rr.dbname)) + self.rr.db_create(self.rr.dbname).run() + tables = self.rr.table_list().run() if not self.table in tables: - self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.dbname)) - self.r.table_create(self.table, shards=self.shards, replicas=self.replicas).run() - self.r.table(self.table).index_create("abbr_canon_surt_timestamp", [self.r.row["abbr_canon_surt"], self.r.row["timestamp"]]).run() - self.r.table(self.table).index_create("sha1_warc_type", [self.r.row["sha1base32"], self.r.row["warc_type"], self.r.row["bucket"]]).run() + self.logger.info( + "creating rethinkdb table %s in database %s", + repr(self.table), repr(self.rr.dbname)) + self.rr.table_create(self.table, shards=self.shards, replicas=self.replicas).run() + self.rr.table(self.table).index_create( + "abbr_canon_surt_timestamp", + [r.row["abbr_canon_surt"], r.row["timestamp"]]).run() + self.rr.table(self.table).index_create("sha1_warc_type", [ + r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run() def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"): if algo != "sha1": @@ -116,10 +123,10 @@ class RethinkCaptures: "digest type is %s but big captures table is indexed by " "sha1" % algo) sha1base32 = base64.b32encode(raw_digest).decode("utf-8") - results_iter = self.r.table(self.table).get_all( + results_iter = self.rr.table(self.table).get_all( [sha1base32, "response", bucket], index="sha1_warc_type").filter( - self.r.row["dedup_ok"], default=True).run() + r.row["dedup_ok"], default=True).run() results = list(results_iter) if len(results) > 0: if len(results) > 1: @@ -162,7 +169,7 @@ class RethinkCaptures: "abbr_canon_surt": canon_surt[:150], "canon_surt": canon_surt, "timestamp": recorded_url.timestamp.replace( - tzinfo=rethinkstuff.UTC), + tzinfo=doublethink.UTC), "url": recorded_url.url.decode("utf-8"), "offset": records[0].offset, "filename": os.path.basename(records[0].warc_filename), diff --git a/warcprox/dedup.py b/warcprox/dedup.py index c5080d3..3555ef4 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -1,23 +1,23 @@ -# -# warcprox/dedup.py - identical payload digest deduplication -# -# Copyright (C) 2013-2016 Internet Archive -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# +''' +warcprox/dedup.py - identical payload digest deduplication + +Copyright (C) 2013-2017 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +''' from __future__ import absolute_import @@ -112,24 +112,29 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") - def __init__(self, r, table="dedup", shards=None, replicas=None, options=warcprox.Options()): - self.r = r + def __init__(self, rr, table="dedup", shards=None, replicas=None, options=warcprox.Options()): + self.rr = rr self.table = table - self.shards = shards or len(r.servers) - self.replicas = replicas or min(3, len(r.servers)) + self.shards = shards or len(rr.servers) + self.replicas = replicas or min(3, len(rr.servers)) self._ensure_db_table() self.options = options def _ensure_db_table(self): - dbs = self.r.db_list().run() - if not self.r.dbname in dbs: - self.logger.info("creating rethinkdb database %s", repr(self.r.dbname)) - self.r.db_create(self.r.dbname).run() - tables = self.r.table_list().run() + dbs = self.rr.db_list().run() + if not self.rr.dbname in dbs: + self.logger.info( + "creating rethinkdb database %s", repr(self.rr.dbname)) + self.rr.db_create(self.rr.dbname).run() + tables = self.rr.table_list().run() if not self.table in tables: - self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s", - repr(self.table), repr(self.r.dbname), self.shards, self.replicas) - self.r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run() + self.logger.info( + "creating rethinkdb table %s in database %s shards=%s " + "replicas=%s", repr(self.table), repr(self.rr.dbname), + self.shards, self.replicas) + self.rr.table_create( + self.table, primary_key="key", shards=self.shards, + replicas=self.replicas).run() def start(self): @@ -151,7 +156,8 @@ class RethinkDedupDb: url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') record = {'key':k,'url':url,'date':date,'id':record_id} - result = self.r.table(self.table).insert(record,conflict="replace").run() + result = self.rr.table(self.table).insert( + record, conflict="replace").run() if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) @@ -159,7 +165,7 @@ class RethinkDedupDb: def lookup(self, digest_key, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) - result = self.r.table(self.table).get(k).run() + result = self.rr.table(self.table).get(k).run() if result: for x in result: result[x] = result[x].encode("utf-8") @@ -175,3 +181,4 @@ class RethinkDedupDb: self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"]) else: self.save(digest_key, records[0]) + diff --git a/warcprox/main.py b/warcprox/main.py index 5045298..13cb004 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -40,7 +40,7 @@ import threading import certauth.certauth import warcprox import re -import rethinkstuff +import doublethink import cryptography.hazmat.backends.openssl def _build_arg_parser(prog=os.path.basename(sys.argv[0])): @@ -149,13 +149,15 @@ def init_controller(args): listeners = [] if args.rethinkdb_servers: - r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db) + rr = doublethink.Rethinker( + args.rethinkdb_servers.split(","), args.rethinkdb_db) if args.rethinkdb_big_table: - captures_db = warcprox.bigtable.RethinkCaptures(r, options=options) - dedup_db = warcprox.bigtable.RethinkCapturesDedup(captures_db, options=options) + captures_db = warcprox.bigtable.RethinkCaptures(rr, options=options) + dedup_db = warcprox.bigtable.RethinkCapturesDedup( + captures_db, options=options) listeners.append(captures_db) else: - dedup_db = warcprox.dedup.RethinkDedupDb(r, options=options) + dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options) listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') @@ -165,7 +167,7 @@ def init_controller(args): listeners.append(dedup_db) if args.rethinkdb_servers: - stats_db = warcprox.stats.RethinkStatsDb(r, options=options) + stats_db = warcprox.stats.RethinkStatsDb(rr, options=options) listeners.append(stats_db) elif args.stats_db_file in (None, '', '/dev/null'): logging.info('statistics tracking disabled') @@ -205,7 +207,7 @@ def init_controller(args): dedup_db=dedup_db, listeners=listeners, options=options) if args.rethinkdb_servers: - svcreg = rethinkstuff.ServiceRegistry(r) + svcreg = doublethink.ServiceRegistry(rr) else: svcreg = None @@ -286,17 +288,17 @@ def ensure_rethinkdb_tables(): '%(asctime)s %(levelname)s %(name)s.%(funcName)s' '(%(filename)s:%(lineno)d) %(message)s')) - r = rethinkstuff.Rethinker( + rr = doublethink.Rethinker( args.rethinkdb_servers.split(','), args.rethinkdb_db) # services table - rethinkstuff.ServiceRegistry(r) + doublethink.ServiceRegistry(rr) # stats table - warcprox.stats.RethinkStatsDb(r) + warcprox.stats.RethinkStatsDb(rr) # captures table - warcprox.bigtable.RethinkCaptures(r) + warcprox.bigtable.RethinkCaptures(rr) if __name__ == '__main__': main() diff --git a/warcprox/stats.py b/warcprox/stats.py index 9fd892d..db1884d 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -1,7 +1,7 @@ ''' warcprox/stats.py - keeps statistics on what has been archived -Copyright (C) 2013-2016 Internet Archive +Copyright (C) 2013-2017 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -176,10 +176,10 @@ class RethinkStatsDb(StatsDb): logger = logging.getLogger("warcprox.stats.RethinkStatsDb") def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()): - self.r = rethinker + self.rr = rethinker self.table = table self.shards = shards or 1 # 1 shard by default because it's probably a small table - self.replicas = replicas or min(3, len(self.r.servers)) + self.replicas = replicas or min(3, len(self.rr.servers)) self._ensure_db_table() self.options = options @@ -194,7 +194,7 @@ class RethinkStatsDb(StatsDb): self._update_batch() # starts repeating timer def _bucket_batch_update_reql(self, bucket): - return self.r.table(self.table).get(bucket).replace( + return self.rr.table(self.table).get(bucket).replace( lambda old: r.branch( old.eq(None), self._batch[bucket], old.merge({ "total": { @@ -239,18 +239,18 @@ class RethinkStatsDb(StatsDb): self.logger.info("finished") def _ensure_db_table(self): - dbs = self.r.db_list().run() - if not self.r.dbname in dbs: + dbs = self.rr.db_list().run() + if not self.rr.dbname in dbs: self.logger.info( - "creating rethinkdb database %s", repr(self.r.dbname)) - self.r.db_create(self.r.dbname).run() - tables = self.r.table_list().run() + "creating rethinkdb database %s", repr(self.rr.dbname)) + self.rr.db_create(self.rr.dbname).run() + tables = self.rr.table_list().run() if not self.table in tables: self.logger.info( "creating rethinkdb table %s in database %s shards=%s " - "replicas=%s", repr(self.table), repr(self.r.dbname), + "replicas=%s", repr(self.table), repr(self.rr.dbname), self.shards, self.replicas) - self.r.table_create( + self.rr.table_create( self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run() @@ -267,7 +267,7 @@ class RethinkStatsDb(StatsDb): pass def value(self, bucket0="__all__", bucket1=None, bucket2=None): - bucket0_stats = self.r.table(self.table).get(bucket0).run() + bucket0_stats = self.rr.table(self.table).get(bucket0).run() self.logger.debug( 'stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)