diff --git a/bin/dump-anydbm b/bin/dump-anydbm index f1f8a42..1d1ae4b 100755 --- a/bin/dump-anydbm +++ b/bin/dump-anydbm @@ -10,7 +10,9 @@ deduplication database or a playback index database, but it is a generic tool. try: import dbm + from dbm import ndbm whichdb = dbm.whichdb + except: import anydbm dbm = anydbm @@ -24,15 +26,36 @@ if __name__ == "__main__": sys.stderr.write("usage: {} DBM_FILE\n".format(sys.argv[0])) exit(1) - if not os.path.exists(sys.argv[1]): + filename = sys.argv[1] + which = whichdb(filename) + + # if which returns none and the file does not exist, print usage line + if which == None and not os.path.exists(sys.argv[1]): sys.stderr.write('No such file {}\n\n'.format(sys.argv[1])) sys.stderr.write("usage: {} DBM_FILE\n".format(sys.argv[0])) exit(1) - which = whichdb(sys.argv[1]) - print('{} is a {} db'.format(sys.argv[1], which)) + # covers case where an ndbm is checked with its extension & identified incorrectly + elif 'bsd' in which: + correct_file = filename.split(".db")[0] + correct_which = whichdb(correct_file) + if correct_which in ('dbm', 'dbm.ndbm'): + filename = correct_file + which = correct_which - db = dbm.open(sys.argv[1], 'r') + elif which == '': + sys.stderr.write("{} is an unrecognized database type\n".format(sys.argv[1])) + sys.stderr.write("Try the file again by removing the extension\n") + exit(1) + try: + out = sys.stdout.buffer + + except AttributeError: + out = sys.stdout + + out.write(filename.encode('UTF-8') + b' is a ' + which.encode('UTF-8') + b' db\n') + + db = dbm.open(filename, 'r') for key in db.keys(): - print("{}:{}".format(key, db[key])) + out.write(key + b":" + db[key] + b"\n") diff --git a/tox.ini b/tox.ini index 2403045..8bdedc6 100644 --- a/tox.ini +++ b/tox.ini @@ -7,5 +7,7 @@ envlist = py27, py32, py33 [testenv] -commands = {envpython} setup.py test - +commands = py.test +deps = + pytest + requests diff --git a/warcprox/tests/test_dump-anydbm.py b/warcprox/tests/test_dump-anydbm.py new file mode 100644 index 0000000..1fddefc --- /dev/null +++ b/warcprox/tests/test_dump-anydbm.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python + +import pytest +import os +import tempfile +import subprocess # to access the script from shell + +# will try as python 3 then default to python 2 modules +try: + import dbm + from dbm import ndbm + from dbm import gnu as gdbm + from dbm import dumb + + whichdb = dbm.whichdb + + ndbm_type = b"dbm.ndbm" + gdbm_type = b"dbm.gnu" + dumb_type = b"dbm.dumb" + +except: + import dbm as ndbm + import gdbm + import dumbdbm as dumb + + from whichdb import whichdb + + ndbm_type = b"dbm" + gdbm_type = b"gdbm" + dumb_type = b"dumbdbm" + +#global settings +key1 = 'very first key' +key2 = 'second key' +val1 = 'very first value' +val2 = 'second value' +dump_anydbm = "dump-anydbm" + + +@pytest.fixture(scope="function") +def gdbm_test_db(request): + print("creating test gdbm file") + temp_file = tempfile.NamedTemporaryFile() + test_db = gdbm.open(temp_file.name, "n") + test_db[key1] = val1 + test_db[key2] = val2 + test_db.close() + + def delete_gdbm_test_db(): + print("deleting test gdbm file") + temp_file.close() + + request.addfinalizer(delete_gdbm_test_db) + return temp_file.name + + +@pytest.fixture(scope="function") +def ndbm_test_db(request): + print("creating test ndbm file") + temp_file = tempfile.NamedTemporaryFile() + test_db = ndbm.open(temp_file.name, "n") + test_db[key1] = val1 + test_db[key2] = val2 + test_db.close() + + def delete_test_ndbm(): + print("deleting test ndbm file") + temp_file.close() + os.remove(temp_file.name + ".db") + + request.addfinalizer(delete_test_ndbm) + return temp_file.name + + +@pytest.fixture(scope="function") +def dumbdbm_test_db(request): + print("creating test dumbdbm file") + temp_file = tempfile.NamedTemporaryFile() + test_db = dumb.open(temp_file.name, "n") + test_db[key1] = val1 + test_db[key2] = val2 + test_db.close() + + def delete_dumbdbm_test_db(): + print("deleting test dumbdbm file") + temp_file.close() + os.remove(temp_file.name + ".dir") + os.remove(temp_file.name + ".bak") + os.remove(temp_file.name + ".dat") + + request.addfinalizer(delete_dumbdbm_test_db) + return temp_file.name + + +def test_dumpanydbm_identify_gdbm(gdbm_test_db): + print("running test_dumpanydbm_identify_gdbm") + output = subprocess.check_output([dump_anydbm, gdbm_test_db]) + print(b"script printout: \n" + output) + + assert (output == gdbm_test_db.encode(encoding='UTF-8') + b' is a ' + gdbm_type + b' db\nvery first key:very first value\nsecond key:second value\n' or + output == gdbm_test_db.encode(encoding='UTF-8') + b' is a ' + gdbm_type + b' db\nsecond key:second value\nvery first key:very first value\n') + + +def test_dumpanydbm_identify_ndbm(ndbm_test_db): + print("running test_dumpanydbm_identify_ndbm") + output = subprocess.check_output([dump_anydbm, ndbm_test_db]) + print(b"script printout: \n" + output) + + assert (output == ndbm_test_db.encode(encoding='UTF-8') + b' is a ' + ndbm_type + b' db\nvery first key:very first value\nsecond key:second value\n' or + output == ndbm_test_db.encode(encoding='UTF-8') + b' is a ' + ndbm_type + b' db\nsecond key:second value\nvery first key:very first value\n') + + +def test_dumpanydbm_identify_dumbdbm(dumbdbm_test_db): + print("running test_dumpanydbm_identify_dumbdbm") + output = subprocess.check_output([dump_anydbm, dumbdbm_test_db]) + print(b"script printout: \n" + output) + + assert (output == dumbdbm_test_db.encode(encoding='UTF-8') + b' is a ' + dumb_type + b' db\nvery first key:very first value\nsecond key:second value\n' or + output == dumbdbm_test_db.encode(encoding='UTF-8') + b' is a ' + dumb_type + b' db\nsecond key:second value\nvery first key:very first value\n')