mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
rethinkdb dedup
This commit is contained in:
parent
0e7a7fdd69
commit
e66dc3a9fb
2
setup.py
2
setup.py
@ -47,7 +47,7 @@ setuptools.setup(name='warcprox',
|
|||||||
license='GPL',
|
license='GPL',
|
||||||
packages=['warcprox'],
|
packages=['warcprox'],
|
||||||
package_data={'warcprox':['version.txt']},
|
package_data={'warcprox':['version.txt']},
|
||||||
install_requires=['certauth>=1.1.0', 'warctools>=4.8.3'], # gdbm not in pip :(
|
install_requires=['certauth>=1.1.0', 'warctools>=4.8.3', 'rethinkdb'], # gdbm not in pip :(
|
||||||
dependency_links=['git+https://github.com/internetarchive/warctools.git#egg=warctools-4.8.3'],
|
dependency_links=['git+https://github.com/internetarchive/warctools.git#egg=warctools-4.8.3'],
|
||||||
tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
|
tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
|
||||||
cmdclass = {'test': PyTest},
|
cmdclass = {'test': PyTest},
|
||||||
|
@ -15,6 +15,9 @@ import os
|
|||||||
import json
|
import json
|
||||||
from hanzo import warctools
|
from hanzo import warctools
|
||||||
import warcprox
|
import warcprox
|
||||||
|
import rethinkdb
|
||||||
|
r = rethinkdb
|
||||||
|
import random
|
||||||
|
|
||||||
class DedupDb(object):
|
class DedupDb(object):
|
||||||
logger = logging.getLogger("warcprox.dedup.DedupDb")
|
logger = logging.getLogger("warcprox.dedup.DedupDb")
|
||||||
@ -36,12 +39,12 @@ class DedupDb(object):
|
|||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def save(self, key, response_record, offset):
|
def save(self, key, response_record):
|
||||||
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
|
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
|
||||||
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
||||||
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
||||||
|
|
||||||
py_value = {'i':record_id, 'u':url, 'd':date}
|
py_value = {'id':record_id, 'url':url, 'date':date}
|
||||||
json_value = json.dumps(py_value, separators=(',',':'))
|
json_value = json.dumps(py_value, separators=(',',':'))
|
||||||
|
|
||||||
self.db[key] = json_value.encode('utf-8')
|
self.db[key] = json_value.encode('utf-8')
|
||||||
@ -52,9 +55,9 @@ class DedupDb(object):
|
|||||||
if key in self.db:
|
if key in self.db:
|
||||||
json_result = self.db[key]
|
json_result = self.db[key]
|
||||||
result = json.loads(json_result.decode('utf-8'))
|
result = json.loads(json_result.decode('utf-8'))
|
||||||
result['i'] = result['i'].encode('latin1')
|
result['id'] = result['id'].encode('latin1')
|
||||||
result['u'] = result['u'].encode('latin1')
|
result['url'] = result['url'].encode('latin1')
|
||||||
result['d'] = result['d'].encode('latin1')
|
result['date'] = result['date'].encode('latin1')
|
||||||
self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
|
self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@ -63,3 +66,62 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
|
|||||||
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
|
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
|
||||||
recorded_url.dedup_info = dedup_db.lookup(key)
|
recorded_url.dedup_info = dedup_db.lookup(key)
|
||||||
|
|
||||||
|
class RethinkDedupDb:
    """Deduplication database stored in a RethinkDB table.

    Each row is keyed by the payload digest (primary key ``"key"``) and
    carries the ``id``, ``url`` and ``date`` of the WARC response record that
    first captured that payload. A fresh connection to a randomly chosen
    server is opened for every operation and closed when the operation ends.
    """

    logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")

    def __init__(self, servers=None, db="warcprox", table="dedup", shards=3, replicas=3):
        """Remember the connection settings and create the db/table if absent.

        servers  -- iterable of "host" or "host:port" strings; defaults to
                    ["localhost"] when omitted
        db       -- RethinkDB database name
        table    -- table name inside ``db``
        shards   -- shard count used only if the table has to be created
        replicas -- replica count used only if the table has to be created
        """
        # Copy into a fresh list: a list literal as the default would be one
        # object shared by every instance constructed without ``servers``.
        self.servers = ["localhost"] if servers is None else list(servers)
        self.db = db
        self.table = table
        self.shards = shards
        self.replicas = replicas
        self._ensure_db_table()

    @staticmethod
    def _split_server(server):
        """Split "host:port" into (host, int(port)).

        Returns (server, None) when the string is not exactly one host and
        one numeric port, in which case the caller connects with the whole
        string as the host and the driver's default port.
        """
        parts = server.split(":")
        if len(parts) != 2:
            return server, None
        try:
            return parts[0], int(parts[1])
        except ValueError:
            return server, None

    @staticmethod
    def _is_expected_insert_result(result):
        """Return True if ``result`` describes exactly one clean write.

        A clean write has no errors, deletions or skips, and exactly one
        document counted under inserted/replaced/unchanged. Only the numeric
        counters are inspected, so additional fields in the response (such as
        a textual ``first_error``) cannot break the comparison.
        """
        if any(result.get(name, 0) != 0 for name in ("deleted", "skipped", "errors")):
            return False
        written = sum(result.get(name, 0) for name in ("inserted", "replaced", "unchanged"))
        return written == 1

    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
    # "Best practices: Managing connections: a connection per request"
    def _random_server_connection(self):
        """Open and return a connection to one randomly selected server."""
        server = random.choice(self.servers)
        # Parsing happens outside of any try block around r.connect so that a
        # ValueError raised while connecting is not mistaken for "no port".
        host, port = self._split_server(server)
        if port is None:
            return r.connect(host=server)
        return r.connect(host=host, port=port)

    def _ensure_db_table(self):
        """Create the configured database and table when they do not exist."""
        with self._random_server_connection() as conn:
            dbs = r.db_list().run(conn)
            if self.db not in dbs:
                self.logger.info("creating rethinkdb database %s", repr(self.db))
                r.db_create(self.db).run(conn)
            tables = r.db(self.db).table_list().run(conn)
            if self.table not in tables:
                self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
                r.db(self.db).table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run(conn)

    def close(self):
        """No-op: connections are opened and closed per operation."""
        pass

    def sync(self):
        """No-op: there is no local state to flush."""
        pass

    def save(self, key, response_record):
        """Store the id, url and date of ``response_record`` under ``key``.

        An existing row with the same key is replaced. Raises Exception when
        the server response does not describe exactly one clean write.
        """
        k = key.decode("utf-8") if isinstance(key, bytes) else key
        record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
        url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
        date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
        record = {'key':k,'url':url,'date':date,'id':record_id}
        with self._random_server_connection() as conn:
            result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn)
            if not self._is_expected_insert_result(result):
                # Format eagerly: Exception does not interpolate %s arguments.
                raise Exception("unexpected result %s saving %s" % (result, record))
            self.logger.debug('dedup db saved %s:%s', key, record)

    def lookup(self, key):
        """Return the stored row for ``key`` with bytes values, or None.

        The returned dict contains every stored field (including ``key``).
        """
        k = key.decode("utf-8") if isinstance(key, bytes) else key
        with self._random_server_connection() as conn:
            result = r.db(self.db).table(self.table).get(k).run(conn)
            if result:
                # NOTE(review): save() decodes the header bytes as latin1 but
                # values are re-encoded as utf-8 here, so non-ASCII headers do
                # not round-trip to identical bytes -- confirm this is intended.
                for x in result:
                    result[x] = result[x].encode("utf-8")
                self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
                return result
|
||||||
|
@ -20,6 +20,7 @@ import signal
|
|||||||
import threading
|
import threading
|
||||||
import certauth.certauth
|
import certauth.certauth
|
||||||
import warcprox
|
import warcprox
|
||||||
|
import re
|
||||||
|
|
||||||
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
||||||
arg_parser = argparse.ArgumentParser(prog=prog,
|
arg_parser = argparse.ArgumentParser(prog=prog,
|
||||||
@ -55,7 +56,10 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
|||||||
default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
|
default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
|
||||||
arg_parser.add_argument('--base32', dest='base32', action='store_true',
|
arg_parser.add_argument('--base32', dest='base32', action='store_true',
|
||||||
default=False, help='write digests in Base32 instead of hex')
|
default=False, help='write digests in Base32 instead of hex')
|
||||||
arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
|
group = arg_parser.add_mutually_exclusive_group()
|
||||||
|
group.add_argument('--dedup-rethinkdb-url', dest='dedup_rethinkdb_url',
|
||||||
|
help='persistent deduplication rethink db url, e.g. rethinkdb://db0.foo.org,db0.foo.org:38015,db1.foo.org/warcprox/dedup')
|
||||||
|
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
|
||||||
default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
|
default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
|
||||||
arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
|
arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
|
||||||
default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
|
default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
|
||||||
@ -108,7 +112,17 @@ def main(argv=sys.argv):
|
|||||||
logging.fatal(e)
|
logging.fatal(e)
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
if args.dedup_db_file in (None, '', '/dev/null'):
|
if args.dedup_rethinkdb_url:
|
||||||
|
m = re.fullmatch(r"rethinkdb://([^/]+)/([^/]+)/([^/]+)", args.dedup_rethinkdb_url)
|
||||||
|
if m:
|
||||||
|
servers = m.group(1).split(",")
|
||||||
|
db = m.group(2)
|
||||||
|
table = m.group(3)
|
||||||
|
dedup_db = warcprox.dedup.RethinkDedupDb(servers, db, table)
|
||||||
|
else:
|
||||||
|
logging.fatal("failed to parse dedup rethinkdb url %s", args.dedup_rethinkdb_url)
|
||||||
|
exit(1)
|
||||||
|
elif args.dedup_db_file in (None, '', '/dev/null'):
|
||||||
logging.info('deduplication disabled')
|
logging.info('deduplication disabled')
|
||||||
dedup_db = None
|
dedup_db = None
|
||||||
else:
|
else:
|
||||||
|
@ -32,9 +32,9 @@ class WarcRecordBuilder:
|
|||||||
url=recorded_url.url, warc_date=warc_date,
|
url=recorded_url.url, warc_date=warc_date,
|
||||||
data=response_header_block,
|
data=response_header_block,
|
||||||
warc_type=warctools.WarcRecord.REVISIT,
|
warc_type=warctools.WarcRecord.REVISIT,
|
||||||
refers_to=recorded_url.dedup_info['i'],
|
refers_to=recorded_url.dedup_info['id'],
|
||||||
refers_to_target_uri=recorded_url.dedup_info['u'],
|
refers_to_target_uri=recorded_url.dedup_info['url'],
|
||||||
refers_to_date=recorded_url.dedup_info['d'],
|
refers_to_date=recorded_url.dedup_info['date'],
|
||||||
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
|
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
|
||||||
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
|
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
|
||||||
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
|
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
|
||||||
|
@ -70,7 +70,7 @@ class WarcWriterThread(threading.Thread):
|
|||||||
and recorded_url.response_recorder.payload_size() > 0):
|
and recorded_url.response_recorder.payload_size() > 0):
|
||||||
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
|
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
|
||||||
self.writer_pool.default_warc_writer.record_builder.base32)
|
self.writer_pool.default_warc_writer.record_builder.base32)
|
||||||
self.dedup_db.save(key, records[0], records[0].offset)
|
self.dedup_db.save(key, records[0])
|
||||||
|
|
||||||
def _save_playback_info(self, recorded_url, records):
|
def _save_playback_info(self, recorded_url, records):
|
||||||
if self.playback_index_db is not None:
|
if self.playback_index_db is not None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user