import threading
import unittest

from test.support import import_helper, os_helper, threading_helper
from test.support.threading_helper import run_concurrently

gdbm = import_helper.import_module("dbm.gnu")

NTHREADS = 10
KEY_PER_THREAD = 1000
gdbm_filename = "test_gdbm_file"
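

# Each worker thread inserts, reads back, iterates over, deletes, and
# re-inserts its own set of keys in a single shared dbm.gnu database,
# racing against the other worker threads.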
@threading_helper.requires_working_threading()
class TestGdbm(unittest.TestCase):
    def test_racing_dbm_gnu(self):
        def gdbm_multi_op_worker(db):
            # Each thread sets, gets, and iterates
            tid = threading.get_ident()
            # Insert keys
            for i in range(KEY_PER_THREAD):
                db[f"key_{tid}_{i}"] = f"value_{tid}_{i}"

            for i in range(KEY_PER_THREAD):
                # Keys and values are stored as bytes; encode values for
                # comparison
                key = f"key_{tid}_{i}"
                value = f"value_{tid}_{i}".encode()
                self.assertIn(key, db)
                self.assertEqual(db[key], value)
                self.assertEqual(db.get(key), value)

            self.assertIsNone(db.get("not_exist"))
            with self.assertRaises(KeyError):
                db["not_exist"]

            # Iterate over the database keys and verify only those belonging
            # to this thread. Other threads may concurrently delete their keys.
            key_prefix = f"key_{tid}".encode()
            key = db.firstkey()
            key_count = 0
            while key:
                if key.startswith(key_prefix):
                    self.assertIn(key, db)
                    key_count += 1
                key = db.nextkey(key)
            # Can't assert key_count == KEY_PER_THREAD because concurrent
            # threads may insert or delete keys during iteration. This can
            # cause keys to be skipped or counted multiple times, making the
            # count unreliable.
            # See: https://www.gnu.org.ua/software/gdbm/manual/Sequential.html
            # self.assertEqual(key_count, KEY_PER_THREAD)

            # Delete this thread's keys
            for i in range(KEY_PER_THREAD):
                key = f"key_{tid}_{i}"
                del db[key]
                self.assertNotIn(key, db)
            with self.assertRaises(KeyError):
                del db["not_exist"]

            # Re-insert keys
            for i in range(KEY_PER_THREAD):
                db[f"key_{tid}_{i}"] = f"value_{tid}_{i}"

        with os_helper.temp_dir() as tmpdirname:
            db = gdbm.open(f"{tmpdirname}/{gdbm_filename}", "c")
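            # run_concurrently (from test.support.threading_helper) is expected
            # to start NTHREADS threads, each calling the worker with the same
            # open database handle, and wait for all of them to finish.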
            run_concurrently(
                worker_func=gdbm_multi_op_worker, nthreads=NTHREADS, args=(db,)
            )
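            # Every thread re-inserts its keys before exiting, so the database
            # should end up with exactly NTHREADS * KEY_PER_THREAD entries.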
            self.assertEqual(len(db), NTHREADS * KEY_PER_THREAD)
            db.close()


if __name__ == "__main__":
    unittest.main()