File: testlib.py

package info (click to toggle)
py-lmdb 1.4.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,832 kB
  • sloc: ansic: 11,932; python: 4,646; makefile: 128
file content (162 lines) | stat: -rw-r--r-- 4,205 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#
# Copyright 2013 The py-lmdb authors, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
#
# Individual files and/or contributed packages may be copyright by
# other parties and/or subject to additional restrictions.
#
# This work also contains materials derived from public sources.
#
# Additional information about OpenLDAP can be obtained at
# <http://www.openldap.org/>.
#

from __future__ import absolute_import
import atexit
import gc
import os
import shutil
import stat
import sys
import tempfile
import traceback
import unittest

try:
    import __builtin__
except ImportError:
    import builtins as __builtin__

import lmdb

_cleanups = []

def cleanup():
    while _cleanups:
        func = _cleanups.pop()
        try:
            func()
        except Exception:
            traceback.print_exc()

atexit.register(cleanup)

class LmdbTest(unittest.TestCase):
    def tearDown(self):
        cleanup()


def temp_dir(create=True):
    path = tempfile.mkdtemp(prefix='lmdb_test')
    assert path is not None, 'tempfile.mkdtemp failed'
    if not create:
        os.rmdir(path)
    _cleanups.append(lambda: shutil.rmtree(path, ignore_errors=True))
    if hasattr(path, 'decode'):
        path = path.decode(sys.getfilesystemencoding())
    return path


def temp_file(create=True):
    fd, path = tempfile.mkstemp(prefix='lmdb_test')
    assert path is not None, 'tempfile.mkstemp failed'
    os.close(fd)
    if not create:
        os.unlink(path)
    _cleanups.append(lambda: os.path.exists(path) and os.unlink(path))
    pathlock = path + '-lock'
    _cleanups.append(lambda: os.path.exists(pathlock) and os.unlink(pathlock))
    if hasattr(path, 'decode'):
        path = path.decode(sys.getfilesystemencoding())
    return path


def temp_env(path=None, max_dbs=10, **kwargs):
    if not path:
        path = temp_dir()
    env = lmdb.open(path, max_dbs=max_dbs, **kwargs)
    _cleanups.append(env.close)
    return path, env


def path_mode(path):
    return stat.S_IMODE(os.stat(path).st_mode)


def debug_collect():
    if hasattr(gc, 'set_debug') and hasattr(gc, 'get_debug'):
        old = gc.get_debug()
        gc.set_debug(gc.DEBUG_LEAK)
        gc.collect()
        gc.set_debug(old)
    else:
        for x in range(10):
            # PyPy doesn't collect objects with __del__ on first attempt.
            gc.collect()

UnicodeType = getattr(__builtin__, 'unicode', str)
BytesType = getattr(__builtin__, 'bytes', str)

try:
    INT_TYPES = (int, long)
except NameError:
    INT_TYPES = (int,)

# B(ascii 'string') -> bytes
try:
    bytes('')     # Python>=2.6, alias for str().
    B = lambda s: s
except TypeError: # Python3.x, requires encoding parameter.
    B = lambda s: bytes(s, 'ascii')

# BL('s1', 's2') -> ['bytes1', 'bytes2']
BL = lambda *args: list(map(B, args))
# TS('s1', 's2') -> ('bytes1', 'bytes2')
BT = lambda *args: tuple(B(s) for s in args)
# O(int) -> length-1 bytes
O = lambda arg: B(chr(arg))
# OCT(s) -> parse string as octal
OCT = lambda s: int(s, 8)


KEYS = BL('a', 'b', 'baa', 'd')
ITEMS = [(k, B('')) for k in KEYS]
REV_ITEMS = ITEMS[::-1]
VALUES = [B('') for k in KEYS]

KEYS2 = BL('a', 'b', 'baa', 'd', 'e', 'f', 'g', 'h')
ITEMS2 = [(k, B('')) for k in KEYS2]
REV_ITEMS2 = ITEMS2[::-1]
VALUES2 = [B('') for k in KEYS2]

KEYSFIXED = BL('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h')
VALUES_MULTI = [(B('r'), B('s')) for k in KEYSFIXED]
ITEMS_MULTI_FIXEDKEY = [
    (kv[0], v) for kv in list(zip(KEYSFIXED, VALUES_MULTI)) for v in kv[1]
    ]

def _put_items(items, t, db=None):
    for k, v in items:
        if db:
            t.put(k, v, db=db)
        else:
            t.put(k, v)


def putData(t, db=None):
    _put_items(ITEMS, t, db=db)

def putBigData(t, db=None):
    _put_items(ITEMS2, t, db=db)

def putBigDataMultiFixed(t, db=None):
    _put_items(ITEMS_MULTI_FIXEDKEY, t, db=db)