diff --git a/.gitignore b/.gitignore index e69de29..e357dcc 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1 @@ +/lmdb-0.92.tar.gz diff --git a/python-lmdb.spec b/python-lmdb.spec new file mode 100644 index 0000000..e209bc4 --- /dev/null +++ b/python-lmdb.spec @@ -0,0 +1,90 @@ +%global srcname lmdb +%global sum Python binding for the LMDB 'Lightning' Database (CPython & CFFI included) + +Name: python-%{srcname} +Version: 0.92 +Release: 1%{?dist} +Summary: %{sum} + +License: OpenLDAP +URL: https://github.com/dw/py-lmdb +Source0: https://pypi.python.org/packages/1b/ac/a1cd245e076d6bd35130a540201d5dbc0d64ecfa1a0bdd8af0c9ea72359d/lmdb-0.92.tar.gz + +Patch0: tests.patch +Patch1: tests-disable-gh-issue-160.patch + +BuildRequires: python2-cffi python2-devel python2-nose python3-cffi python3-devel python3-nose lmdb-devel + +%description +%{sum} + +%package -n python2-%{srcname} +Summary: %{sum} +%{?python_provide:%python_provide python2-%{srcname}} + +%description -n python2-%{srcname} +%{sum} + + +%package -n python3-%{srcname} +Summary: %{sum} +%{?python_provide:%python_provide python3-%{srcname}} + +%description -n python3-%{srcname} +%{sum} + + +%prep +%autosetup -n lmdb-%{version} +%patch0 -p1 +%patch1 -p1 + +%build +# do not use bundled LMDB library +export LMDB_FORCE_SYSTEM=1 +unset LMDB_FORCE_CFFI +%py2_build +%py3_build + +%install +export LMDB_FORCE_SYSTEM=1 +unset LMDB_FORCE_CFFI +%py2_install +%py3_install + +%check +export LMDB_FORCE_SYSTEM=1 +unset LMDB_FORCE_CFFI + +# The tests may jump between dirs! +# As a result some tests cannot find the binding in current working directory. +export PYTHONPATH=$(pwd) +nosetests-%{python2_version} -v +nosetests-%{python3_version} -v +# % {__python2} setup.py test +# % {__python3} setup.py test + +%files -n python2-%{srcname} +%license LICENSE +%doc ChangeLog +%doc PKG-INFO +%{python2_sitearch}/* + +%files -n python3-%{srcname} +%license LICENSE +%doc ChangeLog +%doc PKG-INFO +%{python3_sitearch}/* + +%changelog +* Tue Jun 13 2017 Petr Špaček - 0.92-1 + Initial build using CPython extension and system LMDB library by default. + + The code was imported from PyPI package v0.92 MD5 00520384f53f0c9f6347e681d4bb8140 + + test from Git repo 4651bb3a865c77008ac261443899fe25f88173f2. + + Known problems: + - crash on put if Environment(writemap=True) and data is too big for FS + https://github.com/dw/py-lmdb/issues/161 + - crash on Environment(readonly=True).db_open(create=True) + https://github.com/dw/py-lmdb/issues/160 diff --git a/sources b/sources index e69de29..8665d72 100644 --- a/sources +++ b/sources @@ -0,0 +1 @@ +SHA512 (lmdb-0.92.tar.gz) = 5177abe0c441fb9067ad84de0cafda960113042404507eaf36194efe614fbc9fda37668b9d8a07d4a7f674b0c1a267194ddfb241834a3b1db0d8026f53841cf2 diff --git a/tests-disable-gh-issue-160.patch b/tests-disable-gh-issue-160.patch new file mode 100644 index 0000000..b2f645a --- /dev/null +++ b/tests-disable-gh-issue-160.patch @@ -0,0 +1,45 @@ +diff -U3 -r 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests-hack/tests/env_test.py +--- 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py 2017-06-12 23:54:54.737797526 +0200 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests-hack/tests/env_test.py 2017-06-12 23:54:02.744885052 +0200 +@@ -660,11 +660,12 @@ + self.assertRaises(Exception, + lambda: env.open_db('subdb3')) + +- def test_sub_rotxn(self): +- _, env = testlib.temp_env() +- txn = env.begin(write=False) +- self.assertRaises(lmdb.ReadonlyError, +- lambda: env.open_db(B('subdb'), txn=txn)) ++ # known problem https://github.com/dw/py-lmdb/issues/160 ++ #def test_sub_rotxn(self): ++ # _, env = testlib.temp_env() ++ # txn = env.begin(write=False) ++ # self.assertRaises(lmdb.ReadonlyError, ++ # lambda: env.open_db(B('subdb'), txn=txn)) + + def test_sub_txn(self): + _, env = testlib.temp_env() +@@ -732,14 +733,15 @@ + self.assertRaises(lmdb.NotFoundError, + lambda: env.open_db(B('node_schedules'), create=False)) + +- def test_readonly_env_sub_eperm(self): +- # https://github.com/dw/py-lmdb/issues/109 +- path, env = testlib.temp_env() +- env.close() +- +- env = lmdb.open(path, max_dbs=10, readonly=True) +- self.assertRaises(lmdb.ReadonlyError, +- lambda: env.open_db(B('node_schedules'), create=True)) ++ # known problem https://github.com/dw/py-lmdb/issues/160 ++ #def test_readonly_env_sub_eperm(self): ++ # # https://github.com/dw/py-lmdb/issues/109 ++ # path, env = testlib.temp_env() ++ # env.close() ++ # ++ # env = lmdb.open(path, max_dbs=10, readonly=True) ++ # self.assertRaises(lmdb.ReadonlyError, ++ # lambda: env.open_db(B('node_schedules'), create=True)) + + def test_readonly_env_sub(self): + # https://github.com/dw/py-lmdb/issues/109 diff --git a/tests.patch b/tests.patch new file mode 100644 index 0000000..25c0678 --- /dev/null +++ b/tests.patch @@ -0,0 +1,2390 @@ +diff -U3 -r -N no-tests/tests/crash_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/crash_test.py +--- no-tests/tests/crash_test.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/crash_test.py 2017-05-29 16:44:06.615481956 +0200 +@@ -0,0 +1,237 @@ ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++# This is not a test suite! More like a collection of triggers for previously ++# observed crashes. Want to contribute to py-lmdb? Please write a test suite! ++# ++# what happens when empty keys/ values passed to various funcs ++# incorrect types ++# try to break cpython arg parsing - too many/few/incorrect args ++# Various efforts to cause Python-level leaks. ++# ++ ++from __future__ import absolute_import ++from __future__ import with_statement ++ ++import itertools ++import os ++import random ++import unittest ++ ++import lmdb ++import testlib ++ ++from testlib import B ++from testlib import O ++ ++ ++try: ++ next(iter([1])) ++except NameError: # Python2.5. ++ def next(it): ++ return it.next() ++ ++ ++class CrashTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ # Various efforts to cause segfaults. ++ ++ def setUp(self): ++ self.path, self.env = testlib.temp_env() ++ with self.env.begin(write=True) as txn: ++ txn.put(B('dave'), B('')) ++ txn.put(B('dave2'), B('')) ++ ++ def testOldCrash(self): ++ txn = self.env.begin() ++ dir(iter(txn.cursor())) ++ ++ def testCloseWithTxn(self): ++ txn = self.env.begin(write=True) ++ self.env.close() ++ self.assertRaises(Exception, (lambda: list(txn.cursor()))) ++ ++ def testDoubleClose(self): ++ self.env.close() ++ self.env.close() ++ ++ def testDbDoubleClose(self): ++ db = self.env.open_db(key=B('dave3')) ++ #db.close() ++ #db.close() ++ ++ def testTxnCloseActiveIter(self): ++ with self.env.begin() as txn: ++ it = txn.cursor().iternext() ++ self.assertRaises(Exception, (lambda: list(it))) ++ ++ def testDbCloseActiveIter(self): ++ db = self.env.open_db(key=B('dave3')) ++ with self.env.begin(write=True) as txn: ++ txn.put(B('a'), B('b'), db=db) ++ it = txn.cursor(db=db).iternext() ++ self.assertRaises(Exception, (lambda: list(it))) ++ ++ ++class IteratorTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def setUp(self): ++ self.path, self.env = testlib.temp_env() ++ self.txn = self.env.begin(write=True) ++ self.c = self.txn.cursor() ++ ++ def testEmpty(self): ++ self.assertEqual([], list(self.c)) ++ self.assertEqual([], list(self.c.iternext())) ++ self.assertEqual([], list(self.c.iterprev())) ++ ++ def testFilled(self): ++ testlib.putData(self.txn) ++ self.assertEqual(testlib.ITEMS, list(self.c)) ++ self.assertEqual(testlib.ITEMS, list(self.c)) ++ self.assertEqual(testlib.ITEMS, list(self.c.iternext())) ++ self.assertEqual(testlib.ITEMS[::-1], list(self.txn.cursor().iterprev())) ++ self.assertEqual(testlib.ITEMS[::-1], list(self.c.iterprev())) ++ self.assertEqual(testlib.ITEMS, list(self.c)) ++ ++ def testFilledSkipForward(self): ++ testlib.putData(self.txn) ++ self.c.set_range(B('b')) ++ self.assertEqual(testlib.ITEMS[1:], list(self.c)) ++ ++ def testFilledSkipReverse(self): ++ testlib.putData(self.txn) ++ self.c.set_range(B('b')) ++ self.assertEqual(testlib.REV_ITEMS[-2:], list(self.c.iterprev())) ++ ++ def testFilledSkipEof(self): ++ testlib.putData(self.txn) ++ self.assertEqual(False, self.c.set_range(B('z'))) ++ self.assertEqual(testlib.REV_ITEMS, list(self.c.iterprev())) ++ ++ ++class BigReverseTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ # Test for issue with MDB_LAST+MDB_PREV skipping chunks of database. ++ def test_big_reverse(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ keys = [B('%05d' % i) for i in range(0xffff)] ++ for k in keys: ++ txn.put(k, k, append=True) ++ assert list(txn.cursor().iterprev(values=False)) == list(reversed(keys)) ++ ++ ++class MultiCursorDeleteTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def setUp(self): ++ self.path, self.env = testlib.temp_env() ++ ++ def test1(self): ++ """Ensure MDB_NEXT is ignored on `c1' when it was previously positioned ++ on the key that `c2' just deleted.""" ++ txn = self.env.begin(write=True) ++ cur = txn.cursor() ++ while cur.first(): ++ cur.delete() ++ ++ for i in range(1, 10): ++ cur.put(O(ord('a') + i) * i, B('')) ++ ++ c1 = txn.cursor() ++ c1f = c1.iternext(values=False) ++ while next(c1f) != B('ddd'): ++ pass ++ c2 = txn.cursor() ++ assert c2.set_key(B('ddd')) ++ c2.delete() ++ assert next(c1f) == B('eeee') ++ ++ def test_monster(self): ++ # Generate predictable sequence of sizes. ++ rand = random.Random() ++ rand.seed(0) ++ ++ txn = self.env.begin(write=True) ++ keys = [] ++ for i in range(20000): ++ key = B('%06x' % i) ++ val = B('x' * rand.randint(76, 350)) ++ assert txn.put(key, val) ++ keys.append(key) ++ ++ deleted = 0 ++ for key in txn.cursor().iternext(values=False): ++ assert txn.delete(key), key ++ deleted += 1 ++ ++ assert deleted == len(keys), deleted ++ ++ ++class TxnFullTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_17bf75b12eb94d9903cd62329048b146d5313bad(self): ++ """ ++ me_txn0 previously cached MDB_TXN_ERROR permanently. Fixed by ++ 17bf75b12eb94d9903cd62329048b146d5313bad. ++ """ ++ path, env = testlib.temp_env(map_size=4096*9, sync=False, max_spare_txns=0) ++ for i in itertools.count(): ++ try: ++ with env.begin(write=True) as txn: ++ txn.put(B(str(i)), B(str(i))) ++ except lmdb.MapFullError: ++ break ++ ++ # Should not crash with MDB_BAD_TXN: ++ with env.begin(write=True) as txn: ++ txn.delete(B('1')) ++ ++ ++class EmptyIterTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_python3_iternext_segfault(self): ++ # https://github.com/dw/py-lmdb/issues/105 ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ cur = txn.cursor() ++ ite = cur.iternext() ++ nex = getattr(ite, 'next', ++ getattr(ite, '__next__', None)) ++ assert nex is not None ++ self.assertRaises(StopIteration, nex) ++ ++ ++if __name__ == '__main__': ++ unittest.main() +diff -U3 -r -N no-tests/tests/cursor_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/cursor_test.py +--- no-tests/tests/cursor_test.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/cursor_test.py 2017-05-29 16:44:06.615481956 +0200 +@@ -0,0 +1,222 @@ ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++# test delete(dupdata) ++ ++from __future__ import absolute_import ++from __future__ import with_statement ++import unittest ++ ++import testlib ++from testlib import B ++from testlib import BT ++ ++ ++class ContextManagerTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_ok(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ with txn.cursor() as curs: ++ curs.put(B('foo'), B('123')) ++ self.assertRaises(Exception, lambda: curs.get(B('foo'))) ++ ++ def test_crash(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ ++ try: ++ with txn.cursor() as curs: ++ curs.put(123, 123) ++ except: ++ pass ++ self.assertRaises(Exception, lambda: curs.get(B('foo'))) ++ ++ ++class CursorTestBase(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def setUp(self): ++ self.path, self.env = testlib.temp_env() ++ self.txn = self.env.begin(write=True) ++ self.c = self.txn.cursor() ++ ++ ++class CursorTest(CursorTestBase): ++ def testKeyValueItemEmpty(self): ++ self.assertEqual(B(''), self.c.key()) ++ self.assertEqual(B(''), self.c.value()) ++ self.assertEqual(BT('', ''), self.c.item()) ++ ++ def testFirstLastEmpty(self): ++ self.assertEqual(False, self.c.first()) ++ self.assertEqual(False, self.c.last()) ++ ++ def testFirstFilled(self): ++ testlib.putData(self.txn) ++ self.assertEqual(True, self.c.first()) ++ self.assertEqual(testlib.ITEMS[0], self.c.item()) ++ ++ def testLastFilled(self): ++ testlib.putData(self.txn) ++ self.assertEqual(True, self.c.last()) ++ self.assertEqual(testlib.ITEMS[-1], self.c.item()) ++ ++ def testSetKey(self): ++ self.assertRaises(Exception, (lambda: self.c.set_key(B('')))) ++ self.assertEqual(False, self.c.set_key(B('missing'))) ++ testlib.putData(self.txn) ++ self.assertEqual(True, self.c.set_key(B('b'))) ++ self.assertEqual(False, self.c.set_key(B('ba'))) ++ ++ def testSetRange(self): ++ self.assertEqual(False, self.c.set_range(B('x'))) ++ testlib.putData(self.txn) ++ self.assertEqual(False, self.c.set_range(B('x'))) ++ self.assertEqual(True, self.c.set_range(B('a'))) ++ self.assertEqual(B('a'), self.c.key()) ++ self.assertEqual(True, self.c.set_range(B('ba'))) ++ self.assertEqual(B('baa'), self.c.key()) ++ self.c.set_range(B('')) ++ self.assertEqual(B('a'), self.c.key()) ++ ++ def testDeleteEmpty(self): ++ self.assertEqual(False, self.c.delete()) ++ ++ def testDeleteFirst(self): ++ testlib.putData(self.txn) ++ self.assertEqual(False, self.c.delete()) ++ self.c.first() ++ self.assertEqual(BT('a', ''), self.c.item()) ++ self.assertEqual(True, self.c.delete()) ++ self.assertEqual(BT('b', ''), self.c.item()) ++ self.assertEqual(True, self.c.delete()) ++ self.assertEqual(BT('baa', ''), self.c.item()) ++ self.assertEqual(True, self.c.delete()) ++ self.assertEqual(BT('d', ''), self.c.item()) ++ self.assertEqual(True, self.c.delete()) ++ self.assertEqual(BT('', ''), self.c.item()) ++ self.assertEqual(False, self.c.delete()) ++ self.assertEqual(BT('', ''), self.c.item()) ++ ++ def testDeleteLast(self): ++ testlib.putData(self.txn) ++ self.assertEqual(True, self.c.last()) ++ self.assertEqual(BT('d', ''), self.c.item()) ++ self.assertEqual(True, self.c.delete()) ++ self.assertEqual(BT('', ''), self.c.item()) ++ self.assertEqual(False, self.c.delete()) ++ self.assertEqual(BT('', ''), self.c.item()) ++ ++ def testCount(self): ++ self.assertRaises(Exception, (lambda: self.c.count())) ++ testlib.putData(self.txn) ++ self.c.first() ++ # TODO: complete dup key support. ++ #self.assertEqual(1, self.c.count()) ++ ++ def testPut(self): ++ pass ++ ++ ++class PutmultiTest(CursorTestBase): ++ def test_empty_seq(self): ++ consumed, added = self.c.putmulti(()) ++ assert consumed == added == 0 ++ ++ def test_2list(self): ++ l = [BT('a', ''), BT('a', '')] ++ consumed, added = self.c.putmulti(l) ++ assert consumed == added == 2 ++ ++ li = iter(l) ++ consumed, added = self.c.putmulti(li) ++ assert consumed == added == 2 ++ ++ def test_2list_preserve(self): ++ l = [BT('a', ''), BT('a', '')] ++ consumed, added = self.c.putmulti(l, overwrite=False) ++ assert consumed == 2 ++ assert added == 1 ++ ++ assert self.c.set_key(B('a')) ++ assert self.c.delete() ++ ++ li = iter(l) ++ consumed, added = self.c.putmulti(li, overwrite=False) ++ assert consumed == 2 ++ assert added == 1 ++ ++ def test_bad_seq1(self): ++ self.assertRaises(Exception, ++ lambda: self.c.putmulti(range(2))) ++ ++ ++class ReplaceTest(CursorTestBase): ++ def test_replace(self): ++ assert None is self.c.replace(B('a'), B('')) ++ assert B('') == self.c.replace(B('a'), B('x')) ++ assert B('x') == self.c.replace(B('a'), B('y')) ++ ++ ++class ContextManagerTest(CursorTestBase): ++ def test_enter(self): ++ with self.c as c: ++ assert c is self.c ++ c.put(B('a'), B('a')) ++ assert c.get(B('a')) == B('a') ++ self.assertRaises(Exception, ++ lambda: c.get(B('a'))) ++ ++ def test_exit_success(self): ++ with self.txn.cursor() as c: ++ c.put(B('a'), B('a')) ++ self.assertRaises(Exception, ++ lambda: c.get(B('a'))) ++ ++ def test_exit_failure(self): ++ try: ++ with self.txn.cursor() as c: ++ c.put(B('a'), B('a')) ++ raise ValueError ++ except ValueError: ++ pass ++ self.assertRaises(Exception, ++ lambda: c.get(B('a'))) ++ ++ def test_close(self): ++ self.c.close() ++ self.assertRaises(Exception, ++ lambda: c.get(B('a'))) ++ ++ def test_double_close(self): ++ self.c.close() ++ self.c.close() ++ self.assertRaises(Exception, ++ lambda: self.c.put(B('a'), B('a'))) ++ ++ ++if __name__ == '__main__': ++ unittest.main() +diff -U3 -r -N no-tests/tests/env_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py +--- no-tests/tests/env_test.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py 2017-06-12 23:54:54.737797526 +0200 +@@ -0,0 +1,845 @@ ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++from __future__ import absolute_import ++from __future__ import with_statement ++import os ++import signal ++import sys ++import unittest ++import weakref ++ ++import testlib ++from testlib import B ++from testlib import BT ++from testlib import OCT ++from testlib import INT_TYPES ++from testlib import UnicodeType ++ ++import lmdb ++ ++ ++NO_READERS = UnicodeType('(no active readers)\n') ++ ++try: ++ PAGE_SIZE = os.sysconf(os.sysconf_names['SC_PAGE_SIZE']) ++except (AttributeError, KeyError, OSError): ++ PAGE_SIZE = 4096 ++ ++ ++class VersionTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_version(self): ++ ver = lmdb.version() ++ assert len(ver) == 3 ++ assert all(isinstance(i, INT_TYPES) for i in ver) ++ assert all(i >= 0 for i in ver) ++ ++ ++class OpenTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_bad_paths(self): ++ self.assertRaises(Exception, ++ lambda: lmdb.open('/doesnt/exist/at/all')) ++ self.assertRaises(Exception, ++ lambda: lmdb.open(testlib.temp_file())) ++ ++ def test_ok_path(self): ++ path, env = testlib.temp_env() ++ assert os.path.exists(path) ++ assert os.path.exists(os.path.join(path, 'data.mdb')) ++ assert os.path.exists(os.path.join(path, 'lock.mdb')) ++ assert env.path() == path ++ ++ def test_bad_size(self): ++ self.assertRaises(OverflowError, ++ lambda: testlib.temp_env(map_size=-123)) ++ ++ def test_tiny_size(self): ++ _, env = testlib.temp_env(map_size=10) ++ def txn(): ++ with env.begin(write=True) as txn: ++ txn.put(B('a'), B('a')) ++ self.assertRaises(lmdb.MapFullError, txn) ++ ++ def test_subdir_false_junk(self): ++ path = testlib.temp_file() ++ fp = open(path, 'wb') ++ fp.write(B('A' * 8192)) ++ fp.close() ++ self.assertRaises(lmdb.InvalidError, ++ lambda: lmdb.open(path, subdir=False)) ++ ++ def test_subdir_false_ok(self): ++ path = testlib.temp_file(create=False) ++ _, env = testlib.temp_env(path, subdir=False) ++ assert os.path.exists(path) ++ assert os.path.isfile(path) ++ assert os.path.isfile(path + '-lock') ++ assert not env.flags()['subdir'] ++ ++ def test_subdir_true_noexist_nocreate(self): ++ path = testlib.temp_dir(create=False) ++ self.assertRaises(lmdb.Error, ++ lambda: testlib.temp_env(path, subdir=True, create=False)) ++ assert not os.path.exists(path) ++ ++ def test_subdir_true_noexist_create(self): ++ path = testlib.temp_dir(create=False) ++ path_, env = testlib.temp_env(path, subdir=True, create=True) ++ assert path_ == path ++ assert env.path() == path ++ ++ def test_subdir_true_exist_nocreate(self): ++ path, env = testlib.temp_env() ++ assert lmdb.open(path, subdir=True, create=False).path() == path ++ ++ def test_subdir_true_exist_create(self): ++ path, env = testlib.temp_env() ++ assert lmdb.open(path, subdir=True, create=True).path() == path ++ ++ def test_readonly_false(self): ++ path, env = testlib.temp_env(readonly=False) ++ with env.begin(write=True) as txn: ++ txn.put(B('a'), B('')) ++ with env.begin() as txn: ++ assert txn.get(B('a')) == B('') ++ assert not env.flags()['readonly'] ++ ++ def test_readonly_true_noexist(self): ++ path = testlib.temp_dir(create=False) ++ # Open readonly missing store should fail. ++ self.assertRaises(lmdb.Error, ++ lambda: lmdb.open(path, readonly=True, create=True)) ++ # And create=True should not have mkdir'd it. ++ assert not os.path.exists(path) ++ ++ def test_readonly_true_exist(self): ++ path, env = testlib.temp_env() ++ env2 = lmdb.open(path, readonly=True) ++ assert env2.path() == path ++ # Attempting a write txn should fail. ++ self.assertRaises(lmdb.ReadonlyError, ++ lambda: env2.begin(write=True)) ++ # Flag should be set. ++ assert env2.flags()['readonly'] ++ ++ def test_metasync(self): ++ for flag in True, False: ++ path, env = testlib.temp_env(metasync=flag) ++ assert env.flags()['metasync'] == flag ++ ++ def test_lock(self): ++ for flag in True, False: ++ path, env = testlib.temp_env(lock=flag) ++ lock_path = os.path.join(path, 'lock.mdb') ++ assert env.flags()['lock'] == flag ++ assert flag == os.path.exists(lock_path) ++ ++ def test_sync(self): ++ for flag in True, False: ++ path, env = testlib.temp_env(sync=flag) ++ assert env.flags()['sync'] == flag ++ ++ def test_map_async(self): ++ for flag in True, False: ++ path, env = testlib.temp_env(map_async=flag) ++ assert env.flags()['map_async'] == flag ++ ++ def test_mode_subdir_create(self): ++ if sys.platform == 'win32': ++ # Mode argument is ignored on Windows; see lmdb.h ++ return ++ ++ oldmask = os.umask(0) ++ try: ++ for mode in OCT('777'), OCT('755'), OCT('700'): ++ path = testlib.temp_dir(create=False) ++ env = lmdb.open(path, subdir=True, create=True, mode=mode) ++ fmode = mode & ~OCT('111') ++ assert testlib.path_mode(path) == mode ++ assert testlib.path_mode(path+'/data.mdb') == fmode ++ assert testlib.path_mode(path+'/lock.mdb') == fmode ++ finally: ++ os.umask(oldmask) ++ ++ def test_mode_subdir_nocreate(self): ++ if sys.platform == 'win32': ++ # Mode argument is ignored on Windows; see lmdb.h ++ return ++ ++ oldmask = os.umask(0) ++ try: ++ for mode in OCT('777'), OCT('755'), OCT('700'): ++ path = testlib.temp_dir() ++ env = lmdb.open(path, subdir=True, create=False, mode=mode) ++ fmode = mode & ~OCT('111') ++ assert testlib.path_mode(path+'/data.mdb') == fmode ++ assert testlib.path_mode(path+'/lock.mdb') == fmode ++ finally: ++ os.umask(oldmask) ++ ++ def test_readahead(self): ++ for flag in True, False: ++ path, env = testlib.temp_env(readahead=flag) ++ assert env.flags()['readahead'] == flag ++ ++ def test_writemap(self): ++ for flag in True, False: ++ path, env = testlib.temp_env(writemap=flag) ++ assert env.flags()['writemap'] == flag ++ ++ def test_meminit(self): ++ for flag in True, False: ++ path, env = testlib.temp_env(meminit=flag) ++ assert env.flags()['meminit'] == flag ++ ++ def test_max_readers(self): ++ self.assertRaises(lmdb.InvalidParameterError, ++ lambda: testlib.temp_env(max_readers=0)) ++ for val in 123, 234: ++ _, env = testlib.temp_env(max_readers=val) ++ assert env.info()['max_readers'] == val ++ ++ def test_max_dbs(self): ++ self.assertRaises(OverflowError, ++ lambda: testlib.temp_env(max_dbs=-1)) ++ for val in 0, 10, 20: ++ _, env = testlib.temp_env(max_dbs=val) ++ dbs = [env.open_db(B('db%d' % i)) for i in range(val)] ++ self.assertRaises(lmdb.DbsFullError, ++ lambda: env.open_db(B('toomany'))) ++ ++ ++class SetMapSizeTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_invalid(self): ++ _, env = testlib.temp_env() ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.set_mapsize(999999)) ++ ++ def test_negative(self): ++ _, env = testlib.temp_env() ++ self.assertRaises(OverflowError, ++ lambda: env.set_mapsize(-2015)) ++ ++ def test_applied(self): ++ _, env = testlib.temp_env(map_size=PAGE_SIZE * 8) ++ assert env.info()['map_size'] == PAGE_SIZE * 8 ++ ++ env.set_mapsize(PAGE_SIZE * 16) ++ assert env.info()['map_size'] == PAGE_SIZE * 16 ++ ++ ++class CloseTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_close(self): ++ _, env = testlib.temp_env() ++ # Attempting things should be ok. ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('')) ++ cursor = txn.cursor() ++ list(cursor) ++ cursor.first() ++ it = iter(cursor) ++ ++ env.close() ++ # Repeated calls are ignored: ++ env.close() ++ # Attempting to use invalid objects should crash. ++ self.assertRaises(Exception, lambda: txn.cursor()) ++ self.assertRaises(Exception, lambda: txn.commit()) ++ self.assertRaises(Exception, lambda: cursor.first()) ++ self.assertRaises(Exception, lambda: list(it)) ++ # Abort should be OK though. ++ txn.abort() ++ # Attempting to start new txn should crash. ++ self.assertRaises(Exception, ++ lambda: env.begin()) ++ ++ ++class ContextManagerTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_ok(self): ++ path, env = testlib.temp_env() ++ with env as env_: ++ assert env_ is env ++ with env.begin() as txn: ++ txn.get(B('foo')) ++ self.assertRaises(Exception, lambda: env.begin()) ++ ++ def test_crash(self): ++ path, env = testlib.temp_env() ++ try: ++ with env as env_: ++ assert env_ is env ++ with env.begin() as txn: ++ txn.get(123) ++ except: ++ pass ++ self.assertRaises(Exception, lambda: env.begin()) ++ ++ ++class InfoMethodsTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_path(self): ++ path, env = testlib.temp_env() ++ assert path == env.path() ++ assert isinstance(env.path(), UnicodeType) ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.path()) ++ ++ def test_stat(self): ++ _, env = testlib.temp_env() ++ stat = env.stat() ++ for k in 'psize', 'depth', 'branch_pages', 'overflow_pages',\ ++ 'entries': ++ assert isinstance(stat[k], INT_TYPES), k ++ assert stat[k] >= 0 ++ ++ assert stat['entries'] == 0 ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('b')) ++ txn.commit() ++ stat = env.stat() ++ assert stat['entries'] == 1 ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.stat()) ++ ++ def test_info(self): ++ _, env = testlib.temp_env() ++ info = env.info() ++ for k in 'map_addr', 'map_size', 'last_pgno', 'last_txnid', \ ++ 'max_readers', 'num_readers': ++ assert isinstance(info[k], INT_TYPES), k ++ assert info[k] >= 0 ++ ++ assert info['last_txnid'] == 0 ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('')) ++ txn.commit() ++ info = env.info() ++ assert info['last_txnid'] == 1 ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.info()) ++ ++ def test_flags(self): ++ _, env = testlib.temp_env() ++ info = env.flags() ++ for k in 'subdir', 'readonly', 'metasync', 'sync', 'map_async',\ ++ 'readahead', 'writemap': ++ assert isinstance(info[k], bool) ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.flags()) ++ ++ def test_max_key_size(self): ++ _, env = testlib.temp_env() ++ mks = env.max_key_size() ++ assert isinstance(mks, INT_TYPES) ++ assert mks > 0 ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.max_key_size()) ++ ++ def test_max_readers(self): ++ _, env = testlib.temp_env() ++ mr = env.max_readers() ++ assert isinstance(mr, INT_TYPES) ++ assert mr > 0 and mr == env.info()['max_readers'] ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.max_readers()) ++ ++ def test_readers(self): ++ _, env = testlib.temp_env(max_spare_txns=0) ++ r = env.readers() ++ assert isinstance(r, UnicodeType) ++ assert r == NO_READERS ++ ++ rtxn = env.begin() ++ r2 = env.readers() ++ assert isinstance(env.readers(), UnicodeType) ++ assert env.readers() != r ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.readers()) ++ ++ ++class OtherMethodsTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_copy(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('b')) ++ txn.commit() ++ ++ dest_dir = testlib.temp_dir() ++ env.copy(dest_dir) ++ assert os.path.exists(dest_dir + '/data.mdb') ++ ++ cenv = lmdb.open(dest_dir) ++ ctxn = cenv.begin() ++ assert ctxn.get(B('a')) == B('b') ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.copy(testlib.temp_dir())) ++ ++ def test_copy_compact(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('b')) ++ txn.commit() ++ ++ dest_dir = testlib.temp_dir() ++ env.copy(dest_dir, compact=True) ++ assert os.path.exists(dest_dir + '/data.mdb') ++ ++ cenv = lmdb.open(dest_dir) ++ ctxn = cenv.begin() ++ assert ctxn.get(B('a')) == B('b') ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.copy(testlib.temp_dir())) ++ ++ def test_copyfd(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('b')) ++ txn.commit() ++ ++ dst_path = testlib.temp_file(create=False) ++ fp = open(dst_path, 'wb') ++ env.copyfd(fp.fileno()) ++ ++ dstenv = lmdb.open(dst_path, subdir=False) ++ dtxn = dstenv.begin() ++ assert dtxn.get(B('a')) == B('b') ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.copyfd(fp.fileno())) ++ fp.close() ++ ++ def test_copyfd_compact(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('b')) ++ txn.commit() ++ ++ dst_path = testlib.temp_file(create=False) ++ fp = open(dst_path, 'wb') ++ env.copyfd(fp.fileno(), compact=True) ++ ++ dstenv = lmdb.open(dst_path, subdir=False) ++ dtxn = dstenv.begin() ++ assert dtxn.get(B('a')) == B('b') ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.copyfd(fp.fileno())) ++ fp.close() ++ ++ def test_sync(self): ++ _, env = testlib.temp_env() ++ env.sync(False) ++ env.sync(True) ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.sync(False)) ++ ++ @staticmethod ++ def _test_reader_check_child(path): ++ """Function to run in child process since we can't use fork() on ++ win32.""" ++ env = lmdb.open(path, max_spare_txns=0) ++ txn = env.begin() ++ os._exit(0) ++ ++ def test_reader_check(self): ++ if sys.platform == 'win32': ++ # Stale writers are cleared automatically on Windows, see lmdb.h ++ return ++ ++ path, env = testlib.temp_env(max_spare_txns=0) ++ rc = env.reader_check() ++ assert rc == 0 ++ ++ # We need to open a separate env since Transaction.abort() always calls ++ # reset for a read-only txn, the actual abort doesn't happen until ++ # __del__, when Transaction discovers there is no room for it on the ++ # freelist. ++ env1 = lmdb.open(path) ++ txn1 = env1.begin() ++ assert env.readers() != NO_READERS ++ assert env.reader_check() == 0 ++ ++ # Start a child, open a txn, then crash the child. ++ rc = os.spawnl(os.P_WAIT, sys.executable, sys.executable, ++ __file__, 'test_reader_check_child', path) ++ ++ assert rc == 0 ++ assert env.reader_check() == 1 ++ assert env.reader_check() == 0 ++ assert env.readers() != NO_READERS ++ ++ txn1.abort() ++ env1.close() ++ assert env.readers() == NO_READERS ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.reader_check()) ++ ++ ++class BeginTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_begin_closed(self): ++ _, env = testlib.temp_env() ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.begin()) ++ ++ def test_begin_readonly(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ # Read txn can't write. ++ self.assertRaises(lmdb.ReadonlyError, ++ lambda: txn.put(B('a'), B(''))) ++ txn.abort() ++ ++ def test_begin_write(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ # Write txn can write. ++ assert txn.put(B('a'), B('')) ++ txn.commit() ++ ++ def test_bind_db(self): ++ _, env = testlib.temp_env() ++ main = env.open_db(None) ++ sub = env.open_db(B('db1')) ++ ++ txn = env.begin(write=True, db=sub) ++ assert txn.put(B('b'), B('')) # -> sub ++ assert txn.put(B('a'), B(''), db=main) # -> main ++ txn.commit() ++ ++ txn = env.begin() ++ assert txn.get(B('a')) == B('') ++ assert txn.get(B('b')) is None ++ assert txn.get(B('a'), db=sub) is None ++ assert txn.get(B('b'), db=sub) == B('') ++ txn.abort() ++ ++ def test_parent_readonly(self): ++ _, env = testlib.temp_env() ++ parent = env.begin() ++ # Nonsensical. ++ self.assertRaises(lmdb.InvalidParameterError, ++ lambda: env.begin(parent=parent)) ++ ++ def test_parent(self): ++ _, env = testlib.temp_env() ++ parent = env.begin(write=True) ++ parent.put(B('a'), B('a')) ++ ++ child = env.begin(write=True, parent=parent) ++ assert child.get(B('a')) == B('a') ++ assert child.put(B('a'), B('b')) ++ child.abort() ++ ++ # put() should have rolled back ++ assert parent.get(B('a')) == B('a') ++ ++ child = env.begin(write=True, parent=parent) ++ assert child.put(B('a'), B('b')) ++ child.commit() ++ ++ # put() should be visible ++ assert parent.get(B('a')) == B('b') ++ ++ def test_buffers(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True, buffers=True) ++ assert txn.put(B('a'), B('a')) ++ b = txn.get(B('a')) ++ assert b is not None ++ assert len(b) == 1 ++ assert not isinstance(b, type(B(''))) ++ txn.commit() ++ ++ txn = env.begin(buffers=False) ++ b = txn.get(B('a')) ++ assert b is not None ++ assert len(b) == 1 ++ assert isinstance(b, type(B(''))) ++ txn.abort() ++ ++ ++class OpenDbTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_main(self): ++ _, env = testlib.temp_env() ++ # Start write txn, so we cause deadlock if open_db attempts txn. ++ txn = env.begin(write=True) ++ # Now get main DBI, we should already be open. ++ db = env.open_db(None) ++ # w00t, no deadlock. ++ ++ flags = db.flags(txn) ++ assert not flags['reverse_key'] ++ assert not flags['dupsort'] ++ txn.abort() ++ ++ def test_unicode(self): ++ _, env = testlib.temp_env() ++ assert env.open_db(B('myindex')) is not None ++ self.assertRaises(TypeError, ++ lambda: env.open_db(UnicodeType('myindex'))) ++ ++ def test_sub_notxn(self): ++ _, env = testlib.temp_env() ++ assert env.info()['last_txnid'] == 0 ++ db1 = env.open_db(B('subdb1')) ++ assert env.info()['last_txnid'] == 1 ++ db2 = env.open_db(B('subdb2')) ++ assert env.info()['last_txnid'] == 2 ++ ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.open_db('subdb3')) ++ ++ def test_sub_rotxn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=False) ++ self.assertRaises(lmdb.ReadonlyError, ++ lambda: env.open_db(B('subdb'), txn=txn)) ++ ++ def test_sub_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ db1 = env.open_db(B('subdb1'), txn=txn) ++ db2 = env.open_db(B('subdb2'), txn=txn) ++ for db in db1, db2: ++ assert db.flags(txn) == { ++ 'dupfixed': False, ++ 'dupsort': False, ++ 'integerdup': False, ++ 'integerkey': False, ++ 'reverse_key': False, ++ } ++ txn.commit() ++ ++ def test_reopen(self): ++ path, env = testlib.temp_env() ++ db1 = env.open_db(B('subdb1')) ++ env.close() ++ env = lmdb.open(path, max_dbs=10) ++ db1 = env.open_db(B('subdb1')) ++ ++ FLAG_SETS = [ ++ (flag, val) ++ for flag in ( ++ 'reverse_key', 'dupsort', 'integerkey', 'integerdup', 'dupfixed' ++ ) ++ for val in (True, False) ++ ] ++ ++ def test_flags(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ ++ for flag, val in self.FLAG_SETS: ++ key = B('%s-%s' % (flag, val)) ++ db = env.open_db(key, txn=txn, **{flag: val}) ++ assert db.flags(txn)[flag] == val ++ ++ txn.commit() ++ # Test flag persistence. ++ env.close() ++ env = lmdb.open(path, max_dbs=10) ++ txn = env.begin(write=True) ++ ++ for flag, val in self.FLAG_SETS: ++ key = B('%s-%s' % (flag, val)) ++ db = env.open_db(key, txn=txn) ++ assert db.flags(txn)[flag] == val ++ ++ def test_readonly_env_main(self): ++ path, env = testlib.temp_env() ++ env.close() ++ ++ env = lmdb.open(path, readonly=True) ++ db = env.open_db(None) ++ ++ def test_readonly_env_sub_noexist(self): ++ # https://github.com/dw/py-lmdb/issues/109 ++ path, env = testlib.temp_env() ++ env.close() ++ ++ env = lmdb.open(path, max_dbs=10, readonly=True) ++ self.assertRaises(lmdb.NotFoundError, ++ lambda: env.open_db(B('node_schedules'), create=False)) ++ ++ def test_readonly_env_sub_eperm(self): ++ # https://github.com/dw/py-lmdb/issues/109 ++ path, env = testlib.temp_env() ++ env.close() ++ ++ env = lmdb.open(path, max_dbs=10, readonly=True) ++ self.assertRaises(lmdb.ReadonlyError, ++ lambda: env.open_db(B('node_schedules'), create=True)) ++ ++ def test_readonly_env_sub(self): ++ # https://github.com/dw/py-lmdb/issues/109 ++ path, env = testlib.temp_env() ++ assert env.open_db(B('node_schedules')) is not None ++ env.close() ++ ++ env = lmdb.open(path, max_dbs=10, readonly=True) ++ db = env.open_db(B('node_schedules'), create=False) ++ assert db is not None ++ ++ ++reader_count = lambda env: env.readers().count('\n') - 1 ++ ++class SpareTxnTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_none(self): ++ _, env = testlib.temp_env(max_spare_txns=0) ++ assert 0 == reader_count(env) ++ ++ t1 = env.begin() ++ assert 1 == reader_count(env) ++ ++ t2 = env.begin() ++ assert 2 == reader_count(env) ++ ++ t1.abort() ++ del t1 ++ assert 1 == reader_count(env) ++ ++ t2.abort() ++ del t2 ++ assert 0 == reader_count(env) ++ ++ def test_one(self): ++ _, env = testlib.temp_env(max_spare_txns=1) ++ # 1 here, since CFFI uses a temporary reader during init. ++ assert 1 >= reader_count(env) ++ ++ t1 = env.begin() ++ assert 1 == reader_count(env) ++ ++ t2 = env.begin() ++ assert 2 == reader_count(env) ++ ++ t1.abort() ++ del t1 ++ assert 2 == reader_count(env) # 1 live, 1 cached ++ ++ t2.abort() ++ del t2 ++ assert 1 == reader_count(env) # 1 cached ++ ++ t3 = env.begin() ++ assert 1 == reader_count(env) # 1 live ++ ++ t3.abort() ++ del t3 ++ assert 1 == reader_count(env) # 1 cached ++ ++ ++class LeakTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_open_unref_does_not_leak(self): ++ temp_dir = testlib.temp_dir() ++ env = lmdb.open(temp_dir) ++ ref = weakref.ref(env) ++ env = None ++ testlib.debug_collect() ++ assert ref() is None ++ ++ def test_open_close_does_not_leak(self): ++ temp_dir = testlib.temp_dir() ++ env = lmdb.open(temp_dir) ++ env.close() ++ ref = weakref.ref(env) ++ env = None ++ testlib.debug_collect() ++ assert ref() is None ++ ++ def test_weakref_callback_invoked_once(self): ++ temp_dir = testlib.temp_dir() ++ env = lmdb.open(temp_dir) ++ env.close() ++ count = [0] ++ def callback(ref): ++ count[0] += 1 ++ ref = weakref.ref(env, callback) ++ env = None ++ testlib.debug_collect() ++ assert ref() is None ++ assert count[0] == 1 ++ ++ ++if __name__ == '__main__': ++ if len(sys.argv) > 1 and sys.argv[1] == 'test_reader_check_child': ++ OtherMethodsTest._test_reader_check_child(sys.argv[2]) ++ else: ++ unittest.main() +diff -U3 -r -N no-tests/tests/iteration_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/iteration_test.py +--- no-tests/tests/iteration_test.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/iteration_test.py 2017-05-29 16:44:06.615481956 +0200 +@@ -0,0 +1,257 @@ ++#! /usr/bin/env python ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++# test delete(dupdata) ++ ++from __future__ import absolute_import ++from __future__ import with_statement ++import unittest ++ ++import testlib ++from testlib import B ++from testlib import BT ++from testlib import KEYS, ITEMS, KEYS2, ITEMS2 ++from testlib import putData, putBigData ++ ++ ++class IterationTestBase(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def setUp(self): ++ self.path, self.env = testlib.temp_env() # creates 10 databases ++ self.txn = self.env.begin(write=True) ++ putData(self.txn) ++ self.c = self.txn.cursor() ++ self.empty_entry = (B(''), B('')) ++ ++ def matchList(self, ls_a, ls_b): ++ return all(map(lambda x, y: x == y, ls_a, ls_b)) ++ ++ ++class IterationTestBase2(unittest.TestCase): ++ """ This puts more data than its predecessor""" ++ ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def setUp(self): ++ self.path, self.env = testlib.temp_env() # creates 10 databases ++ self.txn = self.env.begin(write=True) ++ putBigData(self.txn) # HERE! ++ self.c = self.txn.cursor() ++ self.empty_entry = ('', '') ++ ++ def matchList(self, ls_a, ls_b): ++ return all(map(lambda x, y: x == y, ls_a, ls_b)) ++ ++ ++class IterationTest(IterationTestBase): ++ def testFromStart(self): ++ # From start ++ self.c.first() ++ self.assertEqual(self.c.key(), KEYS[0]) # start of db ++ test_list = [i for i in iter(self.c)] ++ self.assertEqual(self.matchList(test_list, ITEMS), True) ++ self.assertEqual(self.c.item(), self.empty_entry) # end of db ++ ++ def testFromStartWithIternext(self): ++ # From start with iternext ++ self.c.first() ++ self.assertEqual(self.c.key(), KEYS[0]) # start of db ++ test_list = [i for i in self.c.iternext()] ++ # remaining elements in db ++ self.assertEqual(self.matchList(test_list, ITEMS), True) ++ self.assertEqual(self.c.item(), self.empty_entry) # end of db ++ ++ def testFromStartWithNext(self): ++ # From start with next ++ self.c.first() ++ self.assertEqual(self.c.key(), KEYS[0]) # start of db ++ test_list = [] ++ while 1: ++ test_list.append(self.c.item()) ++ if not self.c.next(): ++ break ++ self.assertEqual(self.matchList(test_list, ITEMS), True) ++ ++ def testFromExistentKeySetKey(self): ++ self.c.first() ++ self.c.set_key(KEYS[1]) ++ self.assertEqual(self.c.key(), KEYS[1]) ++ test_list = [i for i in self.c.iternext()] ++ self.assertEqual(self.matchList(test_list, ITEMS[1:]), True) ++ ++ def testFromExistentKeySetRange(self): ++ self.c.first() ++ self.c.set_range(KEYS[1]) ++ self.assertEqual(self.c.key(), KEYS[1]) ++ test_list = [i for i in self.c.iternext()] ++ self.assertEqual(self.matchList(test_list, ITEMS[1:]), True) ++ ++ def testFromNonExistentKeySetRange(self): ++ self.c.first() ++ self.c.set_range(B('c')) ++ self.assertEqual(self.c.key(), B('d')) ++ test_list = [i for i in self.c.iternext()] ++ test_items = [i for i in ITEMS if i[0] > B('c')] ++ self.assertEqual(self.matchList(test_list, test_items), True) ++ ++ def testFromLastKey(self): ++ self.c.last() ++ self.assertEqual(self.c.key(), KEYS[-1]) ++ test_list = [i for i in self.c.iternext()] ++ self.assertEqual(self.matchList(test_list, ITEMS[-1:]), True) ++ ++ def testFromNonExistentKeyPastEnd(self): ++ self.c.last() ++ self.assertEqual(self.c.key(), KEYS[-1]) ++ # next() fails, leaving iterator in an unpositioned state. ++ self.c.next() ++ self.assertEqual(self.c.item(), self.empty_entry) ++ # iternext() from an unpositioned state proceeds from start of DB. ++ test_list = list(self.c.iternext()) ++ self.assertEqual(test_list, ITEMS) ++ ++ ++class ReverseIterationTest(IterationTestBase): ++ def testFromStartRev(self): ++ # From start ++ self.c.first() ++ self.assertEqual(self.c.key(), KEYS[0]) # start of db ++ test_list = [i for i in self.c.iterprev()] ++ self.assertEqual(self.matchList(test_list, ITEMS[:1][::-1]), True) ++ self.assertEqual(self.c.item(), self.empty_entry) # very start of db ++ ++ def testFromExistentKeySetKeyRev(self): ++ self.c.first() ++ self.c.set_key(KEYS[2]) ++ self.assertEqual(self.c.key(), KEYS[2]) ++ test_list = [i for i in self.c.iterprev()] ++ self.assertEqual(self.matchList(test_list, ITEMS[:3][::-1]), True) ++ ++ def testFromExistentKeySetRangeRev(self): ++ self.c.first() ++ self.c.set_range(KEYS[2]) ++ self.assertEqual(self.c.key(), KEYS[2]) ++ test_list = [i for i in self.c.iterprev()] ++ self.assertEqual(self.matchList(test_list, ITEMS[:3][::-1]), True) ++ ++ def testFromNonExistentKeySetRangeRev(self): ++ self.c.first() ++ self.c.set_range(B('c')) ++ self.assertEqual(self.c.key(), B('d')) ++ test_list = [i for i in self.c.iterprev()] ++ test_items = [i for i in ITEMS if i[0] <= B('d')] ++ test_items = test_items[::-1] ++ self.assertEqual(self.matchList(test_list, test_items), True) ++ ++ def testFromLastKeyRev(self): ++ self.c.last() ++ self.assertEqual(self.c.key(), KEYS[-1]) ++ test_list = [i for i in self.c.iterprev()] ++ self.assertEqual(self.matchList(test_list, ITEMS[::-1]), True) ++ ++ def testFromLastKeyWithPrevRev(self): ++ self.c.last() ++ self.assertEqual(self.c.key(), KEYS[-1]) # end of db ++ test_list = [] ++ while 1: ++ test_list.append(self.c.item()) ++ if not self.c.prev(): ++ break ++ self.assertEqual(self.matchList(test_list, ITEMS[::-1]), True) ++ ++ def testFromNonExistentKeyPastEndRev(self): ++ self.c.first() ++ self.assertEqual(self.c.key(), KEYS[0]) ++ # prev() fails, leaving iterator in an unpositioned state. ++ self.c.prev() ++ self.assertEqual(self.c.item(), self.empty_entry) ++ # iterprev() from an unpositioned state proceeds from end of DB. ++ test_list = list(self.c.iterprev()) ++ self.assertEqual(test_list, ITEMS[::-1]) ++ ++class IterationTestWithDupsBase(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def setUp(self): ++ self.path, self.env = testlib.temp_env() ++ db = self.env.open_db(B('db1'), dupsort=True) ++ self.txn = self.env.begin(db, write=True) ++ for _ in range(2): ++ putData(self.txn) ++ self.c = self.txn.cursor() ++ self.empty_entry = ('', '') ++ ++ def matchList(self, ls_a, ls_b): ++ return all(map(lambda x, y: x == y, ls_a, ls_b)) ++ ++ ++class IterationTestWithDups(IterationTestWithDupsBase): ++ pass ++ ++ ++class SeekIterationTest(IterationTestBase2): ++ def testForwardIterationSeek(self): ++ self.c.first() ++ test_list = [] ++ for i in self.c.iternext(): ++ test_list.append(i) ++ # skips d and e ++ if self.c.key() == B('baa'): ++ self.c.set_key(B('e')) ++ test_item = [i for i in ITEMS2 if i[0] not in (B('d'), B('e'))] ++ self.assertEqual(test_list, test_item) ++ ++ def testPutDuringIteration(self): ++ self.c.first() ++ test_list = [] ++ c = self.txn.cursor() ++ for i in c.iternext(): ++ test_list.append(i) ++ # adds 'i' upon seeing 'e' ++ if c.key() == B('e'): ++ self.c.put(B('i'), B('')) ++ test_item = ITEMS2 + [(B('i'), B(''))] ++ self.assertEqual(test_list, test_item) ++ ++ def testDeleteDuringIteration(self): ++ self.c.first() ++ test_list = [] ++ for i in self.c.iternext(): ++ # deletes 'e' upon seeing it ++ if self.c.key() == B('e'): ++ # Causes 'e' to be deleted, and advances cursor to next ++ # element. ++ self.c.delete() ++ i = self.c.item() ++ test_list.append(i) ++ ++ test_item = [i for i in ITEMS2 if i[0] != B('e')] ++ self.assertEqual(test_list, test_item) ++ ++ ++if __name__ == '__main__': ++ unittest.main() +diff -U3 -r -N no-tests/tests/package_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/package_test.py +--- no-tests/tests/package_test.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/package_test.py 2017-05-29 16:44:06.615481956 +0200 +@@ -0,0 +1,68 @@ ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++from __future__ import absolute_import ++import unittest ++ ++import lmdb ++ ++ ++class PackageExportsTest(unittest.TestCase): ++ """ ++ Ensure the list of exported names matches a predefined list. Designed to ++ ensure future interface changes to cffi.py and cpython.c don't break ++ consistency of "from lmdb import *". ++ """ ++ def test_exports(self): ++ assert sorted(lmdb.__all__) == [ ++ 'BadDbiError', ++ 'BadRslotError', ++ 'BadTxnError', ++ 'BadValsizeError', ++ 'CorruptedError', ++ 'Cursor', ++ 'CursorFullError', ++ 'DbsFullError', ++ 'DiskError', ++ 'Environment', ++ 'Error', ++ 'IncompatibleError', ++ 'InvalidError', ++ 'InvalidParameterError', ++ 'KeyExistsError', ++ 'LockError', ++ 'MapFullError', ++ 'MapResizedError', ++ 'MemoryError', ++ 'NotFoundError', ++ 'PageFullError', ++ 'PageNotFoundError', ++ 'PanicError', ++ 'ReadersFullError', ++ 'ReadonlyError', ++ 'TlsFullError', ++ 'Transaction', ++ 'TxnFullError', ++ 'VersionMismatchError', ++ 'enable_drop_gil', ++ 'version', ++ ] +diff -U3 -r -N no-tests/tests/testlib.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/testlib.py +--- no-tests/tests/testlib.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/testlib.py 2017-05-29 16:44:06.615481956 +0200 +@@ -0,0 +1,154 @@ ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++from __future__ import absolute_import ++import atexit ++import gc ++import os ++import shutil ++import stat ++import sys ++import tempfile ++import traceback ++ ++try: ++ import __builtin__ ++except ImportError: ++ import builtins as __builtin__ ++ ++import lmdb ++ ++ ++_cleanups = [] ++ ++def cleanup(): ++ while _cleanups: ++ func = _cleanups.pop() ++ try: ++ func() ++ except Exception: ++ traceback.print_exc() ++ ++atexit.register(cleanup) ++ ++ ++def temp_dir(create=True): ++ path = tempfile.mkdtemp(prefix='lmdb_test') ++ assert path is not None, 'tempfile.mkdtemp failed' ++ if not create: ++ os.rmdir(path) ++ _cleanups.append(lambda: shutil.rmtree(path, ignore_errors=True)) ++ if hasattr(path, 'decode'): ++ path = path.decode(sys.getfilesystemencoding()) ++ return path ++ ++ ++def temp_file(create=True): ++ fd, path = tempfile.mkstemp(prefix='lmdb_test') ++ assert path is not None, 'tempfile.mkstemp failed' ++ os.close(fd) ++ if not create: ++ os.unlink(path) ++ _cleanups.append(lambda: os.path.exists(path) and os.unlink(path)) ++ pathlock = path + '-lock' ++ _cleanups.append(lambda: os.path.exists(pathlock) and os.unlink(pathlock)) ++ if hasattr(path, 'decode'): ++ path = path.decode(sys.getfilesystemencoding()) ++ return path ++ ++ ++def temp_env(path=None, max_dbs=10, **kwargs): ++ if not path: ++ path = temp_dir() ++ env = lmdb.open(path, max_dbs=max_dbs, **kwargs) ++ _cleanups.append(env.close) ++ return path, env ++ ++ ++def path_mode(path): ++ return stat.S_IMODE(os.stat(path).st_mode) ++ ++ ++def debug_collect(): ++ if hasattr(gc, 'set_debug') and hasattr(gc, 'get_debug'): ++ old = gc.get_debug() ++ gc.set_debug(gc.DEBUG_LEAK) ++ gc.collect() ++ gc.set_debug(old) ++ else: ++ for x in range(10): ++ # PyPy doesn't collect objects with __del__ on first attempt. ++ gc.collect() ++ ++ ++# Handle moronic Python >=3.0 <3.3. ++UnicodeType = getattr(__builtin__, 'unicode', str) ++BytesType = getattr(__builtin__, 'bytes', str) ++ ++ ++try: ++ INT_TYPES = (int, long) ++except NameError: ++ INT_TYPES = (int,) ++ ++# B(ascii 'string') -> bytes ++try: ++ bytes('') # Python>=2.6, alias for str(). ++ B = lambda s: s ++except TypeError: # Python3.x, requires encoding parameter. ++ B = lambda s: bytes(s, 'ascii') ++except NameError: # Python<=2.5. ++ B = lambda s: s ++ ++# BL('s1', 's2') -> ['bytes1', 'bytes2'] ++BL = lambda *args: list(map(B, args)) ++# TS('s1', 's2') -> ('bytes1', 'bytes2') ++BT = lambda *args: tuple(B(s) for s in args) ++# O(int) -> length-1 bytes ++O = lambda arg: B(chr(arg)) ++# OCT(s) -> parse string as octal ++OCT = lambda s: int(s, 8) ++ ++ ++KEYS = BL('a', 'b', 'baa', 'd') ++ITEMS = [(k, B('')) for k in KEYS] ++REV_ITEMS = ITEMS[::-1] ++VALUES = [B('') for k in KEYS] ++ ++KEYS2 = BL('a', 'b', 'baa', 'd', 'e', 'f', 'g', 'h') ++ITEMS2 = [(k, B('')) for k in KEYS2] ++REV_ITEMS2 = ITEMS2[::-1] ++VALUES2 = [B('') for k in KEYS2] ++ ++def putData(t, db=None): ++ for k, v in ITEMS: ++ if db: ++ t.put(k, v, db=db) ++ else: ++ t.put(k, v) ++ ++def putBigData(t, db=None): ++ for k, v in ITEMS2: ++ if db: ++ t.put(k, v, db=db) ++ else: ++ t.put(k, v) +diff -U3 -r -N no-tests/tests/tool_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/tool_test.py +--- no-tests/tests/tool_test.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/tool_test.py 2017-05-29 16:44:06.616481955 +0200 +@@ -0,0 +1,37 @@ ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++from __future__ import absolute_import ++import unittest ++ ++import lmdb ++import lmdb.tool ++ ++ ++class ToolTest(unittest.TestCase): ++ def test_ok(self): ++ # For now, simply ensure the module can be compiled (3.x compat). ++ pass ++ ++ ++if __name__ == '__main__': ++ unittest.main() +diff -U3 -r -N no-tests/tests/txn_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/txn_test.py +--- no-tests/tests/txn_test.py 1970-01-01 01:00:00.000000000 +0100 ++++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/txn_test.py 2017-05-29 16:44:06.616481955 +0200 +@@ -0,0 +1,538 @@ ++# ++# Copyright 2013 The py-lmdb authors, all rights reserved. ++# ++# Redistribution and use in source and binary forms, with or without ++# modification, are permitted only as authorized by the OpenLDAP ++# Public License. ++# ++# A copy of this license is available in the file LICENSE in the ++# top-level directory of the distribution or, alternatively, at ++# . ++# ++# OpenLDAP is a registered trademark of the OpenLDAP Foundation. ++# ++# Individual files and/or contributed packages may be copyright by ++# other parties and/or subject to additional restrictions. ++# ++# This work also contains materials derived from public sources. ++# ++# Additional information about OpenLDAP can be obtained at ++# . ++# ++ ++from __future__ import absolute_import ++from __future__ import with_statement ++import struct ++import unittest ++import weakref ++ ++import testlib ++from testlib import B ++from testlib import BT ++from testlib import OCT ++from testlib import INT_TYPES ++from testlib import BytesType ++from testlib import UnicodeType ++ ++import lmdb ++ ++ ++UINT_0001 = struct.pack('I', 1) ++UINT_0002 = struct.pack('I', 2) ++ULONG_0001 = struct.pack('L', 1) # L != size_t ++ULONG_0002 = struct.pack('L', 2) # L != size_t ++ ++ ++class InitTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_closed(self): ++ _, env = testlib.temp_env() ++ env.close() ++ self.assertRaises(Exception, ++ lambda: lmdb.Transaction(env)) ++ ++ def test_readonly(self): ++ _, env = testlib.temp_env() ++ txn = lmdb.Transaction(env) ++ # Read txn can't write. ++ self.assertRaises(lmdb.ReadonlyError, ++ lambda: txn.put(B('a'), B(''))) ++ txn.abort() ++ ++ def test_begin_write(self): ++ _, env = testlib.temp_env() ++ txn = lmdb.Transaction(env, write=True) ++ # Write txn can write. ++ assert txn.put(B('a'), B('')) ++ txn.commit() ++ ++ def test_bind_db(self): ++ _, env = testlib.temp_env() ++ main = env.open_db(None) ++ sub = env.open_db(B('db1')) ++ ++ txn = lmdb.Transaction(env, write=True, db=sub) ++ assert txn.put(B('b'), B('')) # -> sub ++ assert txn.put(B('a'), B(''), db=main) # -> main ++ txn.commit() ++ ++ txn = lmdb.Transaction(env) ++ assert txn.get(B('a')) == B('') ++ assert txn.get(B('b')) is None ++ assert txn.get(B('a'), db=sub) is None ++ assert txn.get(B('b'), db=sub) == B('') ++ txn.abort() ++ ++ def test_bind_db_methods(self): ++ _, env = testlib.temp_env() ++ maindb = env.open_db(None) ++ db1 = env.open_db(B('d1')) ++ txn = lmdb.Transaction(env, write=True, db=db1) ++ assert txn.put(B('a'), B('d1')) ++ assert txn.get(B('a'), db=db1) == B('d1') ++ assert txn.get(B('a'), db=maindb) is None ++ assert txn.replace(B('a'), B('d11')) == B('d1') ++ assert txn.pop(B('a')) == B('d11') ++ assert txn.put(B('a'), B('main'), db=maindb, overwrite=False) ++ assert not txn.delete(B('a')) ++ txn.abort() ++ ++ def test_parent_readonly(self): ++ _, env = testlib.temp_env() ++ parent = lmdb.Transaction(env) ++ # Nonsensical. ++ self.assertRaises(lmdb.InvalidParameterError, ++ lambda: lmdb.Transaction(env, parent=parent)) ++ ++ def test_parent(self): ++ _, env = testlib.temp_env() ++ parent = lmdb.Transaction(env, write=True) ++ parent.put(B('a'), B('a')) ++ ++ child = lmdb.Transaction(env, write=True, parent=parent) ++ assert child.get(B('a')) == B('a') ++ assert child.put(B('a'), B('b')) ++ child.abort() ++ ++ # put() should have rolled back ++ assert parent.get(B('a')) == B('a') ++ ++ child = lmdb.Transaction(env, write=True, parent=parent) ++ assert child.put(B('a'), B('b')) ++ child.commit() ++ ++ # put() should be visible ++ assert parent.get(B('a')) == B('b') ++ ++ def test_buffers(self): ++ _, env = testlib.temp_env() ++ txn = lmdb.Transaction(env, write=True, buffers=True) ++ assert txn.put(B('a'), B('a')) ++ b = txn.get(B('a')) ++ assert b is not None ++ assert len(b) == 1 ++ assert not isinstance(b, type(B(''))) ++ txn.commit() ++ ++ txn = lmdb.Transaction(env, buffers=False) ++ b = txn.get(B('a')) ++ assert b is not None ++ assert len(b) == 1 ++ assert isinstance(b, type(B(''))) ++ txn.abort() ++ ++ ++class ContextManagerTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_ok(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ with txn as txn_: ++ assert txn is txn_ ++ txn.put(B('foo'), B('123')) ++ ++ self.assertRaises(Exception, lambda: txn.get(B('foo'))) ++ with env.begin() as txn: ++ assert txn.get(B('foo')) == B('123') ++ ++ def test_crash(self): ++ path, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ ++ try: ++ with txn as txn_: ++ txn.put(B('foo'), B('123')) ++ txn.put(123, 123) ++ except: ++ pass ++ ++ self.assertRaises(Exception, lambda: txn.get(B('foo'))) ++ with env.begin() as txn: ++ assert txn.get(B('foo')) is None ++ ++ ++class IdTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_readonly_new(self): ++ _, env = testlib.temp_env() ++ with env.begin() as txn: ++ assert txn.id() == 0 ++ ++ def test_write_new(self): ++ _, env = testlib.temp_env() ++ with env.begin(write=True) as txn: ++ assert txn.id() == 1 ++ ++ def test_readonly_after_write(self): ++ _, env = testlib.temp_env() ++ with env.begin(write=True) as txn: ++ txn.put(B('a'), B('a')) ++ with env.begin() as txn: ++ assert txn.id() == 1 ++ ++ def test_invalid_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ txn.abort() ++ self.assertRaises(Exception, lambda: txn.id()) ++ ++ ++class StatTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_stat(self): ++ _, env = testlib.temp_env() ++ db1 = env.open_db(B('db1')) ++ db2 = env.open_db(B('db2')) ++ ++ txn = lmdb.Transaction(env) ++ for db in db1, db2: ++ stat = txn.stat(db) ++ for k in 'psize', 'depth', 'branch_pages', 'overflow_pages',\ ++ 'entries': ++ assert isinstance(stat[k], INT_TYPES), k ++ assert stat[k] >= 0 ++ assert stat['entries'] == 0 ++ ++ txn = lmdb.Transaction(env, write=True) ++ txn.put(B('a'), B('b'), db=db1) ++ txn.commit() ++ ++ txn = lmdb.Transaction(env) ++ stat = txn.stat(db1) ++ assert stat['entries'] == 1 ++ ++ stat = txn.stat(db2) ++ assert stat['entries'] == 0 ++ ++ txn.abort() ++ self.assertRaises(Exception, ++ lambda: env.stat(db1)) ++ env.close() ++ self.assertRaises(Exception, ++ lambda: env.stat(db1)) ++ ++ ++class DropTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_empty(self): ++ _, env = testlib.temp_env() ++ db1 = env.open_db(B('db1')) ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('a'), db=db1) ++ assert txn.get(B('a'), db=db1) == B('a') ++ txn.drop(db1, False) ++ assert txn.get(B('a')) is None ++ txn.drop(db1, False) # should succeed. ++ assert txn.get(B('a')) is None ++ ++ def test_delete(self): ++ _, env = testlib.temp_env() ++ db1 = env.open_db(B('db1')) ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('a'), db=db1) ++ txn.drop(db1) ++ self.assertRaises(lmdb.InvalidParameterError, ++ lambda: txn.get(B('a'), db=db1)) ++ self.assertRaises(lmdb.InvalidParameterError, ++ lambda: txn.drop(db1)) ++ ++ ++class CommitTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_bad_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ txn.abort() ++ self.assertRaises(Exception, ++ lambda: txn.commit()) ++ ++ def test_bad_env(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ env.close() ++ self.assertRaises(Exception, ++ lambda: txn.commit()) ++ ++ def test_commit_ro(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ txn.commit() ++ self.assertRaises(Exception, ++ lambda: txn.commit()) ++ ++ def test_commit_rw(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ assert txn.put(B('a'), B('a')) ++ txn.commit() ++ self.assertRaises(Exception, ++ lambda: txn.commit()) ++ txn = env.begin() ++ assert txn.get(B('a')) == B('a') ++ txn.abort() ++ ++ ++class AbortTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_abort_ro(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ assert txn.get(B('a')) is None ++ txn.abort() ++ self.assertRaises(Exception, ++ lambda: txn.get(B('a'))) ++ env.close() ++ self.assertRaises(Exception, ++ lambda: txn.get(B('a'))) ++ ++ def test_abort_rw(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ assert txn.put(B('a'), B('a')) ++ txn.abort() ++ txn = env.begin() ++ assert txn.get(B('a')) is None ++ ++ ++class GetTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_bad_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ txn.abort() ++ self.assertRaises(Exception, ++ lambda: txn.get(B('a'))) ++ ++ def test_bad_env(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ env.close() ++ self.assertRaises(Exception, ++ lambda: txn.get(B('a'))) ++ ++ def test_missing(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ assert txn.get(B('a')) is None ++ assert txn.get(B('a'), default='default') is 'default' ++ ++ def test_empty_key(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ self.assertRaises(lmdb.BadValsizeError, ++ lambda: txn.get(B(''))) ++ ++ def test_db(self): ++ _, env = testlib.temp_env() ++ maindb = env.open_db(None) ++ db1 = env.open_db(B('db1')) ++ ++ txn = env.begin() ++ assert txn.get(B('a'), db=db1) is None ++ txn.abort() ++ ++ txn = env.begin(write=True) ++ txn.put(B('a'), B('a'), db=db1) ++ txn.commit() ++ ++ txn = env.begin() ++ assert txn.get(B('a')) is None ++ txn.abort() ++ ++ txn = env.begin(db=db1) ++ assert txn.get(B('a')) == B('a') ++ assert txn.get(B('a'), db=maindb) is None ++ ++ def test_buffers_no(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ assert txn.put(B('a'), B('a')) ++ assert type(txn.get(B('a'))) is BytesType ++ ++ def test_buffers_yes(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True, buffers=True) ++ assert txn.put(B('a'), B('a')) ++ assert type(txn.get(B('a'))) is not BytesType ++ ++ def test_dupsort(self): ++ _, env = testlib.temp_env() ++ db1 = env.open_db(B('db1'), dupsort=True) ++ txn = env.begin(write=True, db=db1) ++ assert txn.put(B('a'), B('a')) ++ assert txn.put(B('a'), B('b')) ++ assert txn.get(B('a')) == B('a') ++ ++ def test_integerkey(self): ++ _, env = testlib.temp_env() ++ db1 = env.open_db(B('db1'), integerkey=True) ++ txn = env.begin(write=True, db=db1) ++ assert txn.put(UINT_0001, B('a')) ++ assert txn.put(UINT_0002, B('b')) ++ assert txn.get(UINT_0001) == B('a') ++ assert txn.get(UINT_0002) == B('b') ++ ++ def test_integerdup(self): ++ _, env = testlib.temp_env() ++ db1 = env.open_db(B('db1'), dupsort=True, integerdup=True) ++ txn = env.begin(write=True, db=db1) ++ assert txn.put(UINT_0001, UINT_0002) ++ assert txn.put(UINT_0001, UINT_0001) ++ assert txn.get(UINT_0001) == UINT_0001 ++ ++ def test_dupfixed(self): ++ _, env = testlib.temp_env() ++ db1 = env.open_db(B('db1'), dupsort=True, dupfixed=True) ++ txn = env.begin(write=True, db=db1) ++ assert txn.put(B('a'), B('a')) ++ assert txn.put(B('a'), B('b')) ++ assert txn.get(B('a')) == B('a') ++ ++ ++class PutTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_bad_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ txn.abort() ++ self.assertRaises(Exception, ++ lambda: txn.put(B('a'), B('a'))) ++ ++ def test_bad_env(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ env.close() ++ self.assertRaises(Exception, ++ lambda: txn.put(B('a'), B('a'))) ++ ++ def test_ro_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ self.assertRaises(lmdb.ReadonlyError, ++ lambda: txn.put(B('a'), B('a'))) ++ ++ def test_empty_key_value(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ self.assertRaises(lmdb.BadValsizeError, ++ lambda: txn.put(B(''), B('a'))) ++ ++ def test_dupsort(self): ++ _, env = testlib.temp_env() ++ ++ def test_dupdata_no_dupsort(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ assert txn.put(B('a'), B('a'), dupdata=True) ++ assert txn.put(B('a'), B('b'), dupdata=True) ++ txn.get(B('a')) ++ ++ ++class ReplaceTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_bad_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ txn.abort() ++ self.assertRaises(Exception, ++ lambda: txn.replace(B('a'), B('a'))) ++ ++ def test_bad_env(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ env.close() ++ self.assertRaises(Exception, ++ lambda: txn.replace(B('a'), B('a'))) ++ ++ def test_ro_txn(self): ++ _, env = testlib.temp_env() ++ txn = env.begin() ++ self.assertRaises(lmdb.ReadonlyError, ++ lambda: txn.replace(B('a'), B('a'))) ++ ++ def test_empty_key_value(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ self.assertRaises(lmdb.BadValsizeError, ++ lambda: txn.replace(B(''), B('a'))) ++ ++ def test_dupsort_noexist(self): ++ _, env = testlib.temp_env() ++ db = env.open_db(B('db1'), dupsort=True) ++ txn = env.begin(write=True, db=db) ++ assert None == txn.replace(B('a'), B('x')) ++ assert B('x') == txn.replace(B('a'), B('y')) ++ assert B('y') == txn.replace(B('a'), B('z')) ++ cur = txn.cursor() ++ assert cur.set_key(B('a')) ++ assert [B('z')] == list(cur.iternext_dup()) ++ ++ def test_dupdata_no_dupsort(self): ++ _, env = testlib.temp_env() ++ txn = env.begin(write=True) ++ assert txn.put(B('a'), B('a'), dupdata=True) ++ assert txn.put(B('a'), B('b'), dupdata=True) ++ txn.get(B('a')) ++ ++ ++class LeakTest(unittest.TestCase): ++ def tearDown(self): ++ testlib.cleanup() ++ ++ def test_open_close(self): ++ temp_dir = testlib.temp_dir() ++ env = lmdb.open(temp_dir) ++ with env.begin() as txn: ++ pass ++ env.close() ++ r1 = weakref.ref(env) ++ r2 = weakref.ref(txn) ++ env = None ++ txn = None ++ testlib.debug_collect() ++ assert r1() is None ++ assert r2() is None ++ ++ ++if __name__ == '__main__': ++ unittest.main()