diff --git a/.travis.yml b/.travis.yml
index 3d02ebf..2f0dab6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,8 +18,6 @@ matrix:
 addons:
   apt:
     packages:
-    - python-gdbm
-    - python3-gdbm
     - tor
 
 services:
diff --git a/README.rst b/README.rst
index 766b0a6..2e0edb2 100644
--- a/README.rst
+++ b/README.rst
@@ -15,7 +15,7 @@ To install latest release run:
 
 ::
 
-    # apt-get install libffi-dev libssl-dev python3-gdbm
+    # apt-get install libffi-dev libssl-dev
     pip install warcprox
 
 You can also install the latest bleeding edge code:
diff --git a/setup.py b/setup.py
index 0f8ba4c..c86d831 100755
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,6 @@ import sys
 import setuptools
 import setuptools.command.test
 
-# special class needs to be added to support the pytest written dump-anydbm tests
 class PyTest(setuptools.command.test.test):
     def finalize_options(self):
         setuptools.command.test.test.finalize_options(self)
@@ -68,7 +67,6 @@ setuptools.setup(
             'warcprox=warcprox.main:main',
             ('warcprox-ensure-rethinkdb-tables='
                 'warcprox.main:ensure_rethinkdb_tables'),
-            'dump-anydbm=warcprox.dump_anydbm:main',
         ],
     },
     zip_safe=False,
diff --git a/tests/Dockerfile b/tests/Dockerfile
index 8e57149..04c6d72 100644
--- a/tests/Dockerfile
+++ b/tests/Dockerfile
@@ -39,9 +39,8 @@ RUN mkdir -vp /etc/service/rethinkdb \
     && chmod a+x /etc/service/rethinkdb/run
 
 RUN apt-get -y install git
-RUN apt-get -y install python-gdbm python3-gdbm libpython2.7-dev \
-    libpython3-dev libffi-dev libssl-dev python-setuptools \
-    python3-setuptools
+RUN apt-get -y install libpython2.7-dev libpython3-dev libffi-dev libssl-dev \
+    python-setuptools python3-setuptools
 RUN apt-get -y install gcc
 
 RUN echo '57ff41e99cb01b6a1c2b0999161589b726f0ec8b  /tmp/pip-9.0.1.tar.gz' > /tmp/sha1sums.txt
diff --git a/tests/test_dump-anydbm.py b/tests/test_dump-anydbm.py
deleted file mode 100644
index 1bc6ccc..0000000
--- a/tests/test_dump-anydbm.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#!/usr/bin/env python
-#
-# tests/test_dump-anydbm.py - tests for dump-anydbm
-#
-# Copyright (C) 2013-2016 Internet Archive
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-#
-
-import pytest
-import os
-import tempfile
-import subprocess # to access the script from shell
-import sys
-import glob
-import distutils
-
-# will try as python 3 then default to python 2 modules
-try:
-    import dbm
-    from dbm import ndbm
-    from dbm import gnu as gdbm
-    from dbm import dumb
-
-    whichdb = dbm.whichdb
-
-    ndbm_type = b"dbm.ndbm"
-    gdbm_type = b"dbm.gnu"
-    dumb_type = b"dbm.dumb"
-
-except:
-    import dbm as ndbm
-    import gdbm
-    import dumbdbm as dumb
-
-    from whichdb import whichdb
-
-    ndbm_type = b"dbm"
-    gdbm_type = b"gdbm"
-    dumb_type = b"dumbdbm"
-
-#global settings
-key1 = 'very first key'
-key2 = 'second key'
-val1 = 'very first value'
-val2 = 'second value'
-
-py = sys.executable
-dump_anydbm_loc = distutils.spawn.find_executable("dump-anydbm")
-
-@pytest.fixture(scope="function")
-def gdbm_test_db(request):
-    temp_file = tempfile.NamedTemporaryFile(delete=False)
-    print("creating test gdbm file {}".format(temp_file.name))
-    test_db = gdbm.open(temp_file.name, "n")
-    test_db[key1] = val1
-    test_db[key2] = val2
-    test_db.close()
-
-    def delete_gdbm_test_db():
-        temp_file.close()
-        for f in glob.glob("{}*".format(temp_file.name)):
-            print("deleting test gdbm file {}".format(f))
-            os.remove(f)
-
-    request.addfinalizer(delete_gdbm_test_db)
-    return temp_file.name
-
-
-@pytest.fixture(scope="function")
-def ndbm_test_db(request):
-    temp_file = tempfile.NamedTemporaryFile(delete=False)
-    test_db = ndbm.open(temp_file.name, "n")
-    test_db[key1] = val1
-    test_db[key2] = val2
-    test_db.close()
-
-    def delete_test_ndbm():
-        temp_file.close()
-        for f in glob.glob("{}*".format(temp_file.name)):
-            print("deleting test ndbm file {}".format(f))
-            os.remove(f)
-
-    request.addfinalizer(delete_test_ndbm)
-    return temp_file.name
-
-
-@pytest.fixture(scope="function")
-def dumbdbm_test_db(request):
-    temp_file = tempfile.NamedTemporaryFile(delete=False)
-    print("creating test dumbdbm file {}".format(temp_file.name))
-    test_db = dumb.open(temp_file.name, "n")
-    test_db[key1] = val1
-    test_db[key2] = val2
-    test_db.close()
-
-    def delete_dumbdbm_test_db():
-        temp_file.close()
-        for f in glob.glob("{}*".format(temp_file.name)):
-            print("deleting test dumbdbm file {}".format(f))
-            os.remove(f)
-
-    request.addfinalizer(delete_dumbdbm_test_db)
-    return temp_file.name
-
-
-def test_dumpanydbm_identify_gdbm(gdbm_test_db):
-    print("running test_dumpanydbm_identify_gdbm")
-    output = subprocess.check_output([py, dump_anydbm_loc, gdbm_test_db])
-    print("script printout: ")
-    print(output)
-    print("check_one: ")
-    print(gdbm_test_db.encode(encoding='UTF-8') + b' is a ' + gdbm_type + b' db\nvery first key:very first value\nsecond key:second value\n')
-
-    assert (output == gdbm_test_db.encode(encoding='UTF-8') + b' is a ' + gdbm_type + b' db\nvery first key:very first value\nsecond key:second value\n' or
-            output == gdbm_test_db.encode(encoding='UTF-8') + b' is a ' + gdbm_type + b' db\nsecond key:second value\nvery first key:very first value\n')
-
-
-def test_dumpanydbm_identify_ndbm(ndbm_test_db):
-    print("running test_dumpanydbm_identify_ndbm")
-    output = subprocess.check_output([py, dump_anydbm_loc, ndbm_test_db])
-    print("script printout: ")
-    print(output)
-    print("check_one: ")
-    print(ndbm_test_db.encode(encoding='UTF-8') + b' is a ' + ndbm_type + b' db\nvery first key:very first value\nsecond key:second value\n')
-
-    assert (output == ndbm_test_db.encode(encoding='UTF-8') + b' is a ' + ndbm_type + b' db\nvery first key:very first value\nsecond key:second value\n' or
-            output == ndbm_test_db.encode(encoding='UTF-8') + b' is a ' + ndbm_type + b' db\nsecond key:second value\nvery first key:very first value\n')
-
-
-def test_dumpanydbm_identify_dumbdbm(dumbdbm_test_db):
-    print("running test_dumpanydbm_identify_dumbdbm")
-
-    output = subprocess.check_output([py, dump_anydbm_loc, dumbdbm_test_db])
-    print("script printout: ")
-    print(output)
-    print("check_one: ")
-    print(dumbdbm_test_db.encode(encoding='UTF-8') + b' is a ' + dumb_type + b' db\nvery first key:very first value\nsecond key:second value\n')
-
-    assert (output == dumbdbm_test_db.encode(encoding='UTF-8') + b' is a ' + dumb_type + b' db\nvery first key:very first value\nsecond key:second value\n' or
-            output == dumbdbm_test_db.encode(encoding='UTF-8') + b' is a ' + dumb_type + b' db\nsecond key:second value\nvery first key:very first value\n')
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index c226ae3..91ba6c7 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -83,7 +83,7 @@ def _send(self, data):
 # http_client.HTTPConnection.send = _send
 
 logging.basicConfig(
-        stream=sys.stdout, level=logging.INFO, # level=warcprox.TRACE,
+        stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
@@ -322,9 +322,9 @@ def stats_db(request, rethinkdb_servers):
             logging.info('dropping rethinkdb database {}'.format(db))
             result = sdb.rr.db_drop(db).run()
             logging.info("result=%s", result)
-        else:
-            logging.info('deleting file {}'.format(stats_db_file))
-            os.unlink(stats_db_file)
+        # else:
+        #     logging.info('deleting file {}'.format(stats_db_file))
+        #     os.unlink(stats_db_file)
     request.addfinalizer(fin)
 
     return sdb
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 3555ef4..4508e71 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -1,5 +1,5 @@
 '''
-warcprox/dedup.py - identical payload digest deduplication
+warcprox/dedup.py - identical payload digest deduplication using sqlite db
 
 Copyright (C) 2013-2017 Internet Archive
 
@@ -27,61 +27,71 @@ import json
 from hanzo import warctools
 import warcprox
 import random
+import sqlite3
+import threading
 
 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
 
-    def __init__(self, dbm_file='./warcprox-dedup.db', options=warcprox.Options()):
-        try:
-            import dbm.gnu as dbm_gnu
-        except ImportError:
-            try:
-                import gdbm as dbm_gnu
-            except ImportError:
-                import anydbm as dbm_gnu
-
-        if os.path.exists(dbm_file):
-            self.logger.info('opening existing deduplication database {}'.format(dbm_file))
-        else:
-            self.logger.info('creating new deduplication database {}'.format(dbm_file))
-
-        self.db = dbm_gnu.open(dbm_file, 'c')
+    def __init__(
+            self, file='./warcprox.sqlite', options=warcprox.Options()):
+        self.file = file
         self.options = options
 
     def start(self):
-        pass
+        if os.path.exists(self.file):
+            self.logger.info(
+                    'opening existing deduplication database %s',
+                    self.file)
+        else:
+            self.logger.info(
+                    'creating new deduplication database %s', self.file)
+
+        conn = sqlite3.connect(self.file)
+        conn.execute(
+                'create table if not exists dedup ('
+                '  key varchar(300) primary key,'
+                '  value varchar(4000)'
+                ');')
+        conn.commit()
+        conn.close()
 
     def stop(self):
-        self.close()
+        pass
 
     def close(self):
-        self.db.close()
+        pass
 
     def sync(self):
-        try:
-            self.db.sync()
-        except:
-            pass
+        pass
 
     def save(self, digest_key, response_record, bucket=""):
         record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
         url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
         date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
 
-        key = digest_key + b"|" + bucket.encode("utf-8")
+        key = digest_key.decode('utf-8') + "|" + bucket
 
         py_value = {'id':record_id, 'url':url, 'date':date}
         json_value = json.dumps(py_value, separators=(',',':'))
 
-        self.db[key] = json_value.encode('utf-8')
+        conn = sqlite3.connect(self.file)
+        conn.execute(
+                'insert or replace into dedup (key, value) values (?, ?);',
+                (key, json_value))
+        conn.commit()
+        conn.close()
         self.logger.debug('dedup db saved %s:%s', key, json_value)
 
     def lookup(self, digest_key, bucket=""):
         result = None
-        key = digest_key + b"|" + bucket.encode("utf-8")
-        if key in self.db:
-            json_result = self.db[key]
-            result = json.loads(json_result.decode('utf-8'))
+        key = digest_key.decode('utf-8') + '|' + bucket
+        conn = sqlite3.connect(self.file)
+        cursor = conn.execute('select value from dedup where key = ?', (key,))
+        result_tuple = cursor.fetchone()
+        conn.close()
+        if result_tuple:
+            result = json.loads(result_tuple[0])
             result['id'] = result['id'].encode('latin1')
             result['url'] = result['url'].encode('latin1')
             result['date'] = result['date'].encode('latin1')
@@ -91,10 +101,13 @@ class DedupDb(object):
 
     def notify(self, recorded_url, records):
         if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
                 and recorded_url.response_recorder.payload_size() > 0):
-            digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
-                    self.options.base32)
+            digest_key = warcprox.digest_str(
+                    recorded_url.response_recorder.payload_digest,
+                    self.options.base32)
             if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
-                self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
+                self.save(
+                        digest_key, records[0],
+                        bucket=recorded_url.warcprox_meta["captures-bucket"])
             else:
                 self.save(digest_key, records[0])
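The new DedupDb boils down to a single key/value table, so the change is easy to sanity-check outside warcprox. A minimal sketch of what save() and lookup() do at the sql level; the digest, record id, and file path here are invented for illustration, not taken from this changeset:

import json
import sqlite3

conn = sqlite3.connect('/tmp/warcprox-example.sqlite')
conn.execute(
        'create table if not exists dedup ('
        '  key varchar(300) primary key, value varchar(4000));')

# save() stores "<payload digest>|<bucket>" as the key and a small json
# blob of warc record metadata as the value
key = 'sha1:2apsnkoaql5tcuejcboemtnqfcdpgcun|'  # hypothetical digest, empty bucket
value = json.dumps(
        {'id': '<urn:uuid:00000000-0000-0000-0000-000000000000>',
         'url': 'http://example.com/', 'date': '2017-01-01T00:00:00Z'},
        separators=(',', ':'))
conn.execute(
        'insert or replace into dedup (key, value) values (?, ?)',
        (key, value))
conn.commit()

# lookup() fetches by the same key and decodes the json
row = conn.execute('select value from dedup where key = ?', (key,)).fetchone()
print(json.loads(row[0]) if row else None)
conn.close()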
diff --git a/warcprox/dump_anydbm.py b/warcprox/dump_anydbm.py
deleted file mode 100755
index 6de00c6..0000000
--- a/warcprox/dump_anydbm.py
+++ /dev/null
@@ -1,80 +0,0 @@
-#!/usr/bin/env python
-'''
-dump-anydbm - dumps contents of dbm file to stdout
-
-Dump contents of database to stdout. Database can be any file that the anydbm
-module can read. Included with warcprox because it's useful for inspecting a
-deduplication database or a playback index database, but it is a generic tool.
-
-Copyright (C) 2013-2016 Internet Archive
-
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 2
-of the License, or (at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-USA.
-'''
-
-try:
-    import dbm
-    from dbm import ndbm
-    whichdb = dbm.whichdb
-
-except:
-    import anydbm
-    dbm = anydbm
-    from whichdb import whichdb
-
-import sys
-import os.path
-
-if __name__ == "__main__":
-    main()
-
-def main():
-    if len(sys.argv) != 2:
-        sys.stderr.write("usage: {} DBM_FILE\n".format(sys.argv[0]))
-        exit(1)
-
-    filename = sys.argv[1]
-    which = whichdb(filename)
-
-    # if which returns none and the file does not exist, print usage line
-    if which == None and not os.path.exists(sys.argv[1]):
-        sys.stderr.write('No such file {}\n\n'.format(sys.argv[1]))
-        sys.stderr.write("usage: {} DBM_FILE\n".format(sys.argv[0]))
-        exit(1)
-
-    # covers case where an ndbm is checked with its extension & identified incorrectly
-    elif 'bsd' in which:
-        correct_file = filename.split(".db")[0]
-        correct_which = whichdb(correct_file)
-        if correct_which in ('dbm', 'dbm.ndbm'):
-            filename = correct_file
-            which = correct_which
-
-    elif which == '':
-        sys.stderr.write("{} is an unrecognized database type\n".format(sys.argv[1]))
-        sys.stderr.write("Try the file again by removing the extension\n")
-        exit(1)
-
-    try:
-        out = sys.stdout.buffer
-
-    except AttributeError:
-        out = sys.stdout
-
-    out.write(filename.encode('UTF-8') + b' is a ' + which.encode('UTF-8') + b' db\n')
-
-    db = dbm.open(filename, 'r')
-    for key in db.keys():
-        out.write(key + b":" + db[key] + b"\n")
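Since dump-anydbm only ever printed keys and values, a few lines of Python against the stdlib sqlite3 module cover the same inspection use case for the new databases. A possible standalone replacement, sketched here for reference (not part of this changeset; the table argument is whichever of dedup, playback, or buckets_of_stats you want to dump):

#!/usr/bin/env python
# dump-sqlite: print every row of one table in a warcprox sqlite database.
# Illustrative stand-in for the removed dump-anydbm tool.
import sqlite3
import sys

def main():
    if len(sys.argv) != 3:
        sys.stderr.write('usage: {} SQLITE_FILE TABLE\n'.format(sys.argv[0]))
        exit(1)
    filename, table = sys.argv[1], sys.argv[2]
    conn = sqlite3.connect(filename)
    # table names cannot be bound as sql parameters; acceptable for a
    # trusted, interactive tool like this one
    for key, value in conn.execute('select * from {}'.format(table)):
        print('{}:{}'.format(key, value))
    conn.close()

if __name__ == '__main__':
    main()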
diff --git a/warcprox/main.py b/warcprox/main.py
index efba06d..c2f26b3 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -97,7 +97,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
             action='append', help='only record requests with the given http method(s) (can be used more than once)')
     arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
-            default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
+            default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
     arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
             type=int, default=None, help='port to listen on for instant playback')
     arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
@@ -105,7 +105,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             help='playback index database file (only used if --playback-port is specified)')
     group = arg_parser.add_mutually_exclusive_group()
     group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
-            default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+            default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
     group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
             help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
     arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
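Both defaults now point at the same ./warcprox.sqlite file, which works because each component manages its own table (dedup, buckets_of_stats, playback). A quick sketch of two of the stores sharing one file, using the constructor signatures introduced above (the path is illustrative):

import warcprox
import warcprox.dedup
import warcprox.stats

options = warcprox.Options()
dedup_db = warcprox.dedup.DedupDb(
        file='/tmp/warcprox-example.sqlite', options=options)
stats_db = warcprox.stats.StatsDb(
        file='/tmp/warcprox-example.sqlite', options=options)
dedup_db.start()  # creates table "dedup" if needed
stats_db.start()  # creates table "buckets_of_stats" in the same file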
diff --git a/warcprox/playback.py b/warcprox/playback.py
index c244843..a913da7 100644
--- a/warcprox/playback.py
+++ b/warcprox/playback.py
@@ -2,7 +2,7 @@
 warcprox/playback.py - rudimentary support for playback of urls archived by
 warcprox (not much used or maintained)
 
-Copyright (C) 2013-2016 Internet Archive
+Copyright (C) 2013-2017 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -40,6 +40,7 @@ import traceback
 import re
 from warcprox.mitmproxy import MitmProxyHandler
 import warcprox
+import sqlite3
 
 class PlaybackProxyHandler(MitmProxyHandler):
     logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler")
@@ -49,10 +50,9 @@ class PlaybackProxyHandler(MitmProxyHandler):
         # don't connect to any remote server!
         pass
 
-    # @Override
     def _proxy_request(self):
-        date, location = self.server.playback_index_db.lookup_latest(self.url.encode('utf-8'))
+        date, location = self.server.playback_index_db.lookup_latest(self.url)
         self.logger.debug('lookup_latest returned {}:{}'.format(date, location))
 
         status = None
@@ -82,7 +82,8 @@ class PlaybackProxyHandler(MitmProxyHandler):
             sz = len(headers) + len(payload)
 
         self.log_message('"%s" %s %s %s',
-                self.requestline, str(status), str(sz), repr(location) if location else '-')
+                self.requestline, str(status), str(sz),
+                repr(location) if location else '-')
 
     def _open_warc_at_offset(self, warcfilename, offset):
@@ -99,7 +100,6 @@ class PlaybackProxyHandler(MitmProxyHandler):
 
         return warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset)
 
-
     def _send_response(self, headers, payload_fh):
         status = '-'
         m = re.match(br'^HTTP/\d\.\d (\d{3})', headers)
@@ -118,8 +118,10 @@ class PlaybackProxyHandler(MitmProxyHandler):
 
         return status, sz
 
-    def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date):
-        location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to)
+    def _send_headers_and_refd_payload(
+            self, headers, refers_to, refers_to_target_uri, refers_to_date):
+        location = self.server.playback_index_db.lookup_exact(
+                refers_to_target_uri, refers_to_date, record_id=refers_to)
         self.logger.debug('loading http payload from {}'.format(location))
 
         fh = self._open_warc_at_offset(location['f'], location['o'])
@@ -174,12 +176,20 @@ class PlaybackProxyHandler(MitmProxyHandler):
                 if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
                     raise Exception('unknown revisit record profile {}'.format(warc_profile))
 
-                refers_to = record.get_header(warctools.WarcRecord.REFERS_TO)
-                refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI)
-                refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE)
+                refers_to = record.get_header(
+                        warctools.WarcRecord.REFERS_TO).decode('latin1')
+                refers_to_target_uri = record.get_header(
+                        warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
+                                'latin1')
+                refers_to_date = record.get_header(
+                        warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
 
-                self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri))
-                return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date)
+                self.logger.debug(
+                        'revisit record references %s:%s capture of %s',
+                        refers_to_date, refers_to, refers_to_target_uri)
+                return self._send_headers_and_refd_payload(
+                        record.content[1], refers_to, refers_to_target_uri,
+                        refers_to_date)
 
             else:
                 # send it back raw, whatever it is
@@ -220,30 +230,30 @@ class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
 class PlaybackIndexDb(object):
     logger = logging.getLogger("warcprox.playback.PlaybackIndexDb")
 
-    def __init__(self, dbm_file='./warcprox-playback-index.db'):
-        try:
-            import dbm.gnu as dbm_gnu
-        except ImportError:
-            try:
-                import gdbm as dbm_gnu
-            except ImportError:
-                import anydbm as dbm_gnu
+    def __init__(self, file='./warcprox.sqlite'):
+        self.file = file
 
-        if os.path.exists(dbm_file):
-            self.logger.info('opening existing playback index database {}'.format(dbm_file))
+        if os.path.exists(self.file):
+            self.logger.info(
+                    'opening existing playback index database %s', self.file)
         else:
-            self.logger.info('creating new playback index database {}'.format(dbm_file))
+            self.logger.info(
+                    'creating new playback index database %s', self.file)
 
-        self.db = dbm_gnu.open(dbm_file, 'c')
+        conn = sqlite3.connect(self.file)
+        conn.execute(
+                'create table if not exists playback ('
+                '  url varchar(4000) primary key,'
+                '  value varchar(4000)'
+                ');')
+        conn.commit()
+        conn.close()
 
     def close(self):
-        self.db.close()
+        pass
 
     def sync(self):
-        try:
-            self.db.sync()
-        except:
-            pass
+        pass
 
     def notify(self, recorded_url, records):
         self.save(records[0].warc_filename, records, records[0].offset)
@@ -251,7 +261,7 @@ class PlaybackIndexDb(object):
     def save(self, warcfile, recordset, offset):
         response_record = recordset[0]
         # XXX canonicalize url?
-        url = response_record.get_header(warctools.WarcRecord.URL)
+        url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
         date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
         record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
 
@@ -259,9 +269,13 @@ class PlaybackIndexDb(object):
         # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
 
         # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
-        if url in self.db:
-            existing_json_value = self.db[url].decode('utf-8')
-            py_value = json.loads(existing_json_value)
+
+        conn = sqlite3.connect(self.file)
+        cursor = conn.execute(
+                'select value from playback where url = ?', (url,))
+        result_tuple = cursor.fetchone()
+        if result_tuple:
+            py_value = json.loads(result_tuple[0])
         else:
             py_value = {}
 
@@ -272,16 +286,25 @@ class PlaybackIndexDb(object):
 
         json_value = json.dumps(py_value, separators=(',',':'))
 
-        self.db[url] = json_value.encode('utf-8')
+        conn.execute(
+                'insert or replace into playback (url, value) values (?, ?)',
+                (url, json_value))
+        conn.commit()
+        conn.close()
 
         self.logger.debug('playback index saved: {}:{}'.format(url, json_value))
-
 
     def lookup_latest(self, url):
-        if url not in self.db:
+        conn = sqlite3.connect(self.file)
+        cursor = conn.execute(
+                'select value from playback where url = ?', (url,))
+        result_tuple = cursor.fetchone()
+        conn.close()
+
+        if not result_tuple:
             return None, None
 
-        json_value = self.db[url].decode('utf-8')
+        json_value = result_tuple[0]
        self.logger.debug("{}:{}".format(repr(url), repr(json_value)))
 
         py_value = json.loads(json_value)
@@ -290,26 +313,33 @@ class PlaybackIndexDb(object):
         result['i'] = result['i'].encode('ascii')
         return latest_date, result
 
-    # in python3 params are bytes
     def lookup_exact(self, url, warc_date, record_id):
-        if url not in self.db:
+        conn = sqlite3.connect(self.file)
+        cursor = conn.execute(
+                'select value from playback where url = ?', (url,))
+        result_tuple = cursor.fetchone()
+        conn.close()
+
+        if not result_tuple:
             return None
 
-        json_value = self.db[url].decode('utf-8')
-        self.logger.debug("{}:{}".format(repr(url), repr(json_value)))
+        json_value = result_tuple[0]
+        self.logger.debug("%s:%s", repr(url), repr(json_value))
 
         py_value = json.loads(json_value)
 
-        warc_date_str = warc_date.decode('ascii')
-
-        if warc_date_str in py_value:
-            for record in py_value[warc_date_str]:
-                if record['i'].encode('ascii') == record_id:
-                    self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
-                    record['i'] = record['i'].encode('ascii')
+        if warc_date in py_value:
+            for record in py_value[warc_date]:
+                if record['i'] == record_id:
+                    self.logger.debug(
+                            "found exact match for (%s,%s,%s)",
+                            repr(warc_date), repr(record_id), repr(url))
+                    record['i'] = record['i']
                     return record
         else:
-            self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
+            self.logger.info(
+                    "match not found for (%s,%s,%s)", repr(warc_date),
+                    repr(record_id), repr(url))
             return None
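For reference, the playback table keeps one row per url, folding every capture into a single json value whose shape is spelled out in the comment inside save() above. A hand-built example row (warc filename, offsets, and record id are all invented):

import json
import sqlite3

conn = sqlite3.connect('/tmp/warcprox-example.sqlite')
conn.execute(
        'create table if not exists playback ('
        '  url varchar(4000) primary key, value varchar(4000));')

# each capture date maps to a list of record locations; lookup_latest()
# takes the newest date, lookup_exact() matches date and record id
value = {
    '2017-01-01T00:00:00Z': [{
        'f': 'warcprox-20170101000000000-00000-example.warc.gz',
        'o': 12345,  # offset of the response record in the warc
        'q': 23456,  # offset of the request record
        'i': '<urn:uuid:00000000-0000-0000-0000-000000000000>',
    }],
}
conn.execute(
        'insert or replace into playback (url, value) values (?, ?)',
        ('http://example.com/', json.dumps(value, separators=(',', ':'))))
conn.commit()
conn.close()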
diff --git a/warcprox/stats.py b/warcprox/stats.py
index 4baf939..905b077 100644
--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -31,6 +31,7 @@ import threading
 import rethinkdb as r
 import datetime
 import urlcanon
+import sqlite3
 
 def _empty_bucket(bucket):
     return {
@@ -52,53 +53,52 @@ def _empty_bucket(bucket):
 class StatsDb:
     logger = logging.getLogger("warcprox.stats.StatsDb")
 
-    def __init__(self, dbm_file='./warcprox-stats.db', options=warcprox.Options()):
-        try:
-            import dbm.gnu as dbm_gnu
-        except ImportError:
-            try:
-                import gdbm as dbm_gnu
-            except ImportError:
-                import anydbm as dbm_gnu
-
-        if os.path.exists(dbm_file):
-            self.logger.info('opening existing stats database {}'.format(dbm_file))
-        else:
-            self.logger.info('creating new stats database {}'.format(dbm_file))
-
-        self.db = dbm_gnu.open(dbm_file, 'c')
+    def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()):
+        self.file = file
         self.options = options
 
     def start(self):
-        # method only exists to match RethinkStatsDb
-        pass
+        if os.path.exists(self.file):
+            self.logger.info(
+                    'opening existing stats database %s', self.file)
+        else:
+            self.logger.info(
+                    'creating new stats database %s', self.file)
+
+        conn = sqlite3.connect(self.file)
+        conn.execute(
+                'create table if not exists buckets_of_stats ('
+                '  bucket varchar(300) primary key,'
+                '  stats varchar(4000)'
+                ');')
+        conn.commit()
+        conn.close()
+
+        self.logger.info('created table buckets_of_stats in %s', self.file)
 
     def stop(self):
-        self.close()
+        pass
 
     def close(self):
-        self.db.close()
+        pass
 
     def sync(self):
-        try:
-            self.db.sync()
-        except:
-            pass
+        pass
 
     def value(self, bucket0="__all__", bucket1=None, bucket2=None):
-        # Gdbm wants str/bytes keys in python2, str/unicode keys in python3.
-        # This ugliness deals with keys that arrive as unicode in py2.
-        b0 = bucket0.encode("utf-8") if bucket0 and not isinstance(bucket0, str) else bucket0
-        b1 = bucket1.encode("utf-8") if bucket1 and not isinstance(bucket1, str) else bucket1
-        b2 = bucket2.encode("utf-8") if bucket2 and not isinstance(bucket2, str) else bucket2
-
-        if b0 in self.db:
-            bucket0_stats = json.loads(self.db[b0].decode("utf-8"))
-            if b1:
-                if b2:
-                    return bucket0_stats[b1][b2]
+        conn = sqlite3.connect(self.file)
+        cursor = conn.execute(
+                'select stats from buckets_of_stats where bucket = ?',
+                (bucket0,))
+        result_tuple = cursor.fetchone()
+        conn.close()
+        if result_tuple:
+            bucket0_stats = json.loads(result_tuple[0])
+            if bucket1:
+                if bucket2:
+                    return bucket0_stats[bucket1][bucket2]
                 else:
-                    return bucket0_stats[b1]
+                    return bucket0_stats[bucket1]
             else:
                 return bucket0_stats
         else:
@@ -115,7 +115,7 @@ class StatsDb:
         with key 'bucket' whose value is the name of the bucket. The other
         currently recognized item is 'tally-domains', which if supplied should
         be a list of domains. This instructs warcprox to additionally tally
-        substats of the given bucket by domain. Host stats are stored in the 
+        substats of the given bucket by domain. Host stats are stored in the
         stats table under the key '{parent-bucket}:{domain(normalized)}'.
 
         Example Warcprox-Meta header (a real one will likely have other
@@ -150,14 +150,18 @@ class StatsDb:
         return buckets
 
     def tally(self, recorded_url, records):
+        conn = sqlite3.connect(self.file)
+
         for bucket in self.buckets(recorded_url):
-            # Gdbm wants str/bytes keys in python2, str/unicode keys in python3.
-            # This ugliness deals with keys that arrive as unicode in py2.
-            b = bucket.encode("utf-8") if bucket and not isinstance(bucket, str) else bucket
-            if b in self.db:
-                bucket_stats = json.loads(self.db[b].decode("utf-8"))
+            cursor = conn.execute(
+                    'select stats from buckets_of_stats where bucket=?',
+                    (bucket,))
+            result_tuple = cursor.fetchone()
+            cursor.close()
+            if result_tuple:
+                bucket_stats = json.loads(result_tuple[0])
             else:
-                bucket_stats = _empty_bucket(b)
+                bucket_stats = _empty_bucket(bucket)
 
             bucket_stats["total"]["urls"] += 1
             bucket_stats["total"]["wire_bytes"] += recorded_url.size
@@ -169,7 +173,13 @@ class StatsDb:
                 bucket_stats["new"]["urls"] += 1
                 bucket_stats["new"]["wire_bytes"] += recorded_url.size
 
-            self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8")
+            json_value = json.dumps(bucket_stats, separators=(',',':'))
+            conn.execute(
+                    'insert or replace into buckets_of_stats(bucket, stats) '
+                    'values (?, ?)', (bucket, json_value))
+            conn.commit()
+
+        conn.close()
 
 class RethinkStatsDb(StatsDb):
     """Updates database in batch every 2.0 seconds"""
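The stats conversion follows the same pattern: one row per bucket, with the counters serialized as json. A quick way to eyeball what StatsDb.value() will return, assuming the default ./warcprox.sqlite file and the buckets_of_stats table created by start() above:

import json
import sqlite3

conn = sqlite3.connect('./warcprox.sqlite')
for bucket, stats in conn.execute('select bucket, stats from buckets_of_stats'):
    counters = json.loads(stats)
    # per the tally() logic above, each bucket carries at least "total" and
    # "new" sections with "urls" and "wire_bytes" counters
    print(bucket, counters['total']['urls'], counters['total']['wire_bytes'])
conn.close()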