get rid of dbm, switch to sqlite, for easier portability, clarity around threading

This commit is contained in:
Noah Levitt 2017-05-24 13:57:09 -07:00
parent 99dd840d20
commit 95dfa54968
11 changed files with 195 additions and 372 deletions

View File

@ -18,8 +18,6 @@ matrix:
addons: addons:
apt: apt:
packages: packages:
- python-gdbm
- python3-gdbm
- tor - tor
services: services:

View File

@ -15,7 +15,7 @@ To install latest release run:
:: ::
# apt-get install libffi-dev libssl-dev python3-gdbm # apt-get install libffi-dev libssl-dev
pip install warcprox pip install warcprox
You can also install the latest bleeding edge code: You can also install the latest bleeding edge code:

View File

@ -24,7 +24,6 @@ import sys
import setuptools import setuptools
import setuptools.command.test import setuptools.command.test
# special class needs to be added to support the pytest written dump-anydbm tests
class PyTest(setuptools.command.test.test): class PyTest(setuptools.command.test.test):
def finalize_options(self): def finalize_options(self):
setuptools.command.test.test.finalize_options(self) setuptools.command.test.test.finalize_options(self)
@ -68,7 +67,6 @@ setuptools.setup(
'warcprox=warcprox.main:main', 'warcprox=warcprox.main:main',
('warcprox-ensure-rethinkdb-tables=' ('warcprox-ensure-rethinkdb-tables='
'warcprox.main:ensure_rethinkdb_tables'), 'warcprox.main:ensure_rethinkdb_tables'),
'dump-anydbm=warcprox.dump_anydbm:main',
], ],
}, },
zip_safe=False, zip_safe=False,

View File

@ -39,9 +39,8 @@ RUN mkdir -vp /etc/service/rethinkdb \
&& chmod a+x /etc/service/rethinkdb/run && chmod a+x /etc/service/rethinkdb/run
RUN apt-get -y install git RUN apt-get -y install git
RUN apt-get -y install python-gdbm python3-gdbm libpython2.7-dev \ RUN apt-get -y install libpython2.7-dev libpython3-dev libffi-dev libssl-dev \
libpython3-dev libffi-dev libssl-dev python-setuptools \ python-setuptools python3-setuptools
python3-setuptools
RUN apt-get -y install gcc RUN apt-get -y install gcc
RUN echo '57ff41e99cb01b6a1c2b0999161589b726f0ec8b /tmp/pip-9.0.1.tar.gz' > /tmp/sha1sums.txt RUN echo '57ff41e99cb01b6a1c2b0999161589b726f0ec8b /tmp/pip-9.0.1.tar.gz' > /tmp/sha1sums.txt

View File

@ -1,154 +0,0 @@
#!/usr/bin/env python
#
# tests/test_dump-anydbm.py - tests for dump-anydbm
#
# Copyright (C) 2013-2016 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
import pytest
import os
import tempfile
import subprocess # to access the script from shell
import sys
import glob
import distutils
# will try as python 3 then default to python 2 modules
try:
import dbm
from dbm import ndbm
from dbm import gnu as gdbm
from dbm import dumb
whichdb = dbm.whichdb
ndbm_type = b"dbm.ndbm"
gdbm_type = b"dbm.gnu"
dumb_type = b"dbm.dumb"
except:
import dbm as ndbm
import gdbm
import dumbdbm as dumb
from whichdb import whichdb
ndbm_type = b"dbm"
gdbm_type = b"gdbm"
dumb_type = b"dumbdbm"
# Global settings shared by every fixture and test in this module.
key1 = 'very first key'
key2 = 'second key'
val1 = 'very first value'
val2 = 'second value'

# Interpreter used to launch the dump-anydbm script in a subprocess.
py = sys.executable

# Path of the installed dump-anydbm console script, or None if it is not on
# PATH.  A bare `import distutils` does not load the `distutils.spawn`
# submodule, so `distutils.spawn.find_executable` only worked when some other
# module had already imported it, and distutils no longer exists on
# Python 3.12+.  Prefer shutil.which and fall back to distutils.spawn on
# interpreters that predate it (Python 2).
try:
    from shutil import which as _find_executable
except ImportError:
    from distutils.spawn import find_executable as _find_executable
dump_anydbm_loc = _find_executable("dump-anydbm")
@pytest.fixture(scope="function")
def gdbm_test_db(request):
    """Build a temporary gdbm database containing the two sample records.

    Returns the path of the database.  A finalizer registered on `request`
    deletes every file whose name starts with that path.
    """
    placeholder = tempfile.NamedTemporaryFile(delete=False)
    db_path = placeholder.name
    print("creating test gdbm file {}".format(db_path))

    database = gdbm.open(db_path, "n")
    for key, value in ((key1, val1), (key2, val2)):
        database[key] = value
    database.close()

    def _cleanup():
        placeholder.close()
        # glob on the path prefix so suffixed companion files go as well
        for leftover in glob.glob("{}*".format(db_path)):
            print("deleting test gdbm file {}".format(leftover))
            os.remove(leftover)

    request.addfinalizer(_cleanup)
    return db_path
@pytest.fixture(scope="function")
def ndbm_test_db(request):
    """Build a temporary ndbm database containing the two sample records.

    Returns the path handed to ndbm.open.  A finalizer registered on
    `request` deletes every file whose name starts with that path.
    """
    placeholder = tempfile.NamedTemporaryFile(delete=False)
    db_path = placeholder.name

    database = ndbm.open(db_path, "n")
    for key, value in ((key1, val1), (key2, val2)):
        database[key] = value
    database.close()

    def _cleanup():
        placeholder.close()
        # glob on the path prefix so suffixed companion files go as well
        for leftover in glob.glob("{}*".format(db_path)):
            print("deleting test ndbm file {}".format(leftover))
            os.remove(leftover)

    request.addfinalizer(_cleanup)
    return db_path
@pytest.fixture(scope="function")
def dumbdbm_test_db(request):
    """Build a temporary dumbdbm database containing the two sample records.

    Returns the path handed to dumb.open.  A finalizer registered on
    `request` deletes every file whose name starts with that path.
    """
    placeholder = tempfile.NamedTemporaryFile(delete=False)
    db_path = placeholder.name
    print("creating test dumbdbm file {}".format(db_path))

    database = dumb.open(db_path, "n")
    for key, value in ((key1, val1), (key2, val2)):
        database[key] = value
    database.close()

    def _cleanup():
        placeholder.close()
        # glob on the path prefix so suffixed companion files go as well
        for leftover in glob.glob("{}*".format(db_path)):
            print("deleting test dumbdbm file {}".format(leftover))
            os.remove(leftover)

    request.addfinalizer(_cleanup)
    return db_path
def test_dumpanydbm_identify_gdbm(gdbm_test_db):
    """dump-anydbm names the gdbm type and prints both records, in any order."""
    print("running test_dumpanydbm_identify_gdbm")
    output = subprocess.check_output([py, dump_anydbm_loc, gdbm_test_db])
    print("script printout: ")
    print(output)

    print("check_one: ")
    banner = gdbm_test_db.encode(encoding='UTF-8') + b' is a ' + gdbm_type + b' db\n'
    first_record = b'very first key:very first value\n'
    second_record = b'second key:second value\n'
    forward = banner + first_record + second_record
    backward = banner + second_record + first_record
    print(forward)

    # either record order is accepted
    assert output in (forward, backward)
def test_dumpanydbm_identify_ndbm(ndbm_test_db):
    """dump-anydbm names the ndbm type and prints both records, in any order."""
    print("running test_dumpanydbm_identify_ndbm")
    output = subprocess.check_output([py, dump_anydbm_loc, ndbm_test_db])
    print("script printout: ")
    print(output)

    print("check_one: ")
    banner = ndbm_test_db.encode(encoding='UTF-8') + b' is a ' + ndbm_type + b' db\n'
    first_record = b'very first key:very first value\n'
    second_record = b'second key:second value\n'
    forward = banner + first_record + second_record
    backward = banner + second_record + first_record
    print(forward)

    # either record order is accepted
    assert output in (forward, backward)
def test_dumpanydbm_identify_dumbdbm(dumbdbm_test_db):
    """dump-anydbm names the dumbdbm type and prints both records, in any order."""
    print("running test_dumpanydbm_identify_dumbdbm")
    output = subprocess.check_output([py, dump_anydbm_loc, dumbdbm_test_db])
    print("script printout: ")
    print(output)

    print("check_one: ")
    banner = dumbdbm_test_db.encode(encoding='UTF-8') + b' is a ' + dumb_type + b' db\n'
    first_record = b'very first key:very first value\n'
    second_record = b'second key:second value\n'
    forward = banner + first_record + second_record
    backward = banner + second_record + first_record
    print(forward)

    # either record order is accepted
    assert output in (forward, backward)

View File

@ -83,7 +83,7 @@ def _send(self, data):
# http_client.HTTPConnection.send = _send # http_client.HTTPConnection.send = _send
logging.basicConfig( logging.basicConfig(
stream=sys.stdout, level=logging.INFO, # level=warcprox.TRACE, stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN) logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
@ -322,9 +322,9 @@ def stats_db(request, rethinkdb_servers):
logging.info('dropping rethinkdb database {}'.format(db)) logging.info('dropping rethinkdb database {}'.format(db))
result = sdb.rr.db_drop(db).run() result = sdb.rr.db_drop(db).run()
logging.info("result=%s", result) logging.info("result=%s", result)
else: # else:
logging.info('deleting file {}'.format(stats_db_file)) # logging.info('deleting file {}'.format(stats_db_file))
os.unlink(stats_db_file) # os.unlink(stats_db_file)
request.addfinalizer(fin) request.addfinalizer(fin)
return sdb return sdb

View File

@ -1,5 +1,5 @@
''' '''
warcprox/dedup.py - identical payload digest deduplication warcprox/dedup.py - identical payload digest deduplication using sqlite db
Copyright (C) 2013-2017 Internet Archive Copyright (C) 2013-2017 Internet Archive
@ -27,61 +27,71 @@ import json
from hanzo import warctools from hanzo import warctools
import warcprox import warcprox
import random import random
import sqlite3
import threading
class DedupDb(object): class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb") logger = logging.getLogger("warcprox.dedup.DedupDb")
def __init__(self, dbm_file='./warcprox-dedup.db', options=warcprox.Options()): def __init__(
try: self, file='./warcprox.sqlite', options=warcprox.Options()):
import dbm.gnu as dbm_gnu self.file = file
except ImportError:
try:
import gdbm as dbm_gnu
except ImportError:
import anydbm as dbm_gnu
if os.path.exists(dbm_file):
self.logger.info('opening existing deduplication database {}'.format(dbm_file))
else:
self.logger.info('creating new deduplication database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c')
self.options = options self.options = options
def start(self): def start(self):
pass if os.path.exists(self.file):
self.logger.info(
'opening existing deduplication database %s',
self.file)
else:
self.logger.info(
'creating new deduplication database %s', self.file)
conn = sqlite3.connect(self.file)
conn.execute(
'create table if not exists dedup ('
' key varchar(300) primary key,'
' value varchar(4000)'
');')
conn.commit()
conn.close()
def stop(self): def stop(self):
self.close() pass
def close(self): def close(self):
self.db.close() pass
def sync(self): def sync(self):
try: pass
self.db.sync()
except:
pass
def save(self, digest_key, response_record, bucket=""): def save(self, digest_key, response_record, bucket=""):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
key = digest_key + b"|" + bucket.encode("utf-8") key = digest_key.decode('utf-8') + "|" + bucket
py_value = {'id':record_id, 'url':url, 'date':date} py_value = {'id':record_id, 'url':url, 'date':date}
json_value = json.dumps(py_value, separators=(',',':')) json_value = json.dumps(py_value, separators=(',',':'))
self.db[key] = json_value.encode('utf-8') conn = sqlite3.connect(self.file)
conn.execute(
'insert into dedup (key, value) values (?, ?);',
(key, json_value))
conn.commit()
conn.close()
self.logger.debug('dedup db saved %s:%s', key, json_value) self.logger.debug('dedup db saved %s:%s', key, json_value)
def lookup(self, digest_key, bucket=""): def lookup(self, digest_key, bucket=""):
result = None result = None
key = digest_key + b"|" + bucket.encode("utf-8") key = digest_key.decode('utf-8') + '|' + bucket
if key in self.db: conn = sqlite3.connect(self.file)
json_result = self.db[key] cursor = conn.execute('select value from dedup where key = ?', (key,))
result = json.loads(json_result.decode('utf-8')) result_tuple = cursor.fetchone()
conn.close()
if result_tuple:
result = json.loads(result_tuple[0])
result['id'] = result['id'].encode('latin1') result['id'] = result['id'].encode('latin1')
result['url'] = result['url'].encode('latin1') result['url'] = result['url'].encode('latin1')
result['date'] = result['date'].encode('latin1') result['date'] = result['date'].encode('latin1')
@ -91,10 +101,13 @@ class DedupDb(object):
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0): and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, digest_key = warcprox.digest_str(
self.options.base32) recorded_url.response_recorder.payload_digest,
self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"]) self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta["captures-bucket"])
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])

View File

@ -1,80 +0,0 @@
#!/usr/bin/env python
'''
dump-anydbm - dumps contents of dbm file to stdout
Dump contents of database to stdout. Database can be any file that the anydbm
module can read. Included with warcprox because it's useful for inspecting a
deduplication database or a playback index database, but it is a generic tool.
Copyright (C) 2013-2016 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
try:
import dbm
from dbm import ndbm
whichdb = dbm.whichdb
except:
import anydbm
dbm = anydbm
from whichdb import whichdb
import sys
import os.path
def main():
    """Dump the contents of the dbm file named on the command line to stdout.

    Expects exactly one command-line argument, the database path.  Writes a
    header line "<filename> is a <type> db" followed by one "key:value" line
    per entry, as raw bytes.

    Exits with status 1, after a message on stderr, when the argument count
    is wrong, the file does not exist, or whichdb cannot determine the
    database type.
    """
    if len(sys.argv) != 2:
        sys.stderr.write("usage: {} DBM_FILE\n".format(sys.argv[0]))
        sys.exit(1)

    filename = sys.argv[1]
    which = whichdb(filename)

    # whichdb returned None and the file does not exist: print usage line
    if which is None and not os.path.exists(sys.argv[1]):
        sys.stderr.write('No such file {}\n\n'.format(sys.argv[1]))
        sys.stderr.write("usage: {} DBM_FILE\n".format(sys.argv[0]))
        sys.exit(1)

    # whichdb returned None although the path exists (it could not be opened
    # as a database file); previously this fell through to `'bsd' in which`
    # and died with a TypeError
    if which is None:
        sys.stderr.write(
            "{} could not be read as a dbm database\n".format(sys.argv[1]))
        sys.exit(1)

    if 'bsd' in which:
        # covers case where an ndbm is checked with its extension and
        # identified incorrectly: retry with the extension stripped
        correct_file = filename.split(".db")[0]
        correct_which = whichdb(correct_file)
        if correct_which in ('dbm', 'dbm.ndbm'):
            filename = correct_file
            which = correct_which
    elif which == '':
        sys.stderr.write(
            "{} is an unrecognized database type\n".format(sys.argv[1]))
        sys.stderr.write("Try the file again by removing the extension\n")
        sys.exit(1)

    # write bytes: python 3 exposes the binary stream as sys.stdout.buffer,
    # python 2's sys.stdout accepts byte strings directly
    try:
        out = sys.stdout.buffer
    except AttributeError:
        out = sys.stdout

    out.write(
        filename.encode('UTF-8') + b' is a ' + which.encode('UTF-8')
        + b' db\n')

    db = dbm.open(filename, 'r')
    try:
        for key in db.keys():
            out.write(key + b":" + db[key] + b"\n")
    finally:
        # release the database handle even if a write fails
        db.close()


# The guard has to follow the definition of main(); placed above it, running
# this file as a script raised NameError.
if __name__ == "__main__":
    main()

View File

@ -97,7 +97,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD', arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
action='append', help='only record requests with the given http method(s) (can be used more than once)') action='append', help='only record requests with the given http method(s) (can be used more than once)')
arg_parser.add_argument('--stats-db-file', dest='stats_db_file', arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
arg_parser.add_argument('-P', '--playback-port', dest='playback_port', arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
type=int, default=None, help='port to listen on for instant playback') type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
@ -105,7 +105,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
help='playback index database file (only used if --playback-port is specified)') help='playback index database file (only used if --playback-port is specified)')
group = arg_parser.add_mutually_exclusive_group() group = arg_parser.add_mutually_exclusive_group()
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication') default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',

View File

@ -2,7 +2,7 @@
warcprox/playback.py - rudimentary support for playback of urls archived by warcprox/playback.py - rudimentary support for playback of urls archived by
warcprox (not much used or maintained) warcprox (not much used or maintained)
Copyright (C) 2013-2016 Internet Archive Copyright (C) 2013-2017 Internet Archive
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -40,6 +40,7 @@ import traceback
import re import re
from warcprox.mitmproxy import MitmProxyHandler from warcprox.mitmproxy import MitmProxyHandler
import warcprox import warcprox
import sqlite3
class PlaybackProxyHandler(MitmProxyHandler): class PlaybackProxyHandler(MitmProxyHandler):
logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler") logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler")
@ -49,10 +50,9 @@ class PlaybackProxyHandler(MitmProxyHandler):
# don't connect to any remote server! # don't connect to any remote server!
pass pass
# @Override # @Override
def _proxy_request(self): def _proxy_request(self):
date, location = self.server.playback_index_db.lookup_latest(self.url.encode('utf-8')) date, location = self.server.playback_index_db.lookup_latest(self.url)
self.logger.debug('lookup_latest returned {}:{}'.format(date, location)) self.logger.debug('lookup_latest returned {}:{}'.format(date, location))
status = None status = None
@ -82,7 +82,8 @@ class PlaybackProxyHandler(MitmProxyHandler):
sz = len(headers) + len(payload) sz = len(headers) + len(payload)
self.log_message('"%s" %s %s %s', self.log_message('"%s" %s %s %s',
self.requestline, str(status), str(sz), repr(location) if location else '-') self.requestline, str(status), str(sz),
repr(location) if location else '-')
def _open_warc_at_offset(self, warcfilename, offset): def _open_warc_at_offset(self, warcfilename, offset):
@ -99,7 +100,6 @@ class PlaybackProxyHandler(MitmProxyHandler):
return warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset) return warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset)
def _send_response(self, headers, payload_fh): def _send_response(self, headers, payload_fh):
status = '-' status = '-'
m = re.match(br'^HTTP/\d\.\d (\d{3})', headers) m = re.match(br'^HTTP/\d\.\d (\d{3})', headers)
@ -118,8 +118,10 @@ class PlaybackProxyHandler(MitmProxyHandler):
return status, sz return status, sz
def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date): def _send_headers_and_refd_payload(
location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to) self, headers, refers_to, refers_to_target_uri, refers_to_date):
location = self.server.playback_index_db.lookup_exact(
refers_to_target_uri, refers_to_date, record_id=refers_to)
self.logger.debug('loading http payload from {}'.format(location)) self.logger.debug('loading http payload from {}'.format(location))
fh = self._open_warc_at_offset(location['f'], location['o']) fh = self._open_warc_at_offset(location['f'], location['o'])
@ -174,12 +176,20 @@ class PlaybackProxyHandler(MitmProxyHandler):
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST: if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
raise Exception('unknown revisit record profile {}'.format(warc_profile)) raise Exception('unknown revisit record profile {}'.format(warc_profile))
refers_to = record.get_header(warctools.WarcRecord.REFERS_TO) refers_to = record.get_header(
refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI) warctools.WarcRecord.REFERS_TO).decode('latin1')
refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE) refers_to_target_uri = record.get_header(
warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
'latin1')
refers_to_date = record.get_header(
warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri)) self.logger.debug(
return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date) 'revisit record references %s:%s capture of %s',
refers_to_date, refers_to, refers_to_target_uri)
return self._send_headers_and_refd_payload(
record.content[1], refers_to, refers_to_target_uri,
refers_to_date)
else: else:
# send it back raw, whatever it is # send it back raw, whatever it is
@ -220,30 +230,30 @@ class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
class PlaybackIndexDb(object): class PlaybackIndexDb(object):
logger = logging.getLogger("warcprox.playback.PlaybackIndexDb") logger = logging.getLogger("warcprox.playback.PlaybackIndexDb")
def __init__(self, dbm_file='./warcprox-playback-index.db'): def __init__(self, file='./warcprox.sqlite'):
try: self.file = file
import dbm.gnu as dbm_gnu
except ImportError:
try:
import gdbm as dbm_gnu
except ImportError:
import anydbm as dbm_gnu
if os.path.exists(dbm_file): if os.path.exists(self.file):
self.logger.info('opening existing playback index database {}'.format(dbm_file)) self.logger.info(
'opening existing playback index database %s', self.file)
else: else:
self.logger.info('creating new playback index database {}'.format(dbm_file)) self.logger.info(
'creating new playback index database %s', self.file)
self.db = dbm_gnu.open(dbm_file, 'c') conn = sqlite3.connect(self.file)
conn.execute(
'create table if not exists playback ('
' url varchar(4000) primary key,'
' value varchar(4000)'
');')
conn.commit()
conn.close()
def close(self): def close(self):
self.db.close() pass
def sync(self): def sync(self):
try: pass
self.db.sync()
except:
pass
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
self.save(records[0].warc_filename, records, records[0].offset) self.save(records[0].warc_filename, records, records[0].offset)
@ -251,7 +261,7 @@ class PlaybackIndexDb(object):
def save(self, warcfile, recordset, offset): def save(self, warcfile, recordset, offset):
response_record = recordset[0] response_record = recordset[0]
# XXX canonicalize url? # XXX canonicalize url?
url = response_record.get_header(warctools.WarcRecord.URL) url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
@ -259,9 +269,13 @@ class PlaybackIndexDb(object):
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\ # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
if url in self.db:
existing_json_value = self.db[url].decode('utf-8') conn = sqlite3.connect(self.file)
py_value = json.loads(existing_json_value) cursor = conn.execute(
'select value from playback where url = ?', (url,))
result_tuple = cursor.fetchone()
if result_tuple:
py_value = json.loads(result_tuple[0])
else: else:
py_value = {} py_value = {}
@ -272,16 +286,25 @@ class PlaybackIndexDb(object):
json_value = json.dumps(py_value, separators=(',',':')) json_value = json.dumps(py_value, separators=(',',':'))
self.db[url] = json_value.encode('utf-8') conn.execute(
'insert or replace into playback (url, value) values (?, ?)',
(url, json_value))
conn.commit()
conn.close()
self.logger.debug('playback index saved: {}:{}'.format(url, json_value)) self.logger.debug('playback index saved: {}:{}'.format(url, json_value))
def lookup_latest(self, url): def lookup_latest(self, url):
if url not in self.db: conn = sqlite3.connect(self.file)
cursor = conn.execute(
'select value from playback where url = ?', (url,))
result_tuple = cursor.fetchone()
conn.close()
if not result_tuple:
return None, None return None, None
json_value = self.db[url].decode('utf-8') json_value = result_tuple[0]
self.logger.debug("{}:{}".format(repr(url), repr(json_value))) self.logger.debug("{}:{}".format(repr(url), repr(json_value)))
py_value = json.loads(json_value) py_value = json.loads(json_value)
@ -290,26 +313,33 @@ class PlaybackIndexDb(object):
result['i'] = result['i'].encode('ascii') result['i'] = result['i'].encode('ascii')
return latest_date, result return latest_date, result
# in python3 params are bytes # in python3 params are bytes
def lookup_exact(self, url, warc_date, record_id): def lookup_exact(self, url, warc_date, record_id):
if url not in self.db: conn = sqlite3.connect(self.file)
cursor = conn.execute(
'select value from playback where url = ?', (url,))
result_tuple = cursor.fetchone()
conn.close()
if not result_tuple:
return None return None
json_value = self.db[url].decode('utf-8') json_value = result_tuple[0]
self.logger.debug("{}:{}".format(repr(url), repr(json_value))) self.logger.debug("%s:%s", repr(url), repr(json_value))
py_value = json.loads(json_value) py_value = json.loads(json_value)
warc_date_str = warc_date.decode('ascii') if warc_date in py_value:
for record in py_value[warc_date]:
if warc_date_str in py_value: if record['i'] == record_id:
for record in py_value[warc_date_str]: self.logger.debug(
if record['i'].encode('ascii') == record_id: "found exact match for (%s,%s,%s)",
self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) repr(warc_date), repr(record_id), repr(url))
record['i'] = record['i'].encode('ascii') record['i'] = record['i']
return record return record
else: else:
self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) self.logger.info(
"match not found for (%s,%s,%s)", repr(warc_date),
repr(record_id), repr(url))
return None return None

View File

@ -31,6 +31,7 @@ import threading
import rethinkdb as r import rethinkdb as r
import datetime import datetime
import urlcanon import urlcanon
import sqlite3
def _empty_bucket(bucket): def _empty_bucket(bucket):
return { return {
@ -52,53 +53,52 @@ def _empty_bucket(bucket):
class StatsDb: class StatsDb:
logger = logging.getLogger("warcprox.stats.StatsDb") logger = logging.getLogger("warcprox.stats.StatsDb")
def __init__(self, dbm_file='./warcprox-stats.db', options=warcprox.Options()): def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()):
try: self.file = file
import dbm.gnu as dbm_gnu
except ImportError:
try:
import gdbm as dbm_gnu
except ImportError:
import anydbm as dbm_gnu
if os.path.exists(dbm_file):
self.logger.info('opening existing stats database {}'.format(dbm_file))
else:
self.logger.info('creating new stats database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c')
self.options = options self.options = options
def start(self): def start(self):
# method only exists to match RethinkStatsDb if os.path.exists(self.file):
pass self.logger.info(
'opening existing stats database %s', self.file)
else:
self.logger.info(
'creating new stats database %s', self.file)
conn = sqlite3.connect(self.file)
conn.execute(
'create table if not exists buckets_of_stats ('
' bucket varchar(300) primary key,'
' stats varchar(4000)'
');')
conn.commit()
conn.close()
self.logger.info('created table buckets_of_stats in %s', self.file)
def stop(self): def stop(self):
self.close() pass
def close(self): def close(self):
self.db.close() pass
def sync(self): def sync(self):
try: pass
self.db.sync()
except:
pass
def value(self, bucket0="__all__", bucket1=None, bucket2=None): def value(self, bucket0="__all__", bucket1=None, bucket2=None):
# Gdbm wants str/bytes keys in python2, str/unicode keys in python3. conn = sqlite3.connect(self.file)
# This ugliness deals with keys that arrive as unicode in py2. cursor = conn.execute(
b0 = bucket0.encode("utf-8") if bucket0 and not isinstance(bucket0, str) else bucket0 'select stats from buckets_of_stats where bucket = ?',
b1 = bucket1.encode("utf-8") if bucket1 and not isinstance(bucket1, str) else bucket1 (bucket0,))
b2 = bucket2.encode("utf-8") if bucket2 and not isinstance(bucket2, str) else bucket2 result_tuple = cursor.fetchone()
conn.close()
if b0 in self.db: if result_tuple:
bucket0_stats = json.loads(self.db[b0].decode("utf-8")) bucket0_stats = json.loads(result_tuple[0])
if b1: if bucket1:
if b2: if bucket2:
return bucket0_stats[b1][b2] return bucket0_stats[bucket1][bucket2]
else: else:
return bucket0_stats[b1] return bucket0_stats[bucket1]
else: else:
return bucket0_stats return bucket0_stats
else: else:
@ -115,7 +115,7 @@ class StatsDb:
with key 'bucket' whose value is the name of the bucket. The other with key 'bucket' whose value is the name of the bucket. The other
currently recognized item is 'tally-domains', which if supplied should currently recognized item is 'tally-domains', which if supplied should
be a list of domains. This instructs warcprox to additionally tally be a list of domains. This instructs warcprox to additionally tally
substats of the given bucket by domain. Host stats are stored in the substats of the given bucket by domain. Host stats are stored in the
stats table under the key '{parent-bucket}:{domain(normalized)}'. stats table under the key '{parent-bucket}:{domain(normalized)}'.
Example Warcprox-Meta header (a real one will likely have other Example Warcprox-Meta header (a real one will likely have other
@ -150,14 +150,27 @@ class StatsDb:
return buckets return buckets
def tally(self, recorded_url, records): def tally(self, recorded_url, records):
conn = sqlite3.connect(self.file)
i = 0
for bucket in self.buckets(recorded_url): for bucket in self.buckets(recorded_url):
# Gdbm wants str/bytes keys in python2, str/unicode keys in python3. try:
# This ugliness deals with keys that arrive as unicode in py2. cursor = conn.execute(
b = bucket.encode("utf-8") if bucket and not isinstance(bucket, str) else bucket 'select stats from buckets_of_stats where bucket=?',
if b in self.db: (bucket,))
bucket_stats = json.loads(self.db[b].decode("utf-8")) except:
logging.info(
'i=%s bucket=%s self.file=%s', i, repr(bucket),
repr(self.file), exc_info=1)
raise
i += 1
result_tuple = cursor.fetchone()
cursor.close()
if result_tuple:
bucket_stats = json.loads(result_tuple[0])
else: else:
bucket_stats = _empty_bucket(b) bucket_stats = _empty_bucket(bucket)
bucket_stats["total"]["urls"] += 1 bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size bucket_stats["total"]["wire_bytes"] += recorded_url.size
@ -169,7 +182,13 @@ class StatsDb:
bucket_stats["new"]["urls"] += 1 bucket_stats["new"]["urls"] += 1
bucket_stats["new"]["wire_bytes"] += recorded_url.size bucket_stats["new"]["wire_bytes"] += recorded_url.size
self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8") json_value = json.dumps(bucket_stats, separators=(',',':'))
conn.execute(
'insert or replace into buckets_of_stats(bucket, stats) '
'values (?, ?)', (bucket, json_value))
conn.commit()
conn.close()
class RethinkStatsDb(StatsDb): class RethinkStatsDb(StatsDb):
"""Updates database in batch every 2.0 seconds""" """Updates database in batch every 2.0 seconds"""