rethinkstuff -> doublethink

This commit is contained in:
Noah Levitt 2017-03-02 15:06:26 -08:00
parent 3a80fde50c
commit 842bfd651c
6 changed files with 107 additions and 91 deletions

View File

@ -41,7 +41,7 @@ deps = [
'warctools',
'kafka-python>=1.0.1',
'surt>=0.3b4',
'rethinkstuff',
'doublethink>=0.2.0.dev69',
'PySocks',
]
try:
@ -51,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.1b1.dev52',
version='2.1b1.dev53',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -36,7 +36,7 @@ import requests
import re
import json
import random
import rethinkstuff
import doublethink
from hanzo import warctools
import warnings
import pprint
@ -245,15 +245,15 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
servers = rethinkdb_servers.split(",")
if rethinkdb_big_table:
db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
r = rethinkstuff.Rethinker(servers, db)
captures_db = warcprox.bigtable.RethinkCaptures(r)
rr = doublethink.Rethinker(servers, db)
captures_db = warcprox.bigtable.RethinkCaptures(rr)
captures_db.start()
def fin():
if captures_db:
captures_db.close()
# logging.info('dropping rethinkdb database {}'.format(db))
# result = captures_db.r.db_drop(db).run()
# result = captures_db.rr.db_drop(db).run()
# logging.info("result=%s", result)
request.addfinalizer(fin)
@ -268,15 +268,15 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db):
else:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
r = rethinkstuff.Rethinker(servers, db)
ddb = warcprox.dedup.RethinkDedupDb(r)
rr = doublethink.Rethinker(servers, db)
ddb = warcprox.dedup.RethinkDedupDb(rr)
def fin():
if rethinkdb_servers:
ddb.close()
if not captures_db:
logging.info('dropping rethinkdb database {}'.format(db))
result = ddb.r.db_drop(db).run()
result = ddb.rr.db_drop(db).run()
logging.info("result=%s", result)
request.addfinalizer(fin)
@ -305,8 +305,8 @@ def stats_db(request, rethinkdb_servers):
if rethinkdb_servers:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
r = rethinkstuff.Rethinker(servers, db)
sdb = warcprox.stats.RethinkStatsDb(r)
rr = doublethink.Rethinker(servers, db)
sdb = warcprox.stats.RethinkStatsDb(rr)
sdb.start()
else:
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
@ -318,7 +318,7 @@ def stats_db(request, rethinkdb_servers):
sdb.close()
if rethinkdb_servers:
logging.info('dropping rethinkdb database {}'.format(db))
result = sdb.r.db_drop(db).run()
result = sdb.rr.db_drop(db).run()
logging.info("result=%s", result)
else:
logging.info('deleting file {}'.format(stats_db_file))
@ -332,15 +332,15 @@ def service_registry(request, rethinkdb_servers):
if rethinkdb_servers:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_services_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
r = rethinkstuff.Rethinker(servers, db)
rr = doublethink.Rethinker(servers, db)
def fin():
logging.info('dropping rethinkdb database {}'.format(db))
result = r.db_drop(db).run()
result = rr.db_drop(db).run()
logging.info("result=%s", result)
request.addfinalizer(fin)
return rethinkstuff.ServiceRegistry(r)
return doublethink.ServiceRegistry(rr)
else:
return None
@ -1265,7 +1265,7 @@ def test_dedup_ok_flag(
# inspect what's in rethinkdb more closely
rethink_captures = warcprox_.warc_writer_thread.dedup_db.captures_db
results_iter = rethink_captures.r.table(rethink_captures.table).get_all(
results_iter = rethink_captures.rr.table(rethink_captures.table).get_all(
['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response',
'test_dedup_ok_flag'], index='sha1_warc_type').order_by(
'timestamp').run()

View File

@ -34,19 +34,20 @@ import os
import hashlib
import threading
import datetime
import rethinkstuff
import doublethink
import rethinkdb as r
class RethinkCaptures:
"""Inserts in batches every 0.5 seconds"""
logger = logging.getLogger("warcprox.bigtable.RethinkCaptures")
def __init__(
self, r, table="captures", shards=None, replicas=None,
self, rr, table="captures", shards=None, replicas=None,
options=warcprox.Options()):
self.r = r
self.rr = rr
self.table = table
self.shards = shards or len(r.servers)
self.replicas = replicas or min(3, len(r.servers))
self.shards = shards or len(rr.servers)
self.replicas = replicas or min(3, len(rr.servers))
self.options = options
self._ensure_db_table()
@ -64,7 +65,7 @@ class RethinkCaptures:
try:
with self._batch_lock:
if len(self._batch) > 0:
result = self.r.table(self.table).insert(
result = self.rr.table(self.table).insert(
self._batch, conflict="replace").run()
if (result["inserted"] + result["replaced"]
+ result["unchanged"] != len(self._batch)):
@ -99,16 +100,22 @@ class RethinkCaptures:
self.logger.info("finished")
def _ensure_db_table(self):
dbs = self.r.db_list().run()
if not self.r.dbname in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.dbname))
self.r.db_create(self.r.dbname).run()
tables = self.r.table_list().run()
dbs = self.rr.db_list().run()
if not self.rr.dbname in dbs:
self.logger.info(
"creating rethinkdb database %s", repr(self.rr.dbname))
self.rr.db_create(self.rr.dbname).run()
tables = self.rr.table_list().run()
if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.dbname))
self.r.table_create(self.table, shards=self.shards, replicas=self.replicas).run()
self.r.table(self.table).index_create("abbr_canon_surt_timestamp", [self.r.row["abbr_canon_surt"], self.r.row["timestamp"]]).run()
self.r.table(self.table).index_create("sha1_warc_type", [self.r.row["sha1base32"], self.r.row["warc_type"], self.r.row["bucket"]]).run()
self.logger.info(
"creating rethinkdb table %s in database %s",
repr(self.table), repr(self.rr.dbname))
self.rr.table_create(self.table, shards=self.shards, replicas=self.replicas).run()
self.rr.table(self.table).index_create(
"abbr_canon_surt_timestamp",
[r.row["abbr_canon_surt"], r.row["timestamp"]]).run()
self.rr.table(self.table).index_create("sha1_warc_type", [
r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run()
def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
if algo != "sha1":
@ -116,10 +123,10 @@ class RethinkCaptures:
"digest type is %s but big captures table is indexed by "
"sha1" % algo)
sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
results_iter = self.r.table(self.table).get_all(
results_iter = self.rr.table(self.table).get_all(
[sha1base32, "response", bucket],
index="sha1_warc_type").filter(
self.r.row["dedup_ok"], default=True).run()
r.row["dedup_ok"], default=True).run()
results = list(results_iter)
if len(results) > 0:
if len(results) > 1:
@ -162,7 +169,7 @@ class RethinkCaptures:
"abbr_canon_surt": canon_surt[:150],
"canon_surt": canon_surt,
"timestamp": recorded_url.timestamp.replace(
tzinfo=rethinkstuff.UTC),
tzinfo=doublethink.UTC),
"url": recorded_url.url.decode("utf-8"),
"offset": records[0].offset,
"filename": os.path.basename(records[0].warc_filename),

View File

@ -1,23 +1,23 @@
#
# warcprox/dedup.py - identical payload digest deduplication
#
# Copyright (C) 2013-2016 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
'''
warcprox/dedup.py - identical payload digest deduplication
Copyright (C) 2013-2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
from __future__ import absolute_import
@ -112,24 +112,29 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, r, table="dedup", shards=None, replicas=None, options=warcprox.Options()):
self.r = r
def __init__(self, rr, table="dedup", shards=None, replicas=None, options=warcprox.Options()):
self.rr = rr
self.table = table
self.shards = shards or len(r.servers)
self.replicas = replicas or min(3, len(r.servers))
self.shards = shards or len(rr.servers)
self.replicas = replicas or min(3, len(rr.servers))
self._ensure_db_table()
self.options = options
def _ensure_db_table(self):
dbs = self.r.db_list().run()
if not self.r.dbname in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.dbname))
self.r.db_create(self.r.dbname).run()
tables = self.r.table_list().run()
dbs = self.rr.db_list().run()
if not self.rr.dbname in dbs:
self.logger.info(
"creating rethinkdb database %s", repr(self.rr.dbname))
self.rr.db_create(self.rr.dbname).run()
tables = self.rr.table_list().run()
if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s",
repr(self.table), repr(self.r.dbname), self.shards, self.replicas)
self.r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run()
self.logger.info(
"creating rethinkdb table %s in database %s shards=%s "
"replicas=%s", repr(self.table), repr(self.rr.dbname),
self.shards, self.replicas)
self.rr.table_create(
self.table, primary_key="key", shards=self.shards,
replicas=self.replicas).run()
def start(self):
@ -151,7 +156,8 @@ class RethinkDedupDb:
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
record = {'key':k,'url':url,'date':date,'id':record_id}
result = self.r.table(self.table).insert(record,conflict="replace").run()
result = self.rr.table(self.table).insert(
record, conflict="replace").run()
if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
raise Exception("unexpected result %s saving %s", result, record)
self.logger.debug('dedup db saved %s:%s', k, record)
@ -159,7 +165,7 @@ class RethinkDedupDb:
def lookup(self, digest_key, bucket=""):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
k = "{}|{}".format(k, bucket)
result = self.r.table(self.table).get(k).run()
result = self.rr.table(self.table).get(k).run()
if result:
for x in result:
result[x] = result[x].encode("utf-8")
@ -175,3 +181,4 @@ class RethinkDedupDb:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
else:
self.save(digest_key, records[0])

View File

@ -40,7 +40,7 @@ import threading
import certauth.certauth
import warcprox
import re
import rethinkstuff
import doublethink
import cryptography.hazmat.backends.openssl
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
@ -149,13 +149,15 @@ def init_controller(args):
listeners = []
if args.rethinkdb_servers:
r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db)
rr = doublethink.Rethinker(
args.rethinkdb_servers.split(","), args.rethinkdb_db)
if args.rethinkdb_big_table:
captures_db = warcprox.bigtable.RethinkCaptures(r, options=options)
dedup_db = warcprox.bigtable.RethinkCapturesDedup(captures_db, options=options)
captures_db = warcprox.bigtable.RethinkCaptures(rr, options=options)
dedup_db = warcprox.bigtable.RethinkCapturesDedup(
captures_db, options=options)
listeners.append(captures_db)
else:
dedup_db = warcprox.dedup.RethinkDedupDb(r, options=options)
dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
listeners.append(dedup_db)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
@ -165,7 +167,7 @@ def init_controller(args):
listeners.append(dedup_db)
if args.rethinkdb_servers:
stats_db = warcprox.stats.RethinkStatsDb(r, options=options)
stats_db = warcprox.stats.RethinkStatsDb(rr, options=options)
listeners.append(stats_db)
elif args.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
@ -205,7 +207,7 @@ def init_controller(args):
dedup_db=dedup_db, listeners=listeners, options=options)
if args.rethinkdb_servers:
svcreg = rethinkstuff.ServiceRegistry(r)
svcreg = doublethink.ServiceRegistry(rr)
else:
svcreg = None
@ -286,17 +288,17 @@ def ensure_rethinkdb_tables():
'%(asctime)s %(levelname)s %(name)s.%(funcName)s'
'(%(filename)s:%(lineno)d) %(message)s'))
r = rethinkstuff.Rethinker(
rr = doublethink.Rethinker(
args.rethinkdb_servers.split(','), args.rethinkdb_db)
# services table
rethinkstuff.ServiceRegistry(r)
doublethink.ServiceRegistry(rr)
# stats table
warcprox.stats.RethinkStatsDb(r)
warcprox.stats.RethinkStatsDb(rr)
# captures table
warcprox.bigtable.RethinkCaptures(r)
warcprox.bigtable.RethinkCaptures(rr)
if __name__ == '__main__':
main()

View File

@ -1,7 +1,7 @@
'''
warcprox/stats.py - keeps statistics on what has been archived
Copyright (C) 2013-2016 Internet Archive
Copyright (C) 2013-2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -176,10 +176,10 @@ class RethinkStatsDb(StatsDb):
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()):
self.r = rethinker
self.rr = rethinker
self.table = table
self.shards = shards or 1 # 1 shard by default because it's probably a small table
self.replicas = replicas or min(3, len(self.r.servers))
self.replicas = replicas or min(3, len(self.rr.servers))
self._ensure_db_table()
self.options = options
@ -194,7 +194,7 @@ class RethinkStatsDb(StatsDb):
self._update_batch() # starts repeating timer
def _bucket_batch_update_reql(self, bucket):
return self.r.table(self.table).get(bucket).replace(
return self.rr.table(self.table).get(bucket).replace(
lambda old: r.branch(
old.eq(None), self._batch[bucket], old.merge({
"total": {
@ -239,18 +239,18 @@ class RethinkStatsDb(StatsDb):
self.logger.info("finished")
def _ensure_db_table(self):
dbs = self.r.db_list().run()
if not self.r.dbname in dbs:
dbs = self.rr.db_list().run()
if not self.rr.dbname in dbs:
self.logger.info(
"creating rethinkdb database %s", repr(self.r.dbname))
self.r.db_create(self.r.dbname).run()
tables = self.r.table_list().run()
"creating rethinkdb database %s", repr(self.rr.dbname))
self.rr.db_create(self.rr.dbname).run()
tables = self.rr.table_list().run()
if not self.table in tables:
self.logger.info(
"creating rethinkdb table %s in database %s shards=%s "
"replicas=%s", repr(self.table), repr(self.r.dbname),
"replicas=%s", repr(self.table), repr(self.rr.dbname),
self.shards, self.replicas)
self.r.table_create(
self.rr.table_create(
self.table, primary_key="bucket", shards=self.shards,
replicas=self.replicas).run()
@ -267,7 +267,7 @@ class RethinkStatsDb(StatsDb):
pass
def value(self, bucket0="__all__", bucket1=None, bucket2=None):
bucket0_stats = self.r.table(self.table).get(bucket0).run()
bucket0_stats = self.rr.table(self.table).get(bucket0).run()
self.logger.debug(
'stats db lookup of bucket=%s returned %s',
bucket0, bucket0_stats)