File: test_semaphores.py

package info (click to toggle)
python-sysv-ipc 1.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 540 kB
  • sloc: ansic: 3,140; python: 1,960; makefile: 8; sh: 4
file content (422 lines) | stat: -rw-r--r-- 16,503 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# Python imports
import unittest
import datetime
import time
import os

# Project imports
import sysv_ipc
from .base import Base, make_key

# Not tested --
# - mode seems to be settable and readable, but ignored by the OS
# - undo flag is hard to test without launching another process
# - P() and V() are simple aliases for acquire() and release(). I could repeat all of the
#   acquire() and release() tests using the aliases. A different implementation might be to
#   supply an __init__.py along with the .so binary that would alias these functions in Python
#   which would be testable.
# - Z() is not tested in the positive case, i.e. that it actually unblocks once the semaphore
#   hits zero. (Requires a separate process to test.)


# N_RELEASES is the number of times release() is called in test_release()
N_RELEASES = 1000000  # 1 million


class SemaphoreTestBase(Base):
    """base class for Semaphore test classes"""
    def setUp(self):
        self.sem = sysv_ipc.Semaphore(None, sysv_ipc.IPC_CREX, initial_value=1)

    def tearDown(self):
        if self.sem:
            self.sem.remove()

    def assertWriteToReadOnlyPropertyFails(self, property_name, value):
        """test that writing to a readonly property raises TypeError"""
        Base.assertWriteToReadOnlyPropertyFails(self, self.sem, property_name, value)

    def assertDeltasCloseEnough(self, delta_a, delta_b):
        """Compare two datetime.timedeltas and ensure they're within < 1 second of one another.

        This is useful when comparing actual times to expected times in tests where the actual
        time may have been a bit longer or a bit shorter than the expected.
        """
        # This is conceptually like computing abs(delta_a - delta_b).
        if delta_a > delta_b:
            delta = delta_a - delta_b
        else:
            delta = delta_b - delta_a

        self.assertEqual(delta.days, 0)
        self.assertEqual(delta.seconds, 0)
        # I don't test microseconds because that granularity isn't under the control of this module.


class TestSemaphoreCreation(SemaphoreTestBase):
    """Exercise stuff related to creating Semaphores"""
    def test_no_flags(self):
        """tests that opening a semaphore with no flags opens the existing
        semaphore and doesn't create a new semaphore"""
        sem_copy = sysv_ipc.Semaphore(self.sem.key)
        self.assertEqual(self.sem.key, sem_copy.key)

    def test_IPC_CREAT_existing(self):
        """tests sysv_ipc.IPC_CREAT to open an existing semaphore without IPC_EXCL"""
        sem_copy = sysv_ipc.Semaphore(self.sem.key, sysv_ipc.IPC_CREAT)

        self.assertEqual(self.sem.key, sem_copy.key)

    def test_IPC_CREAT_new(self):
        """tests sysv_ipc.IPC_CREAT to create a new semaphore without IPC_EXCL"""
        # I can't pass None for the name unless I also pass IPC_EXCL.
        key = make_key()

        # Note: this method of finding an unused key is vulnerable to a race
        # condition. It's good enough for test, but don't copy it for use in
        # production code!
        key_is_available = False
        while not key_is_available:
            try:
                sem = sysv_ipc.Semaphore(key)
                sem.close()
            except sysv_ipc.ExistentialError:
                key_is_available = True
            else:
                key = make_key()

        sem = sysv_ipc.Semaphore(key, sysv_ipc.IPC_CREAT)

        self.assertIsNotNone(sem)

        sem.remove()

    def test_IPC_EXCL(self):
        """tests IPC_CREAT | IPC_EXCL prevents opening an existing semaphore"""
        with self.assertRaises(sysv_ipc.ExistentialError):
            sysv_ipc.Semaphore(self.sem.key, sysv_ipc.IPC_CREX)

    def test_randomly_generated_key(self):
        """tests that the randomly-generated key works"""
        # This is tested implicitly elsewhere but I want to test it explicitly
        sem = sysv_ipc.Semaphore(None, sysv_ipc.IPC_CREX)
        self.assertIsNotNone(sem.key)
        self.assertGreaterEqual(sem.key, sysv_ipc.KEY_MIN)
        self.assertLessEqual(sem.key, sysv_ipc.KEY_MAX)
        sem.remove()

    # # don't bother testing mode, it's ignored by the OS?

    def test_default_initial_value(self):
        """tests that the initial value is 0 by default"""
        sem = sysv_ipc.Semaphore(None, sysv_ipc.IPC_CREX)
        self.assertEqual(sem.value, 0)
        sem.remove()

    def test_zero_initial_value(self):
        """tests that the initial value is 0 when assigned"""
        sem = sysv_ipc.Semaphore(None, sysv_ipc.IPC_CREX, initial_value=0)
        self.assertEqual(sem.value, 0)
        sem.remove()

    def test_nonzero_initial_value(self):
        """tests that the initial value is non-zero when assigned"""
        sem = sysv_ipc.Semaphore(None, sysv_ipc.IPC_CREX, initial_value=42)
        self.assertEqual(sem.value, 42)
        sem.remove()

    def test_kwargs(self):
        """ensure init accepts keyword args as advertised"""
        # mode 0x180 = 0600. Octal is difficult to express in Python 2/3 compatible code.
        sem = sysv_ipc.Semaphore(None, flags=sysv_ipc.IPC_CREX, mode=0x180, initial_value=0)
        sem.remove()


class TestSemaphoreAquisition(SemaphoreTestBase):
    """Exercise acquiring semaphores"""
    def test_simple_acquisition(self):
        """tests that acquisition works"""
        # I should be able to acquire this semaphore, but if I can't I don't want to hang the
        # test so I set block=False. If I can't acquire the semaphore, sysv_ipc will raise a
        # BusyError.
        self.sem.block = False
        # Should raise no error
        self.sem.acquire()

    def test_acquisition_delta(self):
        """tests that the delta param works"""
        self.sem.value = 42
        self.sem.acquire(None, 10)
        self.assertEqual(self.sem.value, 32)

    def test_acquisition_zero_delta(self):
        """tests that a zero delta is not allowed"""
        # acquire() w/zero delta is not allowed because at the C level, P(), V(), and Z() all
        # map to semop(), and the delta value is what differentiates P(), V(), and Z().
        with self.assertRaises(ValueError):
            self.sem.acquire(None, 0)

    def test_acquisition_non_blocking(self):
        """tests that a non-blocking attempt at acquisition works"""
        # Should raise no error
        self.sem.acquire()

        self.sem.block = False
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.acquire()

    # test acquisition failures
    # def test_acquisition_no_timeout(self):
    # FIXME
    # This is hard to test since it should wait infinitely. Probably the way
    # to do it is to spawn another process that holds the semaphore for
    # maybe 10 seconds and have this process wait on it. That's complicated
    # and not a really great test.

    @unittest.skipUnless(sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED, "Requires Semaphore timeout support")
    def test_acquisition_zero_timeout(self):
        """tests that acquisition w/timeout=0 implements non-blocking behavior"""
        # Should not raise an error
        self.sem.acquire(0)
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.acquire(0)

    @unittest.skipUnless(sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED, "Requires Semaphore timeout support")
    def test_acquisition_nonzero_int_timeout(self):
        """tests that acquisition w/timeout=an int is reasonably accurate"""
        # Should not raise an error
        self.sem.acquire(0)

        # This should raise a busy error
        wait_time = 1
        start = datetime.datetime.now()
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.acquire(wait_time)
        end = datetime.datetime.now()
        actual_delta = end - start
        expected_delta = datetime.timedelta(seconds=wait_time)

        self.assertDeltasCloseEnough(actual_delta, expected_delta)

    @unittest.skipUnless(sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED, "Requires Semaphore timeout support")
    def test_acquisition_nonzero_float_timeout(self):
        """tests that acquisition w/timeout=a float is reasonably accurate"""
        # Should not raise an error
        self.sem.acquire(0)

        # This should raise a busy error
        wait_time = 1.5
        start = datetime.datetime.now()
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.acquire(wait_time)
        end = datetime.datetime.now()
        actual_delta = end - start
        expected_delta = datetime.timedelta(seconds=wait_time)

        self.assertDeltasCloseEnough(actual_delta, expected_delta)

    def test_acquire_kwargs(self):
        """Ensure acquire() takes kwargs as advertised"""
        self.sem.acquire(timeout=None, delta=1)

    def test_P_kwargs(self):
        """Ensure P() takes kwargs as advertised"""
        self.sem.P(timeout=None, delta=1)


class TestSemaphoreRelease(SemaphoreTestBase):
    """Exercise releasing semaphores"""
    def test_release(self):
        """tests that release works"""
        # Not only does it work, I can do it as many times as I want! I had
        # tried some code that called release() SEMAPHORE_VALUE_MAX times, but
        # on platforms where that's ~2 billion, the test takes too long to run.
        # So I'll stick to a lower (but still very large) number of releases.
        n_releases = min(N_RELEASES, sysv_ipc.SEMAPHORE_VALUE_MAX - 1)
        original_value = self.sem.value
        for i in range(n_releases):
            self.sem.release()
        self.assertEqual(self.sem.value, original_value + n_releases)

    def test_release_delta(self):
        """tests that release()'s delta param works"""
        original_value = self.sem.value
        self.sem.release(5)
        self.assertEqual(self.sem.value, original_value + 5)

    def test_release_kwargs(self):
        """Ensure release() takes kwargs as advertised"""
        self.sem.release(delta=1)

    def test_V_kwargs(self):
        """Ensure V() takes kwargs as advertised"""
        self.sem.V(delta=1)

    def test_context_manager(self):
        """tests that context manager acquire/release works"""
        with self.sem as sem:
            self.assertEqual(sem.value, 0)
            self.sem.block = False
            with self.assertRaises(sysv_ipc.BusyError):
                sem.acquire()

        self.assertEqual(sem.value, 1)

        # Should not raise an error.
        self.sem.block = False
        sem.acquire()


class TestSemaphoreZ(SemaphoreTestBase):
    """exercise Z() (block until zero)"""
    def test_Z_failure(self):
        """Ensure Z understands when the semaphore is non-zero"""
        self.sem.value = 42
        self.sem.block = False
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.Z()

    def test_Z_success(self):
        """Ensure Z understands when the semaphore is zero"""
        self.sem.value = 0
        self.sem.block = False
        # Should not raise BusyError
        self.sem.Z()

    @unittest.skipUnless(sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED, "Requires Semaphore timeout support")
    def test_Z_zero_timeout(self):
        """tests that Z w/timeout=0 implements non-blocking behavior"""
        self.sem.value = 42
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.Z(0)

    @unittest.skipUnless(sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED, "Requires Semaphore timeout support")
    def test_Z_nonzero_int_timeout(self):
        """tests that Z() w/timeout=an int is reasonably accurate"""
        self.sem.value = 42

        # This should raise a busy error
        wait_time = 1
        start = datetime.datetime.now()
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.Z(wait_time)
        end = datetime.datetime.now()
        actual_delta = end - start
        expected_delta = datetime.timedelta(seconds=wait_time)

        self.assertDeltasCloseEnough(actual_delta, expected_delta)

    @unittest.skipUnless(sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED, "Requires Semaphore timeout support")
    def test_Z_nonzero_float_timeout(self):
        """tests that Z() w/timeout=a float is reasonably accurate"""
        self.sem.value = 42

        # This should raise a busy error
        wait_time = 1.5
        start = datetime.datetime.now()
        with self.assertRaises(sysv_ipc.BusyError):
            self.sem.Z(wait_time)
        end = datetime.datetime.now()
        actual_delta = end - start
        expected_delta = datetime.timedelta(seconds=wait_time)

        self.assertDeltasCloseEnough(actual_delta, expected_delta)

    def test_Z_kwargs(self):
        """Ensure Z() takes kwargs as advertised"""
        self.sem.value = 0
        self.sem.block = False
        self.sem.Z(timeout=None)


class TestSemaphoreRemove(SemaphoreTestBase):
    """Exercise sem.remove()"""
    def test_remove(self):
        """tests that sem.remove() works"""
        self.sem.remove()
        with self.assertRaises(sysv_ipc.ExistentialError):
            sysv_ipc.Semaphore(self.sem.key)
        # Wipe this out so that self.tearDown() doesn't crash.
        self.sem = None


class TestSemaphorePropertiesAndAttributes(SemaphoreTestBase):
    """Exercise props and attrs"""
    def test_property_key(self):
        """exercise Semaphore.key"""
        self.assertGreaterEqual(self.sem.key, sysv_ipc.KEY_MIN)
        self.assertLessEqual(self.sem.key, sysv_ipc.KEY_MAX)
        self.assertWriteToReadOnlyPropertyFails('key', 42)

    def test_property_id(self):
        """exercise Semaphore.id"""
        self.assertGreaterEqual(self.sem.id, 0)
        self.assertWriteToReadOnlyPropertyFails('id', 42)

    def test_attribute_value(self):
        """exercise Semaphore.value"""
        # test read, although this has been tested very thoroughly above
        self.assertEqual(self.sem.value, 1)
        self.sem.value = 42
        self.assertEqual(self.sem.value, 42)

    def test_attribute_block(self):
        """exercise Semaphore.block"""
        # tested for semantics above, here I just test that it can be read.
        self.assertEqual(self.sem.block, True)
        self.sem.block = False
        self.assertEqual(self.sem.block, False)

    def test_attribute_uid(self):
        """exercise Semaphore.uid"""
        self.assertEqual(self.sem.uid, os.geteuid())

    def test_attribute_gid(self):
        """exercise Semaphore.gid"""
        self.assertEqual(self.sem.gid, os.getgid())

    def test_attribute_cuid(self):
        """exercise Semaphore.cuid"""
        self.assertEqual(self.sem.cuid, os.geteuid())
        self.assertWriteToReadOnlyPropertyFails('cuid', 42)

    def test_attribute_cgid(self):
        """exercise Semaphore.cgid"""
        self.assertEqual(self.sem.cgid, os.getgid())
        self.assertWriteToReadOnlyPropertyFails('cgid', 42)

    def test_attribute_last_pid(self):
        """exercise Semaphore.last_pid"""
        # According to the POSIX spec, only an operation (P(), V(), or Z()) should set last_pid,
        # and under FreeBSD that's true. However, under Linux and OS X, setting the semaphore's
        # value also changes last_pid. I believe this is incorrect behavior, and I've filed a
        # bug against the Linux kernel: https://bugzilla.kernel.org/show_bug.cgi?id=112271
        self.sem.release()
        self.assertEqual(self.sem.last_pid, os.getpid())
        self.assertWriteToReadOnlyPropertyFails('last_pid', 42)

    def test_attribute_waiting_for_nonzero(self):
        """exercise Semaphore.waiting_for_nonzero"""
        self.assertEqual(self.sem.waiting_for_nonzero, 0)
        self.assertWriteToReadOnlyPropertyFails('waiting_for_nonzero', 42)

    def test_attribute_waiting_for_zero(self):
        """exercise Semaphore.waiting_for_zero"""
        self.assertEqual(self.sem.waiting_for_zero, 0)
        self.assertWriteToReadOnlyPropertyFails('waiting_for_zero', 42)

    def test_attribute_o_time(self):
        """exercise Semaphore.o_time"""
        sem = sysv_ipc.Semaphore(None, sysv_ipc.IPC_CREX)
        self.assertEqual(self.sem.o_time, 0)
        # sem.release() will set o_time.
        sem.release()
        # I can't know precisely when o_time was set, but there should be < 10 seconds between
        # the sem.release() line above and the assertion below.
        self.assertLess(time.time() - sem.o_time, 10)
        self.assertWriteToReadOnlyPropertyFails('o_time', 42)
        sem.remove()


if __name__ == '__main__':
    unittest.main()