1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
#
# Copyright 2013 The py-lmdb authors, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
#
# Individual files and/or contributed packages may be copyright by
# other parties and/or subject to additional restrictions.
#
# This work also contains materials derived from public sources.
#
# Additional information about OpenLDAP can be obtained at
# <http://www.openldap.org/>.
#
from __future__ import absolute_import
import atexit
import gc
import os
import shutil
import stat
import sys
import tempfile
import traceback
import unittest
try:
import __builtin__
except ImportError:
import builtins as __builtin__
import lmdb
_cleanups = []
def cleanup():
while _cleanups:
func = _cleanups.pop()
try:
func()
except Exception:
traceback.print_exc()
atexit.register(cleanup)
class LmdbTest(unittest.TestCase):
def tearDown(self):
cleanup()
def temp_dir(create=True):
path = tempfile.mkdtemp(prefix='lmdb_test')
assert path is not None, 'tempfile.mkdtemp failed'
if not create:
os.rmdir(path)
_cleanups.append(lambda: shutil.rmtree(path, ignore_errors=True))
if hasattr(path, 'decode'):
path = path.decode(sys.getfilesystemencoding())
return path
def temp_file(create=True):
fd, path = tempfile.mkstemp(prefix='lmdb_test')
assert path is not None, 'tempfile.mkstemp failed'
os.close(fd)
if not create:
os.unlink(path)
_cleanups.append(lambda: os.path.exists(path) and os.unlink(path))
pathlock = path + '-lock'
_cleanups.append(lambda: os.path.exists(pathlock) and os.unlink(pathlock))
if hasattr(path, 'decode'):
path = path.decode(sys.getfilesystemencoding())
return path
def temp_env(path=None, max_dbs=10, **kwargs):
if not path:
path = temp_dir()
env = lmdb.open(path, max_dbs=max_dbs, **kwargs)
_cleanups.append(env.close)
return path, env
def path_mode(path):
return stat.S_IMODE(os.stat(path).st_mode)
def debug_collect():
if hasattr(gc, 'set_debug') and hasattr(gc, 'get_debug'):
old = gc.get_debug()
gc.set_debug(gc.DEBUG_LEAK)
gc.collect()
gc.set_debug(old)
else:
for x in range(10):
# PyPy doesn't collect objects with __del__ on first attempt.
gc.collect()
UnicodeType = getattr(__builtin__, 'unicode', str)
BytesType = getattr(__builtin__, 'bytes', str)
try:
INT_TYPES = (int, long)
except NameError:
INT_TYPES = (int,)
# B(ascii 'string') -> bytes
try:
bytes('') # Python>=2.6, alias for str().
B = lambda s: s
except TypeError: # Python3.x, requires encoding parameter.
B = lambda s: bytes(s, 'ascii')
# BL('s1', 's2') -> ['bytes1', 'bytes2']
BL = lambda *args: list(map(B, args))
# TS('s1', 's2') -> ('bytes1', 'bytes2')
BT = lambda *args: tuple(B(s) for s in args)
# O(int) -> length-1 bytes
O = lambda arg: B(chr(arg))
# OCT(s) -> parse string as octal
OCT = lambda s: int(s, 8)
KEYS = BL('a', 'b', 'baa', 'd')
ITEMS = [(k, B('')) for k in KEYS]
REV_ITEMS = ITEMS[::-1]
VALUES = [B('') for k in KEYS]
KEYS2 = BL('a', 'b', 'baa', 'd', 'e', 'f', 'g', 'h')
ITEMS2 = [(k, B('')) for k in KEYS2]
REV_ITEMS2 = ITEMS2[::-1]
VALUES2 = [B('') for k in KEYS2]
KEYSFIXED = BL('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h')
VALUES_MULTI = [(B('r'), B('s')) for k in KEYSFIXED]
ITEMS_MULTI_FIXEDKEY = [
(kv[0], v) for kv in list(zip(KEYSFIXED, VALUES_MULTI)) for v in kv[1]
]
def _put_items(items, t, db=None):
for k, v in items:
if db:
t.put(k, v, db=db)
else:
t.put(k, v)
def putData(t, db=None):
_put_items(ITEMS, t, db=db)
def putBigData(t, db=None):
_put_items(ITEMS2, t, db=db)
def putBigDataMultiFixed(t, db=None):
_put_items(ITEMS_MULTI_FIXEDKEY, t, db=db)
|