diff -U3 -r -N no-tests/tests/crash_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/crash_test.py
--- no-tests/tests/crash_test.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/crash_test.py 2017-05-29 16:44:06.615481956 +0200
@@ -0,0 +1,237 @@
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+# This is not a test suite! More like a collection of triggers for previously
+# observed crashes. Want to contribute to py-lmdb? Please write a test suite!
+#
+# what happens when empty keys/ values passed to various funcs
+# incorrect types
+# try to break cpython arg parsing - too many/few/incorrect args
+# Various efforts to cause Python-level leaks.
+#
+
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import itertools
+import os
+import random
+import unittest
+
+import lmdb
+import testlib
+
+from testlib import B
+from testlib import O
+
+
+try:
+ next(iter([1]))
+except NameError: # Python2.5.
+ def next(it):
+ return it.next()
+
+
+class CrashTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ # Various efforts to cause segfaults.
+
+ def setUp(self):
+ self.path, self.env = testlib.temp_env()
+ with self.env.begin(write=True) as txn:
+ txn.put(B('dave'), B(''))
+ txn.put(B('dave2'), B(''))
+
+ def testOldCrash(self):
+ txn = self.env.begin()
+ dir(iter(txn.cursor()))
+
+ def testCloseWithTxn(self):
+ txn = self.env.begin(write=True)
+ self.env.close()
+ self.assertRaises(Exception, (lambda: list(txn.cursor())))
+
+ def testDoubleClose(self):
+ self.env.close()
+ self.env.close()
+
+ def testDbDoubleClose(self):
+ db = self.env.open_db(key=B('dave3'))
+ #db.close()
+ #db.close()
+
+ def testTxnCloseActiveIter(self):
+ with self.env.begin() as txn:
+ it = txn.cursor().iternext()
+ self.assertRaises(Exception, (lambda: list(it)))
+
+ def testDbCloseActiveIter(self):
+ db = self.env.open_db(key=B('dave3'))
+ with self.env.begin(write=True) as txn:
+ txn.put(B('a'), B('b'), db=db)
+ it = txn.cursor(db=db).iternext()
+ self.assertRaises(Exception, (lambda: list(it)))
+
+
+class IteratorTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def setUp(self):
+ self.path, self.env = testlib.temp_env()
+ self.txn = self.env.begin(write=True)
+ self.c = self.txn.cursor()
+
+ def testEmpty(self):
+ self.assertEqual([], list(self.c))
+ self.assertEqual([], list(self.c.iternext()))
+ self.assertEqual([], list(self.c.iterprev()))
+
+ def testFilled(self):
+ testlib.putData(self.txn)
+ self.assertEqual(testlib.ITEMS, list(self.c))
+ self.assertEqual(testlib.ITEMS, list(self.c))
+ self.assertEqual(testlib.ITEMS, list(self.c.iternext()))
+ self.assertEqual(testlib.ITEMS[::-1], list(self.txn.cursor().iterprev()))
+ self.assertEqual(testlib.ITEMS[::-1], list(self.c.iterprev()))
+ self.assertEqual(testlib.ITEMS, list(self.c))
+
+ def testFilledSkipForward(self):
+ testlib.putData(self.txn)
+ self.c.set_range(B('b'))
+ self.assertEqual(testlib.ITEMS[1:], list(self.c))
+
+ def testFilledSkipReverse(self):
+ testlib.putData(self.txn)
+ self.c.set_range(B('b'))
+ self.assertEqual(testlib.REV_ITEMS[-2:], list(self.c.iterprev()))
+
+ def testFilledSkipEof(self):
+ testlib.putData(self.txn)
+ self.assertEqual(False, self.c.set_range(B('z')))
+ self.assertEqual(testlib.REV_ITEMS, list(self.c.iterprev()))
+
+
+class BigReverseTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ # Test for issue with MDB_LAST+MDB_PREV skipping chunks of database.
+ def test_big_reverse(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ keys = [B('%05d' % i) for i in range(0xffff)]
+ for k in keys:
+ txn.put(k, k, append=True)
+ assert list(txn.cursor().iterprev(values=False)) == list(reversed(keys))
+
+
+class MultiCursorDeleteTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def setUp(self):
+ self.path, self.env = testlib.temp_env()
+
+ def test1(self):
+ """Ensure MDB_NEXT is ignored on `c1' when it was previously positioned
+ on the key that `c2' just deleted."""
+ txn = self.env.begin(write=True)
+ cur = txn.cursor()
+ while cur.first():
+ cur.delete()
+
+ for i in range(1, 10):
+ cur.put(O(ord('a') + i) * i, B(''))
+
+ c1 = txn.cursor()
+ c1f = c1.iternext(values=False)
+ while next(c1f) != B('ddd'):
+ pass
+ c2 = txn.cursor()
+ assert c2.set_key(B('ddd'))
+ c2.delete()
+ assert next(c1f) == B('eeee')
+
+ def test_monster(self):
+ # Generate predictable sequence of sizes.
+ rand = random.Random()
+ rand.seed(0)
+
+ txn = self.env.begin(write=True)
+ keys = []
+ for i in range(20000):
+ key = B('%06x' % i)
+ val = B('x' * rand.randint(76, 350))
+ assert txn.put(key, val)
+ keys.append(key)
+
+ deleted = 0
+ for key in txn.cursor().iternext(values=False):
+ assert txn.delete(key), key
+ deleted += 1
+
+ assert deleted == len(keys), deleted
+
+
+class TxnFullTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_17bf75b12eb94d9903cd62329048b146d5313bad(self):
+ """
+ me_txn0 previously cached MDB_TXN_ERROR permanently. Fixed by
+ 17bf75b12eb94d9903cd62329048b146d5313bad.
+ """
+ path, env = testlib.temp_env(map_size=4096*9, sync=False, max_spare_txns=0)
+ for i in itertools.count():
+ try:
+ with env.begin(write=True) as txn:
+ txn.put(B(str(i)), B(str(i)))
+ except lmdb.MapFullError:
+ break
+
+ # Should not crash with MDB_BAD_TXN:
+ with env.begin(write=True) as txn:
+ txn.delete(B('1'))
+
+
+class EmptyIterTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_python3_iternext_segfault(self):
+ # https://github.com/dw/py-lmdb/issues/105
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ cur = txn.cursor()
+ ite = cur.iternext()
+ nex = getattr(ite, 'next',
+ getattr(ite, '__next__', None))
+ assert nex is not None
+ self.assertRaises(StopIteration, nex)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff -U3 -r -N no-tests/tests/cursor_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/cursor_test.py
--- no-tests/tests/cursor_test.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/cursor_test.py 2017-05-29 16:44:06.615481956 +0200
@@ -0,0 +1,222 @@
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+# test delete(dupdata)
+
+from __future__ import absolute_import
+from __future__ import with_statement
+import unittest
+
+import testlib
+from testlib import B
+from testlib import BT
+
+
+class ContextManagerTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_ok(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ with txn.cursor() as curs:
+ curs.put(B('foo'), B('123'))
+ self.assertRaises(Exception, lambda: curs.get(B('foo')))
+
+ def test_crash(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+
+ try:
+ with txn.cursor() as curs:
+ curs.put(123, 123)
+ except:
+ pass
+ self.assertRaises(Exception, lambda: curs.get(B('foo')))
+
+
+class CursorTestBase(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def setUp(self):
+ self.path, self.env = testlib.temp_env()
+ self.txn = self.env.begin(write=True)
+ self.c = self.txn.cursor()
+
+
+class CursorTest(CursorTestBase):
+ def testKeyValueItemEmpty(self):
+ self.assertEqual(B(''), self.c.key())
+ self.assertEqual(B(''), self.c.value())
+ self.assertEqual(BT('', ''), self.c.item())
+
+ def testFirstLastEmpty(self):
+ self.assertEqual(False, self.c.first())
+ self.assertEqual(False, self.c.last())
+
+ def testFirstFilled(self):
+ testlib.putData(self.txn)
+ self.assertEqual(True, self.c.first())
+ self.assertEqual(testlib.ITEMS[0], self.c.item())
+
+ def testLastFilled(self):
+ testlib.putData(self.txn)
+ self.assertEqual(True, self.c.last())
+ self.assertEqual(testlib.ITEMS[-1], self.c.item())
+
+ def testSetKey(self):
+ self.assertRaises(Exception, (lambda: self.c.set_key(B(''))))
+ self.assertEqual(False, self.c.set_key(B('missing')))
+ testlib.putData(self.txn)
+ self.assertEqual(True, self.c.set_key(B('b')))
+ self.assertEqual(False, self.c.set_key(B('ba')))
+
+ def testSetRange(self):
+ self.assertEqual(False, self.c.set_range(B('x')))
+ testlib.putData(self.txn)
+ self.assertEqual(False, self.c.set_range(B('x')))
+ self.assertEqual(True, self.c.set_range(B('a')))
+ self.assertEqual(B('a'), self.c.key())
+ self.assertEqual(True, self.c.set_range(B('ba')))
+ self.assertEqual(B('baa'), self.c.key())
+ self.c.set_range(B(''))
+ self.assertEqual(B('a'), self.c.key())
+
+ def testDeleteEmpty(self):
+ self.assertEqual(False, self.c.delete())
+
+ def testDeleteFirst(self):
+ testlib.putData(self.txn)
+ self.assertEqual(False, self.c.delete())
+ self.c.first()
+ self.assertEqual(BT('a', ''), self.c.item())
+ self.assertEqual(True, self.c.delete())
+ self.assertEqual(BT('b', ''), self.c.item())
+ self.assertEqual(True, self.c.delete())
+ self.assertEqual(BT('baa', ''), self.c.item())
+ self.assertEqual(True, self.c.delete())
+ self.assertEqual(BT('d', ''), self.c.item())
+ self.assertEqual(True, self.c.delete())
+ self.assertEqual(BT('', ''), self.c.item())
+ self.assertEqual(False, self.c.delete())
+ self.assertEqual(BT('', ''), self.c.item())
+
+ def testDeleteLast(self):
+ testlib.putData(self.txn)
+ self.assertEqual(True, self.c.last())
+ self.assertEqual(BT('d', ''), self.c.item())
+ self.assertEqual(True, self.c.delete())
+ self.assertEqual(BT('', ''), self.c.item())
+ self.assertEqual(False, self.c.delete())
+ self.assertEqual(BT('', ''), self.c.item())
+
+ def testCount(self):
+ self.assertRaises(Exception, (lambda: self.c.count()))
+ testlib.putData(self.txn)
+ self.c.first()
+ # TODO: complete dup key support.
+ #self.assertEqual(1, self.c.count())
+
+ def testPut(self):
+ pass
+
+
+class PutmultiTest(CursorTestBase):
+ def test_empty_seq(self):
+ consumed, added = self.c.putmulti(())
+ assert consumed == added == 0
+
+ def test_2list(self):
+ l = [BT('a', ''), BT('a', '')]
+ consumed, added = self.c.putmulti(l)
+ assert consumed == added == 2
+
+ li = iter(l)
+ consumed, added = self.c.putmulti(li)
+ assert consumed == added == 2
+
+ def test_2list_preserve(self):
+ l = [BT('a', ''), BT('a', '')]
+ consumed, added = self.c.putmulti(l, overwrite=False)
+ assert consumed == 2
+ assert added == 1
+
+ assert self.c.set_key(B('a'))
+ assert self.c.delete()
+
+ li = iter(l)
+ consumed, added = self.c.putmulti(li, overwrite=False)
+ assert consumed == 2
+ assert added == 1
+
+ def test_bad_seq1(self):
+ self.assertRaises(Exception,
+ lambda: self.c.putmulti(range(2)))
+
+
+class ReplaceTest(CursorTestBase):
+ def test_replace(self):
+ assert None is self.c.replace(B('a'), B(''))
+ assert B('') == self.c.replace(B('a'), B('x'))
+ assert B('x') == self.c.replace(B('a'), B('y'))
+
+
+class ContextManagerTest(CursorTestBase):
+ def test_enter(self):
+ with self.c as c:
+ assert c is self.c
+ c.put(B('a'), B('a'))
+ assert c.get(B('a')) == B('a')
+ self.assertRaises(Exception,
+ lambda: c.get(B('a')))
+
+ def test_exit_success(self):
+ with self.txn.cursor() as c:
+ c.put(B('a'), B('a'))
+ self.assertRaises(Exception,
+ lambda: c.get(B('a')))
+
+ def test_exit_failure(self):
+ try:
+ with self.txn.cursor() as c:
+ c.put(B('a'), B('a'))
+ raise ValueError
+ except ValueError:
+ pass
+ self.assertRaises(Exception,
+ lambda: c.get(B('a')))
+
+ def test_close(self):
+ self.c.close()
+ self.assertRaises(Exception,
+ lambda: c.get(B('a')))
+
+ def test_double_close(self):
+ self.c.close()
+ self.c.close()
+ self.assertRaises(Exception,
+ lambda: self.c.put(B('a'), B('a')))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff -U3 -r -N no-tests/tests/env_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py
--- no-tests/tests/env_test.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py 2017-06-12 23:54:54.737797526 +0200
@@ -0,0 +1,845 @@
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+from __future__ import absolute_import
+from __future__ import with_statement
+import os
+import signal
+import sys
+import unittest
+import weakref
+
+import testlib
+from testlib import B
+from testlib import BT
+from testlib import OCT
+from testlib import INT_TYPES
+from testlib import UnicodeType
+
+import lmdb
+
+
+NO_READERS = UnicodeType('(no active readers)\n')
+
+try:
+ PAGE_SIZE = os.sysconf(os.sysconf_names['SC_PAGE_SIZE'])
+except (AttributeError, KeyError, OSError):
+ PAGE_SIZE = 4096
+
+
+class VersionTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_version(self):
+ ver = lmdb.version()
+ assert len(ver) == 3
+ assert all(isinstance(i, INT_TYPES) for i in ver)
+ assert all(i >= 0 for i in ver)
+
+
+class OpenTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_bad_paths(self):
+ self.assertRaises(Exception,
+ lambda: lmdb.open('/doesnt/exist/at/all'))
+ self.assertRaises(Exception,
+ lambda: lmdb.open(testlib.temp_file()))
+
+ def test_ok_path(self):
+ path, env = testlib.temp_env()
+ assert os.path.exists(path)
+ assert os.path.exists(os.path.join(path, 'data.mdb'))
+ assert os.path.exists(os.path.join(path, 'lock.mdb'))
+ assert env.path() == path
+
+ def test_bad_size(self):
+ self.assertRaises(OverflowError,
+ lambda: testlib.temp_env(map_size=-123))
+
+ def test_tiny_size(self):
+ _, env = testlib.temp_env(map_size=10)
+ def txn():
+ with env.begin(write=True) as txn:
+ txn.put(B('a'), B('a'))
+ self.assertRaises(lmdb.MapFullError, txn)
+
+ def test_subdir_false_junk(self):
+ path = testlib.temp_file()
+ fp = open(path, 'wb')
+ fp.write(B('A' * 8192))
+ fp.close()
+ self.assertRaises(lmdb.InvalidError,
+ lambda: lmdb.open(path, subdir=False))
+
+ def test_subdir_false_ok(self):
+ path = testlib.temp_file(create=False)
+ _, env = testlib.temp_env(path, subdir=False)
+ assert os.path.exists(path)
+ assert os.path.isfile(path)
+ assert os.path.isfile(path + '-lock')
+ assert not env.flags()['subdir']
+
+ def test_subdir_true_noexist_nocreate(self):
+ path = testlib.temp_dir(create=False)
+ self.assertRaises(lmdb.Error,
+ lambda: testlib.temp_env(path, subdir=True, create=False))
+ assert not os.path.exists(path)
+
+ def test_subdir_true_noexist_create(self):
+ path = testlib.temp_dir(create=False)
+ path_, env = testlib.temp_env(path, subdir=True, create=True)
+ assert path_ == path
+ assert env.path() == path
+
+ def test_subdir_true_exist_nocreate(self):
+ path, env = testlib.temp_env()
+ assert lmdb.open(path, subdir=True, create=False).path() == path
+
+ def test_subdir_true_exist_create(self):
+ path, env = testlib.temp_env()
+ assert lmdb.open(path, subdir=True, create=True).path() == path
+
+ def test_readonly_false(self):
+ path, env = testlib.temp_env(readonly=False)
+ with env.begin(write=True) as txn:
+ txn.put(B('a'), B(''))
+ with env.begin() as txn:
+ assert txn.get(B('a')) == B('')
+ assert not env.flags()['readonly']
+
+ def test_readonly_true_noexist(self):
+ path = testlib.temp_dir(create=False)
+ # Open readonly missing store should fail.
+ self.assertRaises(lmdb.Error,
+ lambda: lmdb.open(path, readonly=True, create=True))
+ # And create=True should not have mkdir'd it.
+ assert not os.path.exists(path)
+
+ def test_readonly_true_exist(self):
+ path, env = testlib.temp_env()
+ env2 = lmdb.open(path, readonly=True)
+ assert env2.path() == path
+ # Attempting a write txn should fail.
+ self.assertRaises(lmdb.ReadonlyError,
+ lambda: env2.begin(write=True))
+ # Flag should be set.
+ assert env2.flags()['readonly']
+
+ def test_metasync(self):
+ for flag in True, False:
+ path, env = testlib.temp_env(metasync=flag)
+ assert env.flags()['metasync'] == flag
+
+ def test_lock(self):
+ for flag in True, False:
+ path, env = testlib.temp_env(lock=flag)
+ lock_path = os.path.join(path, 'lock.mdb')
+ assert env.flags()['lock'] == flag
+ assert flag == os.path.exists(lock_path)
+
+ def test_sync(self):
+ for flag in True, False:
+ path, env = testlib.temp_env(sync=flag)
+ assert env.flags()['sync'] == flag
+
+ def test_map_async(self):
+ for flag in True, False:
+ path, env = testlib.temp_env(map_async=flag)
+ assert env.flags()['map_async'] == flag
+
+ def test_mode_subdir_create(self):
+ if sys.platform == 'win32':
+ # Mode argument is ignored on Windows; see lmdb.h
+ return
+
+ oldmask = os.umask(0)
+ try:
+ for mode in OCT('777'), OCT('755'), OCT('700'):
+ path = testlib.temp_dir(create=False)
+ env = lmdb.open(path, subdir=True, create=True, mode=mode)
+ fmode = mode & ~OCT('111')
+ assert testlib.path_mode(path) == mode
+ assert testlib.path_mode(path+'/data.mdb') == fmode
+ assert testlib.path_mode(path+'/lock.mdb') == fmode
+ finally:
+ os.umask(oldmask)
+
+ def test_mode_subdir_nocreate(self):
+ if sys.platform == 'win32':
+ # Mode argument is ignored on Windows; see lmdb.h
+ return
+
+ oldmask = os.umask(0)
+ try:
+ for mode in OCT('777'), OCT('755'), OCT('700'):
+ path = testlib.temp_dir()
+ env = lmdb.open(path, subdir=True, create=False, mode=mode)
+ fmode = mode & ~OCT('111')
+ assert testlib.path_mode(path+'/data.mdb') == fmode
+ assert testlib.path_mode(path+'/lock.mdb') == fmode
+ finally:
+ os.umask(oldmask)
+
+ def test_readahead(self):
+ for flag in True, False:
+ path, env = testlib.temp_env(readahead=flag)
+ assert env.flags()['readahead'] == flag
+
+ def test_writemap(self):
+ for flag in True, False:
+ path, env = testlib.temp_env(writemap=flag)
+ assert env.flags()['writemap'] == flag
+
+ def test_meminit(self):
+ for flag in True, False:
+ path, env = testlib.temp_env(meminit=flag)
+ assert env.flags()['meminit'] == flag
+
+ def test_max_readers(self):
+ self.assertRaises(lmdb.InvalidParameterError,
+ lambda: testlib.temp_env(max_readers=0))
+ for val in 123, 234:
+ _, env = testlib.temp_env(max_readers=val)
+ assert env.info()['max_readers'] == val
+
+ def test_max_dbs(self):
+ self.assertRaises(OverflowError,
+ lambda: testlib.temp_env(max_dbs=-1))
+ for val in 0, 10, 20:
+ _, env = testlib.temp_env(max_dbs=val)
+ dbs = [env.open_db(B('db%d' % i)) for i in range(val)]
+ self.assertRaises(lmdb.DbsFullError,
+ lambda: env.open_db(B('toomany')))
+
+
+class SetMapSizeTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_invalid(self):
+ _, env = testlib.temp_env()
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.set_mapsize(999999))
+
+ def test_negative(self):
+ _, env = testlib.temp_env()
+ self.assertRaises(OverflowError,
+ lambda: env.set_mapsize(-2015))
+
+ def test_applied(self):
+ _, env = testlib.temp_env(map_size=PAGE_SIZE * 8)
+ assert env.info()['map_size'] == PAGE_SIZE * 8
+
+ env.set_mapsize(PAGE_SIZE * 16)
+ assert env.info()['map_size'] == PAGE_SIZE * 16
+
+
+class CloseTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_close(self):
+ _, env = testlib.temp_env()
+ # Attempting things should be ok.
+ txn = env.begin(write=True)
+ txn.put(B('a'), B(''))
+ cursor = txn.cursor()
+ list(cursor)
+ cursor.first()
+ it = iter(cursor)
+
+ env.close()
+ # Repeated calls are ignored:
+ env.close()
+ # Attempting to use invalid objects should crash.
+ self.assertRaises(Exception, lambda: txn.cursor())
+ self.assertRaises(Exception, lambda: txn.commit())
+ self.assertRaises(Exception, lambda: cursor.first())
+ self.assertRaises(Exception, lambda: list(it))
+ # Abort should be OK though.
+ txn.abort()
+ # Attempting to start new txn should crash.
+ self.assertRaises(Exception,
+ lambda: env.begin())
+
+
+class ContextManagerTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_ok(self):
+ path, env = testlib.temp_env()
+ with env as env_:
+ assert env_ is env
+ with env.begin() as txn:
+ txn.get(B('foo'))
+ self.assertRaises(Exception, lambda: env.begin())
+
+ def test_crash(self):
+ path, env = testlib.temp_env()
+ try:
+ with env as env_:
+ assert env_ is env
+ with env.begin() as txn:
+ txn.get(123)
+ except:
+ pass
+ self.assertRaises(Exception, lambda: env.begin())
+
+
+class InfoMethodsTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_path(self):
+ path, env = testlib.temp_env()
+ assert path == env.path()
+ assert isinstance(env.path(), UnicodeType)
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.path())
+
+ def test_stat(self):
+ _, env = testlib.temp_env()
+ stat = env.stat()
+ for k in 'psize', 'depth', 'branch_pages', 'overflow_pages',\
+ 'entries':
+ assert isinstance(stat[k], INT_TYPES), k
+ assert stat[k] >= 0
+
+ assert stat['entries'] == 0
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('b'))
+ txn.commit()
+ stat = env.stat()
+ assert stat['entries'] == 1
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.stat())
+
+ def test_info(self):
+ _, env = testlib.temp_env()
+ info = env.info()
+ for k in 'map_addr', 'map_size', 'last_pgno', 'last_txnid', \
+ 'max_readers', 'num_readers':
+ assert isinstance(info[k], INT_TYPES), k
+ assert info[k] >= 0
+
+ assert info['last_txnid'] == 0
+ txn = env.begin(write=True)
+ txn.put(B('a'), B(''))
+ txn.commit()
+ info = env.info()
+ assert info['last_txnid'] == 1
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.info())
+
+ def test_flags(self):
+ _, env = testlib.temp_env()
+ info = env.flags()
+ for k in 'subdir', 'readonly', 'metasync', 'sync', 'map_async',\
+ 'readahead', 'writemap':
+ assert isinstance(info[k], bool)
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.flags())
+
+ def test_max_key_size(self):
+ _, env = testlib.temp_env()
+ mks = env.max_key_size()
+ assert isinstance(mks, INT_TYPES)
+ assert mks > 0
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.max_key_size())
+
+ def test_max_readers(self):
+ _, env = testlib.temp_env()
+ mr = env.max_readers()
+ assert isinstance(mr, INT_TYPES)
+ assert mr > 0 and mr == env.info()['max_readers']
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.max_readers())
+
+ def test_readers(self):
+ _, env = testlib.temp_env(max_spare_txns=0)
+ r = env.readers()
+ assert isinstance(r, UnicodeType)
+ assert r == NO_READERS
+
+ rtxn = env.begin()
+ r2 = env.readers()
+ assert isinstance(env.readers(), UnicodeType)
+ assert env.readers() != r
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.readers())
+
+
+class OtherMethodsTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_copy(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('b'))
+ txn.commit()
+
+ dest_dir = testlib.temp_dir()
+ env.copy(dest_dir)
+ assert os.path.exists(dest_dir + '/data.mdb')
+
+ cenv = lmdb.open(dest_dir)
+ ctxn = cenv.begin()
+ assert ctxn.get(B('a')) == B('b')
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.copy(testlib.temp_dir()))
+
+ def test_copy_compact(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('b'))
+ txn.commit()
+
+ dest_dir = testlib.temp_dir()
+ env.copy(dest_dir, compact=True)
+ assert os.path.exists(dest_dir + '/data.mdb')
+
+ cenv = lmdb.open(dest_dir)
+ ctxn = cenv.begin()
+ assert ctxn.get(B('a')) == B('b')
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.copy(testlib.temp_dir()))
+
+ def test_copyfd(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('b'))
+ txn.commit()
+
+ dst_path = testlib.temp_file(create=False)
+ fp = open(dst_path, 'wb')
+ env.copyfd(fp.fileno())
+
+ dstenv = lmdb.open(dst_path, subdir=False)
+ dtxn = dstenv.begin()
+ assert dtxn.get(B('a')) == B('b')
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.copyfd(fp.fileno()))
+ fp.close()
+
+ def test_copyfd_compact(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('b'))
+ txn.commit()
+
+ dst_path = testlib.temp_file(create=False)
+ fp = open(dst_path, 'wb')
+ env.copyfd(fp.fileno(), compact=True)
+
+ dstenv = lmdb.open(dst_path, subdir=False)
+ dtxn = dstenv.begin()
+ assert dtxn.get(B('a')) == B('b')
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.copyfd(fp.fileno()))
+ fp.close()
+
+ def test_sync(self):
+ _, env = testlib.temp_env()
+ env.sync(False)
+ env.sync(True)
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.sync(False))
+
+ @staticmethod
+ def _test_reader_check_child(path):
+ """Function to run in child process since we can't use fork() on
+ win32."""
+ env = lmdb.open(path, max_spare_txns=0)
+ txn = env.begin()
+ os._exit(0)
+
+ def test_reader_check(self):
+ if sys.platform == 'win32':
+ # Stale writers are cleared automatically on Windows, see lmdb.h
+ return
+
+ path, env = testlib.temp_env(max_spare_txns=0)
+ rc = env.reader_check()
+ assert rc == 0
+
+ # We need to open a separate env since Transaction.abort() always calls
+ # reset for a read-only txn, the actual abort doesn't happen until
+ # __del__, when Transaction discovers there is no room for it on the
+ # freelist.
+ env1 = lmdb.open(path)
+ txn1 = env1.begin()
+ assert env.readers() != NO_READERS
+ assert env.reader_check() == 0
+
+ # Start a child, open a txn, then crash the child.
+ rc = os.spawnl(os.P_WAIT, sys.executable, sys.executable,
+ __file__, 'test_reader_check_child', path)
+
+ assert rc == 0
+ assert env.reader_check() == 1
+ assert env.reader_check() == 0
+ assert env.readers() != NO_READERS
+
+ txn1.abort()
+ env1.close()
+ assert env.readers() == NO_READERS
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.reader_check())
+
+
+class BeginTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_begin_closed(self):
+ _, env = testlib.temp_env()
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.begin())
+
+ def test_begin_readonly(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ # Read txn can't write.
+ self.assertRaises(lmdb.ReadonlyError,
+ lambda: txn.put(B('a'), B('')))
+ txn.abort()
+
+ def test_begin_write(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ # Write txn can write.
+ assert txn.put(B('a'), B(''))
+ txn.commit()
+
+ def test_bind_db(self):
+ _, env = testlib.temp_env()
+ main = env.open_db(None)
+ sub = env.open_db(B('db1'))
+
+ txn = env.begin(write=True, db=sub)
+ assert txn.put(B('b'), B('')) # -> sub
+ assert txn.put(B('a'), B(''), db=main) # -> main
+ txn.commit()
+
+ txn = env.begin()
+ assert txn.get(B('a')) == B('')
+ assert txn.get(B('b')) is None
+ assert txn.get(B('a'), db=sub) is None
+ assert txn.get(B('b'), db=sub) == B('')
+ txn.abort()
+
+ def test_parent_readonly(self):
+ _, env = testlib.temp_env()
+ parent = env.begin()
+ # Nonsensical.
+ self.assertRaises(lmdb.InvalidParameterError,
+ lambda: env.begin(parent=parent))
+
+ def test_parent(self):
+ _, env = testlib.temp_env()
+ parent = env.begin(write=True)
+ parent.put(B('a'), B('a'))
+
+ child = env.begin(write=True, parent=parent)
+ assert child.get(B('a')) == B('a')
+ assert child.put(B('a'), B('b'))
+ child.abort()
+
+ # put() should have rolled back
+ assert parent.get(B('a')) == B('a')
+
+ child = env.begin(write=True, parent=parent)
+ assert child.put(B('a'), B('b'))
+ child.commit()
+
+ # put() should be visible
+ assert parent.get(B('a')) == B('b')
+
+ def test_buffers(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True, buffers=True)
+ assert txn.put(B('a'), B('a'))
+ b = txn.get(B('a'))
+ assert b is not None
+ assert len(b) == 1
+ assert not isinstance(b, type(B('')))
+ txn.commit()
+
+ txn = env.begin(buffers=False)
+ b = txn.get(B('a'))
+ assert b is not None
+ assert len(b) == 1
+ assert isinstance(b, type(B('')))
+ txn.abort()
+
+
+class OpenDbTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_main(self):
+ _, env = testlib.temp_env()
+ # Start write txn, so we cause deadlock if open_db attempts txn.
+ txn = env.begin(write=True)
+ # Now get main DBI, we should already be open.
+ db = env.open_db(None)
+ # w00t, no deadlock.
+
+ flags = db.flags(txn)
+ assert not flags['reverse_key']
+ assert not flags['dupsort']
+ txn.abort()
+
+ def test_unicode(self):
+ _, env = testlib.temp_env()
+ assert env.open_db(B('myindex')) is not None
+ self.assertRaises(TypeError,
+ lambda: env.open_db(UnicodeType('myindex')))
+
+ def test_sub_notxn(self):
+ _, env = testlib.temp_env()
+ assert env.info()['last_txnid'] == 0
+ db1 = env.open_db(B('subdb1'))
+ assert env.info()['last_txnid'] == 1
+ db2 = env.open_db(B('subdb2'))
+ assert env.info()['last_txnid'] == 2
+
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.open_db('subdb3'))
+
+ def test_sub_rotxn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=False)
+ self.assertRaises(lmdb.ReadonlyError,
+ lambda: env.open_db(B('subdb'), txn=txn))
+
+ def test_sub_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ db1 = env.open_db(B('subdb1'), txn=txn)
+ db2 = env.open_db(B('subdb2'), txn=txn)
+ for db in db1, db2:
+ assert db.flags(txn) == {
+ 'dupfixed': False,
+ 'dupsort': False,
+ 'integerdup': False,
+ 'integerkey': False,
+ 'reverse_key': False,
+ }
+ txn.commit()
+
+ def test_reopen(self):
+ path, env = testlib.temp_env()
+ db1 = env.open_db(B('subdb1'))
+ env.close()
+ env = lmdb.open(path, max_dbs=10)
+ db1 = env.open_db(B('subdb1'))
+
+ FLAG_SETS = [
+ (flag, val)
+ for flag in (
+ 'reverse_key', 'dupsort', 'integerkey', 'integerdup', 'dupfixed'
+ )
+ for val in (True, False)
+ ]
+
+ def test_flags(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+
+ for flag, val in self.FLAG_SETS:
+ key = B('%s-%s' % (flag, val))
+ db = env.open_db(key, txn=txn, **{flag: val})
+ assert db.flags(txn)[flag] == val
+
+ txn.commit()
+ # Test flag persistence.
+ env.close()
+ env = lmdb.open(path, max_dbs=10)
+ txn = env.begin(write=True)
+
+ for flag, val in self.FLAG_SETS:
+ key = B('%s-%s' % (flag, val))
+ db = env.open_db(key, txn=txn)
+ assert db.flags(txn)[flag] == val
+
+ def test_readonly_env_main(self):
+ path, env = testlib.temp_env()
+ env.close()
+
+ env = lmdb.open(path, readonly=True)
+ db = env.open_db(None)
+
+ def test_readonly_env_sub_noexist(self):
+ # https://github.com/dw/py-lmdb/issues/109
+ path, env = testlib.temp_env()
+ env.close()
+
+ env = lmdb.open(path, max_dbs=10, readonly=True)
+ self.assertRaises(lmdb.NotFoundError,
+ lambda: env.open_db(B('node_schedules'), create=False))
+
+ def test_readonly_env_sub_eperm(self):
+ # https://github.com/dw/py-lmdb/issues/109
+ path, env = testlib.temp_env()
+ env.close()
+
+ env = lmdb.open(path, max_dbs=10, readonly=True)
+ self.assertRaises(lmdb.ReadonlyError,
+ lambda: env.open_db(B('node_schedules'), create=True))
+
+ def test_readonly_env_sub(self):
+ # https://github.com/dw/py-lmdb/issues/109
+ path, env = testlib.temp_env()
+ assert env.open_db(B('node_schedules')) is not None
+ env.close()
+
+ env = lmdb.open(path, max_dbs=10, readonly=True)
+ db = env.open_db(B('node_schedules'), create=False)
+ assert db is not None
+
+
+reader_count = lambda env: env.readers().count('\n') - 1
+
+class SpareTxnTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_none(self):
+ _, env = testlib.temp_env(max_spare_txns=0)
+ assert 0 == reader_count(env)
+
+ t1 = env.begin()
+ assert 1 == reader_count(env)
+
+ t2 = env.begin()
+ assert 2 == reader_count(env)
+
+ t1.abort()
+ del t1
+ assert 1 == reader_count(env)
+
+ t2.abort()
+ del t2
+ assert 0 == reader_count(env)
+
+ def test_one(self):
+ _, env = testlib.temp_env(max_spare_txns=1)
+ # 1 here, since CFFI uses a temporary reader during init.
+ assert 1 >= reader_count(env)
+
+ t1 = env.begin()
+ assert 1 == reader_count(env)
+
+ t2 = env.begin()
+ assert 2 == reader_count(env)
+
+ t1.abort()
+ del t1
+ assert 2 == reader_count(env) # 1 live, 1 cached
+
+ t2.abort()
+ del t2
+ assert 1 == reader_count(env) # 1 cached
+
+ t3 = env.begin()
+ assert 1 == reader_count(env) # 1 live
+
+ t3.abort()
+ del t3
+ assert 1 == reader_count(env) # 1 cached
+
+
+class LeakTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_open_unref_does_not_leak(self):
+ temp_dir = testlib.temp_dir()
+ env = lmdb.open(temp_dir)
+ ref = weakref.ref(env)
+ env = None
+ testlib.debug_collect()
+ assert ref() is None
+
+ def test_open_close_does_not_leak(self):
+ temp_dir = testlib.temp_dir()
+ env = lmdb.open(temp_dir)
+ env.close()
+ ref = weakref.ref(env)
+ env = None
+ testlib.debug_collect()
+ assert ref() is None
+
+ def test_weakref_callback_invoked_once(self):
+ temp_dir = testlib.temp_dir()
+ env = lmdb.open(temp_dir)
+ env.close()
+ count = [0]
+ def callback(ref):
+ count[0] += 1
+ ref = weakref.ref(env, callback)
+ env = None
+ testlib.debug_collect()
+ assert ref() is None
+ assert count[0] == 1
+
+
+if __name__ == '__main__':
+ if len(sys.argv) > 1 and sys.argv[1] == 'test_reader_check_child':
+ OtherMethodsTest._test_reader_check_child(sys.argv[2])
+ else:
+ unittest.main()
diff -U3 -r -N no-tests/tests/iteration_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/iteration_test.py
--- no-tests/tests/iteration_test.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/iteration_test.py 2017-05-29 16:44:06.615481956 +0200
@@ -0,0 +1,257 @@
+#! /usr/bin/env python
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+# test delete(dupdata)
+
+from __future__ import absolute_import
+from __future__ import with_statement
+import unittest
+
+import testlib
+from testlib import B
+from testlib import BT
+from testlib import KEYS, ITEMS, KEYS2, ITEMS2
+from testlib import putData, putBigData
+
+
+class IterationTestBase(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def setUp(self):
+ self.path, self.env = testlib.temp_env() # creates 10 databases
+ self.txn = self.env.begin(write=True)
+ putData(self.txn)
+ self.c = self.txn.cursor()
+ self.empty_entry = (B(''), B(''))
+
+ def matchList(self, ls_a, ls_b):
+ return all(map(lambda x, y: x == y, ls_a, ls_b))
+
+
+class IterationTestBase2(unittest.TestCase):
+ """ This puts more data than its predecessor"""
+
+ def tearDown(self):
+ testlib.cleanup()
+
+ def setUp(self):
+ self.path, self.env = testlib.temp_env() # creates 10 databases
+ self.txn = self.env.begin(write=True)
+ putBigData(self.txn) # HERE!
+ self.c = self.txn.cursor()
+ self.empty_entry = ('', '')
+
+ def matchList(self, ls_a, ls_b):
+ return all(map(lambda x, y: x == y, ls_a, ls_b))
+
+
+class IterationTest(IterationTestBase):
+ def testFromStart(self):
+ # From start
+ self.c.first()
+ self.assertEqual(self.c.key(), KEYS[0]) # start of db
+ test_list = [i for i in iter(self.c)]
+ self.assertEqual(self.matchList(test_list, ITEMS), True)
+ self.assertEqual(self.c.item(), self.empty_entry) # end of db
+
+ def testFromStartWithIternext(self):
+ # From start with iternext
+ self.c.first()
+ self.assertEqual(self.c.key(), KEYS[0]) # start of db
+ test_list = [i for i in self.c.iternext()]
+ # remaining elements in db
+ self.assertEqual(self.matchList(test_list, ITEMS), True)
+ self.assertEqual(self.c.item(), self.empty_entry) # end of db
+
+ def testFromStartWithNext(self):
+ # From start with next
+ self.c.first()
+ self.assertEqual(self.c.key(), KEYS[0]) # start of db
+ test_list = []
+ while 1:
+ test_list.append(self.c.item())
+ if not self.c.next():
+ break
+ self.assertEqual(self.matchList(test_list, ITEMS), True)
+
+ def testFromExistentKeySetKey(self):
+ self.c.first()
+ self.c.set_key(KEYS[1])
+ self.assertEqual(self.c.key(), KEYS[1])
+ test_list = [i for i in self.c.iternext()]
+ self.assertEqual(self.matchList(test_list, ITEMS[1:]), True)
+
+ def testFromExistentKeySetRange(self):
+ self.c.first()
+ self.c.set_range(KEYS[1])
+ self.assertEqual(self.c.key(), KEYS[1])
+ test_list = [i for i in self.c.iternext()]
+ self.assertEqual(self.matchList(test_list, ITEMS[1:]), True)
+
+ def testFromNonExistentKeySetRange(self):
+ self.c.first()
+ self.c.set_range(B('c'))
+ self.assertEqual(self.c.key(), B('d'))
+ test_list = [i for i in self.c.iternext()]
+ test_items = [i for i in ITEMS if i[0] > B('c')]
+ self.assertEqual(self.matchList(test_list, test_items), True)
+
+ def testFromLastKey(self):
+ self.c.last()
+ self.assertEqual(self.c.key(), KEYS[-1])
+ test_list = [i for i in self.c.iternext()]
+ self.assertEqual(self.matchList(test_list, ITEMS[-1:]), True)
+
+ def testFromNonExistentKeyPastEnd(self):
+ self.c.last()
+ self.assertEqual(self.c.key(), KEYS[-1])
+ # next() fails, leaving iterator in an unpositioned state.
+ self.c.next()
+ self.assertEqual(self.c.item(), self.empty_entry)
+ # iternext() from an unpositioned state proceeds from start of DB.
+ test_list = list(self.c.iternext())
+ self.assertEqual(test_list, ITEMS)
+
+
+class ReverseIterationTest(IterationTestBase):
+ def testFromStartRev(self):
+ # From start
+ self.c.first()
+ self.assertEqual(self.c.key(), KEYS[0]) # start of db
+ test_list = [i for i in self.c.iterprev()]
+ self.assertEqual(self.matchList(test_list, ITEMS[:1][::-1]), True)
+ self.assertEqual(self.c.item(), self.empty_entry) # very start of db
+
+ def testFromExistentKeySetKeyRev(self):
+ self.c.first()
+ self.c.set_key(KEYS[2])
+ self.assertEqual(self.c.key(), KEYS[2])
+ test_list = [i for i in self.c.iterprev()]
+ self.assertEqual(self.matchList(test_list, ITEMS[:3][::-1]), True)
+
+ def testFromExistentKeySetRangeRev(self):
+ self.c.first()
+ self.c.set_range(KEYS[2])
+ self.assertEqual(self.c.key(), KEYS[2])
+ test_list = [i for i in self.c.iterprev()]
+ self.assertEqual(self.matchList(test_list, ITEMS[:3][::-1]), True)
+
+ def testFromNonExistentKeySetRangeRev(self):
+ self.c.first()
+ self.c.set_range(B('c'))
+ self.assertEqual(self.c.key(), B('d'))
+ test_list = [i for i in self.c.iterprev()]
+ test_items = [i for i in ITEMS if i[0] <= B('d')]
+ test_items = test_items[::-1]
+ self.assertEqual(self.matchList(test_list, test_items), True)
+
+ def testFromLastKeyRev(self):
+ self.c.last()
+ self.assertEqual(self.c.key(), KEYS[-1])
+ test_list = [i for i in self.c.iterprev()]
+ self.assertEqual(self.matchList(test_list, ITEMS[::-1]), True)
+
+ def testFromLastKeyWithPrevRev(self):
+ self.c.last()
+ self.assertEqual(self.c.key(), KEYS[-1]) # end of db
+ test_list = []
+ while 1:
+ test_list.append(self.c.item())
+ if not self.c.prev():
+ break
+ self.assertEqual(self.matchList(test_list, ITEMS[::-1]), True)
+
+ def testFromNonExistentKeyPastEndRev(self):
+ self.c.first()
+ self.assertEqual(self.c.key(), KEYS[0])
+ # prev() fails, leaving iterator in an unpositioned state.
+ self.c.prev()
+ self.assertEqual(self.c.item(), self.empty_entry)
+ # iterprev() from an unpositioned state proceeds from end of DB.
+ test_list = list(self.c.iterprev())
+ self.assertEqual(test_list, ITEMS[::-1])
+
+class IterationTestWithDupsBase(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def setUp(self):
+ self.path, self.env = testlib.temp_env()
+ db = self.env.open_db(B('db1'), dupsort=True)
+ self.txn = self.env.begin(db, write=True)
+ for _ in range(2):
+ putData(self.txn)
+ self.c = self.txn.cursor()
+ self.empty_entry = ('', '')
+
+ def matchList(self, ls_a, ls_b):
+ return all(map(lambda x, y: x == y, ls_a, ls_b))
+
+
+class IterationTestWithDups(IterationTestWithDupsBase):
+ pass
+
+
+class SeekIterationTest(IterationTestBase2):
+ def testForwardIterationSeek(self):
+ self.c.first()
+ test_list = []
+ for i in self.c.iternext():
+ test_list.append(i)
+ # skips d and e
+ if self.c.key() == B('baa'):
+ self.c.set_key(B('e'))
+ test_item = [i for i in ITEMS2 if i[0] not in (B('d'), B('e'))]
+ self.assertEqual(test_list, test_item)
+
+ def testPutDuringIteration(self):
+ self.c.first()
+ test_list = []
+ c = self.txn.cursor()
+ for i in c.iternext():
+ test_list.append(i)
+ # adds 'i' upon seeing 'e'
+ if c.key() == B('e'):
+ self.c.put(B('i'), B(''))
+ test_item = ITEMS2 + [(B('i'), B(''))]
+ self.assertEqual(test_list, test_item)
+
+ def testDeleteDuringIteration(self):
+ self.c.first()
+ test_list = []
+ for i in self.c.iternext():
+ # deletes 'e' upon seeing it
+ if self.c.key() == B('e'):
+ # Causes 'e' to be deleted, and advances cursor to next
+ # element.
+ self.c.delete()
+ i = self.c.item()
+ test_list.append(i)
+
+ test_item = [i for i in ITEMS2 if i[0] != B('e')]
+ self.assertEqual(test_list, test_item)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff -U3 -r -N no-tests/tests/package_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/package_test.py
--- no-tests/tests/package_test.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/package_test.py 2017-05-29 16:44:06.615481956 +0200
@@ -0,0 +1,68 @@
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+from __future__ import absolute_import
+import unittest
+
+import lmdb
+
+
+class PackageExportsTest(unittest.TestCase):
+ """
+ Ensure the list of exported names matches a predefined list. Designed to
+ ensure future interface changes to cffi.py and cpython.c don't break
+ consistency of "from lmdb import *".
+ """
+ def test_exports(self):
+ assert sorted(lmdb.__all__) == [
+ 'BadDbiError',
+ 'BadRslotError',
+ 'BadTxnError',
+ 'BadValsizeError',
+ 'CorruptedError',
+ 'Cursor',
+ 'CursorFullError',
+ 'DbsFullError',
+ 'DiskError',
+ 'Environment',
+ 'Error',
+ 'IncompatibleError',
+ 'InvalidError',
+ 'InvalidParameterError',
+ 'KeyExistsError',
+ 'LockError',
+ 'MapFullError',
+ 'MapResizedError',
+ 'MemoryError',
+ 'NotFoundError',
+ 'PageFullError',
+ 'PageNotFoundError',
+ 'PanicError',
+ 'ReadersFullError',
+ 'ReadonlyError',
+ 'TlsFullError',
+ 'Transaction',
+ 'TxnFullError',
+ 'VersionMismatchError',
+ 'enable_drop_gil',
+ 'version',
+ ]
diff -U3 -r -N no-tests/tests/testlib.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/testlib.py
--- no-tests/tests/testlib.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/testlib.py 2017-05-29 16:44:06.615481956 +0200
@@ -0,0 +1,154 @@
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+from __future__ import absolute_import
+import atexit
+import gc
+import os
+import shutil
+import stat
+import sys
+import tempfile
+import traceback
+
+try:
+ import __builtin__
+except ImportError:
+ import builtins as __builtin__
+
+import lmdb
+
+
+_cleanups = []
+
+def cleanup():
+ while _cleanups:
+ func = _cleanups.pop()
+ try:
+ func()
+ except Exception:
+ traceback.print_exc()
+
+atexit.register(cleanup)
+
+
+def temp_dir(create=True):
+ path = tempfile.mkdtemp(prefix='lmdb_test')
+ assert path is not None, 'tempfile.mkdtemp failed'
+ if not create:
+ os.rmdir(path)
+ _cleanups.append(lambda: shutil.rmtree(path, ignore_errors=True))
+ if hasattr(path, 'decode'):
+ path = path.decode(sys.getfilesystemencoding())
+ return path
+
+
+def temp_file(create=True):
+ fd, path = tempfile.mkstemp(prefix='lmdb_test')
+ assert path is not None, 'tempfile.mkstemp failed'
+ os.close(fd)
+ if not create:
+ os.unlink(path)
+ _cleanups.append(lambda: os.path.exists(path) and os.unlink(path))
+ pathlock = path + '-lock'
+ _cleanups.append(lambda: os.path.exists(pathlock) and os.unlink(pathlock))
+ if hasattr(path, 'decode'):
+ path = path.decode(sys.getfilesystemencoding())
+ return path
+
+
+def temp_env(path=None, max_dbs=10, **kwargs):
+ if not path:
+ path = temp_dir()
+ env = lmdb.open(path, max_dbs=max_dbs, **kwargs)
+ _cleanups.append(env.close)
+ return path, env
+
+
+def path_mode(path):
+ return stat.S_IMODE(os.stat(path).st_mode)
+
+
+def debug_collect():
+ if hasattr(gc, 'set_debug') and hasattr(gc, 'get_debug'):
+ old = gc.get_debug()
+ gc.set_debug(gc.DEBUG_LEAK)
+ gc.collect()
+ gc.set_debug(old)
+ else:
+ for x in range(10):
+ # PyPy doesn't collect objects with __del__ on first attempt.
+ gc.collect()
+
+
+# Handle moronic Python >=3.0 <3.3.
+UnicodeType = getattr(__builtin__, 'unicode', str)
+BytesType = getattr(__builtin__, 'bytes', str)
+
+
+try:
+ INT_TYPES = (int, long)
+except NameError:
+ INT_TYPES = (int,)
+
+# B(ascii 'string') -> bytes
+try:
+ bytes('') # Python>=2.6, alias for str().
+ B = lambda s: s
+except TypeError: # Python3.x, requires encoding parameter.
+ B = lambda s: bytes(s, 'ascii')
+except NameError: # Python<=2.5.
+ B = lambda s: s
+
+# BL('s1', 's2') -> ['bytes1', 'bytes2']
+BL = lambda *args: list(map(B, args))
+# TS('s1', 's2') -> ('bytes1', 'bytes2')
+BT = lambda *args: tuple(B(s) for s in args)
+# O(int) -> length-1 bytes
+O = lambda arg: B(chr(arg))
+# OCT(s) -> parse string as octal
+OCT = lambda s: int(s, 8)
+
+
+KEYS = BL('a', 'b', 'baa', 'd')
+ITEMS = [(k, B('')) for k in KEYS]
+REV_ITEMS = ITEMS[::-1]
+VALUES = [B('') for k in KEYS]
+
+KEYS2 = BL('a', 'b', 'baa', 'd', 'e', 'f', 'g', 'h')
+ITEMS2 = [(k, B('')) for k in KEYS2]
+REV_ITEMS2 = ITEMS2[::-1]
+VALUES2 = [B('') for k in KEYS2]
+
+def putData(t, db=None):
+ for k, v in ITEMS:
+ if db:
+ t.put(k, v, db=db)
+ else:
+ t.put(k, v)
+
+def putBigData(t, db=None):
+ for k, v in ITEMS2:
+ if db:
+ t.put(k, v, db=db)
+ else:
+ t.put(k, v)
diff -U3 -r -N no-tests/tests/tool_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/tool_test.py
--- no-tests/tests/tool_test.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/tool_test.py 2017-05-29 16:44:06.616481955 +0200
@@ -0,0 +1,37 @@
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+from __future__ import absolute_import
+import unittest
+
+import lmdb
+import lmdb.tool
+
+
+class ToolTest(unittest.TestCase):
+ def test_ok(self):
+ # For now, simply ensure the module can be compiled (3.x compat).
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main()
diff -U3 -r -N no-tests/tests/txn_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/txn_test.py
--- no-tests/tests/txn_test.py 1970-01-01 01:00:00.000000000 +0100
+++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/txn_test.py 2017-05-29 16:44:06.616481955 +0200
@@ -0,0 +1,538 @@
+#
+# Copyright 2013 The py-lmdb authors, all rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted only as authorized by the OpenLDAP
+# Public License.
+#
+# A copy of this license is available in the file LICENSE in the
+# top-level directory of the distribution or, alternatively, at
+# <http://www.OpenLDAP.org/license.html>.
+#
+# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
+#
+# Individual files and/or contributed packages may be copyright by
+# other parties and/or subject to additional restrictions.
+#
+# This work also contains materials derived from public sources.
+#
+# Additional information about OpenLDAP can be obtained at
+# <http://www.openldap.org/>.
+#
+
+from __future__ import absolute_import
+from __future__ import with_statement
+import struct
+import unittest
+import weakref
+
+import testlib
+from testlib import B
+from testlib import BT
+from testlib import OCT
+from testlib import INT_TYPES
+from testlib import BytesType
+from testlib import UnicodeType
+
+import lmdb
+
+
+UINT_0001 = struct.pack('I', 1)
+UINT_0002 = struct.pack('I', 2)
+ULONG_0001 = struct.pack('L', 1) # L != size_t
+ULONG_0002 = struct.pack('L', 2) # L != size_t
+
+
+class InitTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_closed(self):
+ _, env = testlib.temp_env()
+ env.close()
+ self.assertRaises(Exception,
+ lambda: lmdb.Transaction(env))
+
+ def test_readonly(self):
+ _, env = testlib.temp_env()
+ txn = lmdb.Transaction(env)
+ # Read txn can't write.
+ self.assertRaises(lmdb.ReadonlyError,
+ lambda: txn.put(B('a'), B('')))
+ txn.abort()
+
+ def test_begin_write(self):
+ _, env = testlib.temp_env()
+ txn = lmdb.Transaction(env, write=True)
+ # Write txn can write.
+ assert txn.put(B('a'), B(''))
+ txn.commit()
+
+ def test_bind_db(self):
+ _, env = testlib.temp_env()
+ main = env.open_db(None)
+ sub = env.open_db(B('db1'))
+
+ txn = lmdb.Transaction(env, write=True, db=sub)
+ assert txn.put(B('b'), B('')) # -> sub
+ assert txn.put(B('a'), B(''), db=main) # -> main
+ txn.commit()
+
+ txn = lmdb.Transaction(env)
+ assert txn.get(B('a')) == B('')
+ assert txn.get(B('b')) is None
+ assert txn.get(B('a'), db=sub) is None
+ assert txn.get(B('b'), db=sub) == B('')
+ txn.abort()
+
+ def test_bind_db_methods(self):
+ _, env = testlib.temp_env()
+ maindb = env.open_db(None)
+ db1 = env.open_db(B('d1'))
+ txn = lmdb.Transaction(env, write=True, db=db1)
+ assert txn.put(B('a'), B('d1'))
+ assert txn.get(B('a'), db=db1) == B('d1')
+ assert txn.get(B('a'), db=maindb) is None
+ assert txn.replace(B('a'), B('d11')) == B('d1')
+ assert txn.pop(B('a')) == B('d11')
+ assert txn.put(B('a'), B('main'), db=maindb, overwrite=False)
+ assert not txn.delete(B('a'))
+ txn.abort()
+
+ def test_parent_readonly(self):
+ _, env = testlib.temp_env()
+ parent = lmdb.Transaction(env)
+ # Nonsensical.
+ self.assertRaises(lmdb.InvalidParameterError,
+ lambda: lmdb.Transaction(env, parent=parent))
+
+ def test_parent(self):
+ _, env = testlib.temp_env()
+ parent = lmdb.Transaction(env, write=True)
+ parent.put(B('a'), B('a'))
+
+ child = lmdb.Transaction(env, write=True, parent=parent)
+ assert child.get(B('a')) == B('a')
+ assert child.put(B('a'), B('b'))
+ child.abort()
+
+ # put() should have rolled back
+ assert parent.get(B('a')) == B('a')
+
+ child = lmdb.Transaction(env, write=True, parent=parent)
+ assert child.put(B('a'), B('b'))
+ child.commit()
+
+ # put() should be visible
+ assert parent.get(B('a')) == B('b')
+
+ def test_buffers(self):
+ _, env = testlib.temp_env()
+ txn = lmdb.Transaction(env, write=True, buffers=True)
+ assert txn.put(B('a'), B('a'))
+ b = txn.get(B('a'))
+ assert b is not None
+ assert len(b) == 1
+ assert not isinstance(b, type(B('')))
+ txn.commit()
+
+ txn = lmdb.Transaction(env, buffers=False)
+ b = txn.get(B('a'))
+ assert b is not None
+ assert len(b) == 1
+ assert isinstance(b, type(B('')))
+ txn.abort()
+
+
+class ContextManagerTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_ok(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ with txn as txn_:
+ assert txn is txn_
+ txn.put(B('foo'), B('123'))
+
+ self.assertRaises(Exception, lambda: txn.get(B('foo')))
+ with env.begin() as txn:
+ assert txn.get(B('foo')) == B('123')
+
+ def test_crash(self):
+ path, env = testlib.temp_env()
+ txn = env.begin(write=True)
+
+ try:
+ with txn as txn_:
+ txn.put(B('foo'), B('123'))
+ txn.put(123, 123)
+ except:
+ pass
+
+ self.assertRaises(Exception, lambda: txn.get(B('foo')))
+ with env.begin() as txn:
+ assert txn.get(B('foo')) is None
+
+
+class IdTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_readonly_new(self):
+ _, env = testlib.temp_env()
+ with env.begin() as txn:
+ assert txn.id() == 0
+
+ def test_write_new(self):
+ _, env = testlib.temp_env()
+ with env.begin(write=True) as txn:
+ assert txn.id() == 1
+
+ def test_readonly_after_write(self):
+ _, env = testlib.temp_env()
+ with env.begin(write=True) as txn:
+ txn.put(B('a'), B('a'))
+ with env.begin() as txn:
+ assert txn.id() == 1
+
+ def test_invalid_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ txn.abort()
+ self.assertRaises(Exception, lambda: txn.id())
+
+
+class StatTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_stat(self):
+ _, env = testlib.temp_env()
+ db1 = env.open_db(B('db1'))
+ db2 = env.open_db(B('db2'))
+
+ txn = lmdb.Transaction(env)
+ for db in db1, db2:
+ stat = txn.stat(db)
+ for k in 'psize', 'depth', 'branch_pages', 'overflow_pages',\
+ 'entries':
+ assert isinstance(stat[k], INT_TYPES), k
+ assert stat[k] >= 0
+ assert stat['entries'] == 0
+
+ txn = lmdb.Transaction(env, write=True)
+ txn.put(B('a'), B('b'), db=db1)
+ txn.commit()
+
+ txn = lmdb.Transaction(env)
+ stat = txn.stat(db1)
+ assert stat['entries'] == 1
+
+ stat = txn.stat(db2)
+ assert stat['entries'] == 0
+
+ txn.abort()
+ self.assertRaises(Exception,
+ lambda: env.stat(db1))
+ env.close()
+ self.assertRaises(Exception,
+ lambda: env.stat(db1))
+
+
+class DropTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_empty(self):
+ _, env = testlib.temp_env()
+ db1 = env.open_db(B('db1'))
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('a'), db=db1)
+ assert txn.get(B('a'), db=db1) == B('a')
+ txn.drop(db1, False)
+ assert txn.get(B('a')) is None
+ txn.drop(db1, False) # should succeed.
+ assert txn.get(B('a')) is None
+
+ def test_delete(self):
+ _, env = testlib.temp_env()
+ db1 = env.open_db(B('db1'))
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('a'), db=db1)
+ txn.drop(db1)
+ self.assertRaises(lmdb.InvalidParameterError,
+ lambda: txn.get(B('a'), db=db1))
+ self.assertRaises(lmdb.InvalidParameterError,
+ lambda: txn.drop(db1))
+
+
+class CommitTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_bad_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ txn.abort()
+ self.assertRaises(Exception,
+ lambda: txn.commit())
+
+ def test_bad_env(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ env.close()
+ self.assertRaises(Exception,
+ lambda: txn.commit())
+
+ def test_commit_ro(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ txn.commit()
+ self.assertRaises(Exception,
+ lambda: txn.commit())
+
+ def test_commit_rw(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ assert txn.put(B('a'), B('a'))
+ txn.commit()
+ self.assertRaises(Exception,
+ lambda: txn.commit())
+ txn = env.begin()
+ assert txn.get(B('a')) == B('a')
+ txn.abort()
+
+
+class AbortTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_abort_ro(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ assert txn.get(B('a')) is None
+ txn.abort()
+ self.assertRaises(Exception,
+ lambda: txn.get(B('a')))
+ env.close()
+ self.assertRaises(Exception,
+ lambda: txn.get(B('a')))
+
+ def test_abort_rw(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ assert txn.put(B('a'), B('a'))
+ txn.abort()
+ txn = env.begin()
+ assert txn.get(B('a')) is None
+
+
+class GetTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_bad_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ txn.abort()
+ self.assertRaises(Exception,
+ lambda: txn.get(B('a')))
+
+ def test_bad_env(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ env.close()
+ self.assertRaises(Exception,
+ lambda: txn.get(B('a')))
+
+ def test_missing(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ assert txn.get(B('a')) is None
+ assert txn.get(B('a'), default='default') is 'default'
+
+ def test_empty_key(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ self.assertRaises(lmdb.BadValsizeError,
+ lambda: txn.get(B('')))
+
+ def test_db(self):
+ _, env = testlib.temp_env()
+ maindb = env.open_db(None)
+ db1 = env.open_db(B('db1'))
+
+ txn = env.begin()
+ assert txn.get(B('a'), db=db1) is None
+ txn.abort()
+
+ txn = env.begin(write=True)
+ txn.put(B('a'), B('a'), db=db1)
+ txn.commit()
+
+ txn = env.begin()
+ assert txn.get(B('a')) is None
+ txn.abort()
+
+ txn = env.begin(db=db1)
+ assert txn.get(B('a')) == B('a')
+ assert txn.get(B('a'), db=maindb) is None
+
+ def test_buffers_no(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ assert txn.put(B('a'), B('a'))
+ assert type(txn.get(B('a'))) is BytesType
+
+ def test_buffers_yes(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True, buffers=True)
+ assert txn.put(B('a'), B('a'))
+ assert type(txn.get(B('a'))) is not BytesType
+
+ def test_dupsort(self):
+ _, env = testlib.temp_env()
+ db1 = env.open_db(B('db1'), dupsort=True)
+ txn = env.begin(write=True, db=db1)
+ assert txn.put(B('a'), B('a'))
+ assert txn.put(B('a'), B('b'))
+ assert txn.get(B('a')) == B('a')
+
+ def test_integerkey(self):
+ _, env = testlib.temp_env()
+ db1 = env.open_db(B('db1'), integerkey=True)
+ txn = env.begin(write=True, db=db1)
+ assert txn.put(UINT_0001, B('a'))
+ assert txn.put(UINT_0002, B('b'))
+ assert txn.get(UINT_0001) == B('a')
+ assert txn.get(UINT_0002) == B('b')
+
+ def test_integerdup(self):
+ _, env = testlib.temp_env()
+ db1 = env.open_db(B('db1'), dupsort=True, integerdup=True)
+ txn = env.begin(write=True, db=db1)
+ assert txn.put(UINT_0001, UINT_0002)
+ assert txn.put(UINT_0001, UINT_0001)
+ assert txn.get(UINT_0001) == UINT_0001
+
+ def test_dupfixed(self):
+ _, env = testlib.temp_env()
+ db1 = env.open_db(B('db1'), dupsort=True, dupfixed=True)
+ txn = env.begin(write=True, db=db1)
+ assert txn.put(B('a'), B('a'))
+ assert txn.put(B('a'), B('b'))
+ assert txn.get(B('a')) == B('a')
+
+
+class PutTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_bad_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ txn.abort()
+ self.assertRaises(Exception,
+ lambda: txn.put(B('a'), B('a')))
+
+ def test_bad_env(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ env.close()
+ self.assertRaises(Exception,
+ lambda: txn.put(B('a'), B('a')))
+
+ def test_ro_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ self.assertRaises(lmdb.ReadonlyError,
+ lambda: txn.put(B('a'), B('a')))
+
+ def test_empty_key_value(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ self.assertRaises(lmdb.BadValsizeError,
+ lambda: txn.put(B(''), B('a')))
+
+ def test_dupsort(self):
+ _, env = testlib.temp_env()
+
+ def test_dupdata_no_dupsort(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ assert txn.put(B('a'), B('a'), dupdata=True)
+ assert txn.put(B('a'), B('b'), dupdata=True)
+ txn.get(B('a'))
+
+
+class ReplaceTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_bad_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ txn.abort()
+ self.assertRaises(Exception,
+ lambda: txn.replace(B('a'), B('a')))
+
+ def test_bad_env(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ env.close()
+ self.assertRaises(Exception,
+ lambda: txn.replace(B('a'), B('a')))
+
+ def test_ro_txn(self):
+ _, env = testlib.temp_env()
+ txn = env.begin()
+ self.assertRaises(lmdb.ReadonlyError,
+ lambda: txn.replace(B('a'), B('a')))
+
+ def test_empty_key_value(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ self.assertRaises(lmdb.BadValsizeError,
+ lambda: txn.replace(B(''), B('a')))
+
+ def test_dupsort_noexist(self):
+ _, env = testlib.temp_env()
+ db = env.open_db(B('db1'), dupsort=True)
+ txn = env.begin(write=True, db=db)
+ assert None == txn.replace(B('a'), B('x'))
+ assert B('x') == txn.replace(B('a'), B('y'))
+ assert B('y') == txn.replace(B('a'), B('z'))
+ cur = txn.cursor()
+ assert cur.set_key(B('a'))
+ assert [B('z')] == list(cur.iternext_dup())
+
+ def test_dupdata_no_dupsort(self):
+ _, env = testlib.temp_env()
+ txn = env.begin(write=True)
+ assert txn.put(B('a'), B('a'), dupdata=True)
+ assert txn.put(B('a'), B('b'), dupdata=True)
+ txn.get(B('a'))
+
+
+class LeakTest(unittest.TestCase):
+ def tearDown(self):
+ testlib.cleanup()
+
+ def test_open_close(self):
+ temp_dir = testlib.temp_dir()
+ env = lmdb.open(temp_dir)
+ with env.begin() as txn:
+ pass
+ env.close()
+ r1 = weakref.ref(env)
+ r2 = weakref.ref(txn)
+ env = None
+ txn = None
+ testlib.debug_collect()
+ assert r1() is None
+ assert r2() is None
+
+
+if __name__ == '__main__':
+ unittest.main()