File: test_dbm_gnu.py

package info (click to toggle)
python3.14 3.14.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 169,680 kB
  • sloc: python: 751,968; ansic: 717,163; xml: 31,250; sh: 5,989; cpp: 4,063; makefile: 1,995; objc: 787; lisp: 502; javascript: 136; asm: 75; csh: 12
file content (79 lines) | stat: -rw-r--r-- 2,814 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import unittest

from test.support import import_helper, os_helper, threading_helper
from test.support.threading_helper import run_concurrently

import threading

gdbm = import_helper.import_module("dbm.gnu")

# Number of worker threads handed to run_concurrently() by the test below.
NTHREADS = 10
# Number of keys each worker thread inserts, verifies, deletes and re-inserts.
KEY_PER_THREAD = 1000

# Base name of the database file; the test creates it inside a temporary
# directory.
gdbm_filename = "test_gdbm_file"


@threading_helper.requires_working_threading()
class TestGdbm(unittest.TestCase):
    """Exercise a single shared ``dbm.gnu`` database from several threads."""

    def test_racing_dbm_gnu(self):
        """Run set/get/iterate/delete operations concurrently on one database.

        NTHREADS workers share the same database object.  Each worker only
        touches keys that embed its own thread identifier, so per-thread
        assertions are deterministic even though the threads interleave.
        Every worker finishes by re-inserting its keys, which lets the test
        check the final database size once all workers are done.
        """

        def gdbm_multi_op_worker(db):
            """Set, get, iterate over, delete and re-insert this thread's keys."""
            tid = threading.get_ident()

            # Insert keys
            for i in range(KEY_PER_THREAD):
                db[f"key_{tid}_{i}"] = f"value_{tid}_{i}"

            for i in range(KEY_PER_THREAD):
                # Keys and values are stored as bytes; encode values for
                # comparison
                key = f"key_{tid}_{i}"
                value = f"value_{tid}_{i}".encode()
                self.assertIn(key, db)
                self.assertEqual(db[key], value)
                self.assertEqual(db.get(key), value)
                self.assertIsNone(db.get("not_exist"))
                with self.assertRaises(KeyError):
                    db["not_exist"]

            # Iterate over the database keys and verify only those belonging
            # to this thread. Other threads may concurrently delete their keys.
            #
            # The number of matching keys is deliberately not checked against
            # KEY_PER_THREAD: concurrent threads may insert or delete keys
            # during iteration, which can cause keys to be skipped or visited
            # multiple times, making any count unreliable.
            # See: https://www.gnu.org.ua/software/gdbm/manual/Sequential.html
            key_prefix = f"key_{tid}".encode()
            key = db.firstkey()
            # The end of the traversal is signalled by None, so test for that
            # explicitly rather than relying on truthiness.
            while key is not None:
                if key.startswith(key_prefix):
                    self.assertIn(key, db)
                key = db.nextkey(key)

            # Delete this thread's keys
            for i in range(KEY_PER_THREAD):
                key = f"key_{tid}_{i}"
                del db[key]
                self.assertNotIn(key, db)
                with self.assertRaises(KeyError):
                    del db["not_exist"]

            # Re-insert keys
            for i in range(KEY_PER_THREAD):
                db[f"key_{tid}_{i}"] = f"value_{tid}_{i}"

        with os_helper.temp_dir() as tmpdirname:
            # Open the database as a context manager so it is closed even
            # when a worker or the final assertion fails; otherwise the open
            # handle would leak past the removal of the temporary directory.
            with gdbm.open(f"{tmpdirname}/{gdbm_filename}", "c") as db:
                run_concurrently(
                    worker_func=gdbm_multi_op_worker,
                    nthreads=NTHREADS,
                    args=(db,),
                )
                # Every worker re-inserted all of its keys before returning.
                self.assertEqual(len(db), NTHREADS * KEY_PER_THREAD)


# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()