File: test_message_queues.py

package info (click to toggle)
python-sysv-ipc 1.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 540 kB
  • sloc: ansic: 3,140; python: 1,960; makefile: 8; sh: 4
file content (346 lines) | stat: -rw-r--r-- 14,205 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# Python imports
import unittest
import time
import os
import numbers
import sys

# Project imports
import sysv_ipc
from .base import Base, make_key, sleep_past_granularity

# Not tested --
# - mode seems to be settable and readable, but ignored by the OS


class MessageQueueTestBase(Base):
    """Base class for MessageQueue test classes.

    Each test gets a freshly created MessageQueue in ``self.mq``; the queue is removed again
    once the test finishes.
    """

    def setUp(self):
        """Create a brand new queue (key=None together with IPC_CREX) for the test to use."""
        self.mq = sysv_ipc.MessageQueue(None, sysv_ipc.IPC_CREX)

    def tearDown(self):
        """Remove the test's queue, unless the test set ``self.mq`` to None."""
        # Compare against None explicitly instead of relying on the truthiness of the
        # MessageQueue object; None is the "already cleaned up" marker tests use.
        if self.mq is not None:
            self.mq.remove()

    def assertWriteToReadOnlyPropertyFails(self, property_name, value):
        """test that writing to a readonly property raises TypeError"""
        # Delegates to the Base implementation, supplying this test's queue as the target.
        Base.assertWriteToReadOnlyPropertyFails(self, self.mq, property_name, value)


class TestMessageQueueCreation(MessageQueueTestBase):
    """Exercise stuff related to creating MessageQueue"""

    def test_no_flags(self):
        """tests that opening a MessageQueue with no flags opens the existing
        MessageQueue and doesn't create a new MessageQueue"""
        mq_copy = sysv_ipc.MessageQueue(self.mq.key)
        self.assertEqual(self.mq.key, mq_copy.key)

    def test_IPC_CREAT_existing(self):
        """tests sysv_ipc.IPC_CREAT to open an existing MessageQueue without IPC_EXCL"""
        mq_copy = sysv_ipc.MessageQueue(self.mq.key, sysv_ipc.IPC_CREAT)

        self.assertEqual(self.mq.key, mq_copy.key)

    def test_IPC_CREAT_new(self):
        """tests sysv_ipc.IPC_CREAT to create a new MessageQueue without IPC_EXCL"""
        # I can't pass None for the name unless I also pass IPC_EXCL.
        key = make_key()

        # Note: this method of finding an unused key is vulnerable to a race
        # condition. It's good enough for test, but don't copy it for use in
        # production code!
        while True:
            try:
                # Opening with no flags only succeeds if a queue with this key already exists.
                # The probe merely opens the queue: MessageQueue has no detach() method, and a
                # queue that this test did not create must not be removed.
                sysv_ipc.MessageQueue(key)
            except sysv_ipc.ExistentialError:
                # Nothing is using this key, so it's free for the test.
                break
            except sysv_ipc.PermissionsError:
                # A queue exists under this key but isn't accessible; treat the key as taken.
                pass

            key = make_key()

        mq = sysv_ipc.MessageQueue(key, sysv_ipc.IPC_CREAT)
        # Registered as a cleanup so that the queue is removed even if the assertion fails.
        self.addCleanup(mq.remove)

        self.assertIsNotNone(mq)

    def test_IPC_EXCL(self):
        """tests IPC_CREAT | IPC_EXCL prevents opening an existing MessageQueue"""
        with self.assertRaises(sysv_ipc.ExistentialError):
            sysv_ipc.MessageQueue(self.mq.key, sysv_ipc.IPC_CREX)

    def test_randomly_generated_key(self):
        """tests that the randomly-generated key works"""
        # This is tested implicitly elsewhere but I want to test it explicitly
        mq = sysv_ipc.MessageQueue(None, sysv_ipc.IPC_CREX)
        # Registered as a cleanup so that the queue is removed even if an assertion fails.
        self.addCleanup(mq.remove)

        self.assertIsNotNone(mq.key)
        self.assertGreaterEqual(mq.key, sysv_ipc.KEY_MIN)
        self.assertLessEqual(mq.key, sysv_ipc.KEY_MAX)

    # don't bother testing mode, it's ignored by the OS?

    def test_default_flags(self):
        """tests that the flag is 0 by default (==> open existing)"""
        mq = sysv_ipc.MessageQueue(self.mq.key)
        self.assertEqual(self.mq.id, mq.id)

    def test_kwargs(self):
        """ensure init accepts keyword args as advertised"""
        # mode 0x180 = 0600. Octal is difficult to express in Python 2/3 compatible code.
        mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX, mode=0x180,
                                   max_message_size=256)
        mq.remove()


class TestMessageQueueSendReceive(MessageQueueTestBase):
    """Exercise send() and receive()"""

    def test_simple_send_receive(self):
        """test that a message sent with default arguments is received intact with type 1"""
        test_string = b'abcdefg'
        self.mq.send(test_string)
        self.assertEqual(self.mq.receive(), (test_string, 1))

    def test_message_type_send(self):
        """test the msg type param of send()"""
        test_string = b'abcdefg'
        self.mq.send(test_string, type=2)
        self.assertEqual(self.mq.receive(), (test_string, 2))

        with self.assertRaises(ValueError):
            self.mq.send(test_string, type=-1)

    def test_message_type_receive_default_order(self):
        """test that receive() doesn't filter by type by default"""
        for i in range(1, 4):
            self.mq.send('type' + str(i), type=i)

        # default order is FIFO.
        self.assertEqual(self.mq.receive(), (b'type1', 1))
        self.assertEqual(self.mq.receive(), (b'type2', 2))
        self.assertEqual(self.mq.receive(), (b'type3', 3))

        self.assertEqual(self.mq.current_messages, 0)

    # The bug referenced below affects use of a negative type. Supposedly it's only on 32-bit
    # binaries running on 64-bit systems, but I see it using 64-bit Python under 64-bit Linux.
    # A less demanding version of this test follows so Linux doesn't go entirely untested.
    @unittest.skipIf(sys.platform.startswith('linux'),
                     'msgrcv() buggy on Linux: https://bugzilla.kernel.org/show_bug.cgi?id=94181')
    def test_message_type_receive_specific_order(self):
        """test that receive() filters appropriately on negative and positive msg types"""
        # Place messages in Q w/highest type first
        for i in range(4, 0, -1):
            self.mq.send('type' + str(i), type=i)

        # receive(type=-2) should get "the first message of the lowest type that is <= the absolute
        # value of type."
        self.assertEqual(self.mq.receive(type=-2), (b'type2', 2))

        # receive(type=3) should get "the first message of that type."
        self.assertEqual(self.mq.receive(type=3), (b'type3', 3))

        # Ensure the others are still there.
        self.assertEqual(self.mq.receive(), (b'type4', 4))
        self.assertEqual(self.mq.receive(), (b'type1', 1))

        self.assertEqual(self.mq.current_messages, 0)

    def test_message_type_receive_specific_order_no_negative_type(self):
        """test that receive() filters appropriately on positive msg type (softer test for Linux)"""
        # Place messages in Q w/highest type first
        for i in range(4, 0, -1):
            self.mq.send('type' + str(i), type=i)

        # receive(type=3) should get "the first message of that type."
        self.assertEqual(self.mq.receive(type=3), (b'type3', 3))

        # Ensure the others are still there.
        self.assertEqual(self.mq.receive(), (b'type4', 4))
        self.assertEqual(self.mq.receive(), (b'type2', 2))
        self.assertEqual(self.mq.receive(), (b'type1', 1))

        self.assertEqual(self.mq.current_messages, 0)

    def test_send_non_blocking(self):
        """Test that send(block=False) raises BusyError as appropriate"""
        # This is a bit tricky since the OS has its own ideas about when the queue is full.
        # I would like to fill it precisely to the brim and then make one last call to send(),
        # but I don't know exactly when the OS will decide the Q is full. Instead, I just keep
        # stuffing messages in until I get some kind of an error. If it's a BusyError, all is well.
        # Any other exception propagates out of the context manager and fails the test.
        with self.assertRaises(sysv_ipc.BusyError):
            while True:
                self.mq.send('x', block=False)

    def test_receive_non_blocking(self):
        """Test that receive(block=False) raises BusyError as appropriate"""
        # Empty queue: there is nothing to receive at all.
        with self.assertRaises(sysv_ipc.BusyError):
            self.mq.receive(block=False)

        # Non-empty queue, but no message of the requested type.
        self.mq.send('x', type=3)
        with self.assertRaises(sysv_ipc.BusyError):
            self.mq.receive(type=2, block=False)

    def test_ascii_null(self):
        """ensure I can send & receive 0x00"""
        # Spell the NUL byte as a literal. bytes(0) is an empty bytes object (a bytes of
        # length zero), so using it here would leave the message without any 0x00 in it.
        test_string = b'abc\x00def'
        self.mq.send(test_string)
        self.assertEqual(self.mq.receive(), (test_string, 1))

    def test_utf8(self):
        """Test writing encoded Unicode"""
        test_string = 'G' + '\u00F6' + 'teborg'
        test_string = test_string.encode('utf-8')
        self.mq.send(test_string)
        self.assertEqual(self.mq.receive(), (test_string, 1))

    def test_send_kwargs(self):
        """ensure send() accepts keyword args as advertised"""
        self.mq.send(b'x', block=True, type=1)

    def test_receive_kwargs(self):
        """ensure receive() accepts keyword args as advertised"""
        self.mq.send(b'x', block=True, type=1)
        self.mq.receive(block=False, type=0)

    def test_max_message_size_respected(self):
        """ensure the max_message_size param is respected"""
        mq = sysv_ipc.MessageQueue(None, sysv_ipc.IPC_CREX, max_message_size=10)
        # Registered as a cleanup so that the queue is removed even if the assertion fails.
        self.addCleanup(mq.remove)

        # One byte more than max_message_size allows.
        with self.assertRaises(ValueError):
            mq.send(b' ' * 11, block=False)


class TestMessageQueueRemove(MessageQueueTestBase):
    """Tests covering removal of a MessageQueue via mq.remove()"""

    def test_remove(self):
        """verifies that a queue can no longer be opened by key once it has been removed"""
        self.mq.remove()

        # With no flags, opening by key requires the queue to exist -- and it shouldn't anymore.
        self.assertRaises(sysv_ipc.ExistentialError, sysv_ipc.MessageQueue, self.mq.key)

        # The queue is gone; clear the reference so self.tearDown() won't try to remove it
        # a second time.
        self.mq = None


class TestMessageQueuePropertiesAndAttributes(MessageQueueTestBase):
    """Exercise props and attrs"""

    def test_property_key(self):
        """exercise MessageQueue.key"""
        self.assertGreaterEqual(self.mq.key, sysv_ipc.KEY_MIN)
        self.assertLessEqual(self.mq.key, sysv_ipc.KEY_MAX)
        self.assertWriteToReadOnlyPropertyFails('key', 42)

    # The POSIX spec says "msgget() shall return a non-negative integer", but OS X sometimes
    # returns a negative number like -1765146624. My guess is that they're using a UINT somewhere
    # which exceeds INT_MAX and hence looks negative, or they just don't care about the spec.
    # msgget() ref: http://pubs.opengroup.org/onlinepubs/009695399/functions/msgget.html
    @unittest.skipIf(sys.platform.startswith('darwin'),
                     'OS X message queues sometimes return negative ids')
    def test_property_id(self):
        """exercise MessageQueue.id"""
        self.assertGreaterEqual(self.mq.id, 0)
        self.assertWriteToReadOnlyPropertyFails('id', 42)

    def test_property_id_weak_for_darwin(self):
        """exercise MessageQueue.id with the Darwin-failing test removed"""
        self.assertIsInstance(self.mq.id, numbers.Integral)
        self.assertWriteToReadOnlyPropertyFails('id', 42)

    def test_attribute_max_size(self):
        """exercise MessageQueue.max_size"""
        self.assertGreaterEqual(self.mq.max_size, 0)
        # writing to max_size should not fail, and I test that here. However, as documented,
        # setting it is no guarantee that the OS will respect it. Caveat emptor.
        self.mq.max_size = 2048

    def test_property_last_send_time(self):
        """exercise MessageQueue.last_send_time"""
        self.assertEqual(self.mq.last_send_time, 0)
        # I can't record exactly when this send() happens, but as long as it is within 5 seconds
        # of the assertion happening, this test will pass.
        self.mq.send('x')
        # abs() matters: the recorded time is never later than "now", so the raw difference is
        # <= 0 and would be "less than 5" no matter how stale the timestamp is (even 0).
        self.assertLess(abs(self.mq.last_send_time - time.time()), 5)
        self.assertWriteToReadOnlyPropertyFails('last_send_time', 42)

    def test_property_last_receive_time(self):
        """exercise MessageQueue.last_receive_time"""
        self.assertEqual(self.mq.last_receive_time, 0)
        self.mq.send('x')
        self.mq.receive()
        # I can't record exactly when this receive() happens, but as long as it is within 5
        # seconds of the assertion happening, this test will pass.
        # abs() keeps a stale (or still zero) timestamp from trivially satisfying the check.
        self.assertLess(abs(self.mq.last_receive_time - time.time()), 5)
        self.assertWriteToReadOnlyPropertyFails('last_receive_time', 42)

    def test_property_last_change_time(self):
        """exercise MessageQueue.last_change_time"""
        # Note that last_change_time doesn't start out as 0 (unlike e.g. last_receive_time), so
        # I don't test that here.
        original_last_change_time = self.mq.last_change_time
        sleep_past_granularity()
        # This might seem like a no-op, but setting the UID to any value triggers a call that
        # should set last_change_time.
        self.mq.uid = self.mq.uid
        # abs() keeps a stale timestamp from trivially satisfying the check.
        self.assertLess(abs(self.mq.last_change_time - time.time()), 5)
        # Ensure the time actually changed.
        self.assertNotEqual(self.mq.last_change_time, original_last_change_time)
        self.assertWriteToReadOnlyPropertyFails('last_change_time', 42)

    def test_property_last_send_pid(self):
        """exercise MessageQueue.last_send_pid"""
        self.assertEqual(self.mq.last_send_pid, 0)
        self.mq.send('x')
        self.assertEqual(self.mq.last_send_pid, os.getpid())
        self.assertWriteToReadOnlyPropertyFails('last_send_pid', 42)

    def test_property_last_receive_pid(self):
        """exercise MessageQueue.last_receive_pid"""
        self.assertEqual(self.mq.last_receive_pid, 0)
        self.mq.send('x')
        self.mq.receive()
        self.assertEqual(self.mq.last_receive_pid, os.getpid())
        self.assertWriteToReadOnlyPropertyFails('last_receive_pid', 42)

    def test_property_current_messages(self):
        """exercise MessageQueue.current_messages"""
        self.assertEqual(self.mq.current_messages, 0)

        # Queue up three messages...
        for _ in range(3):
            self.mq.send('x')
        self.assertEqual(self.mq.current_messages, 3)

        # ...then drain them one at a time, checking the count after each receive().
        for expected_remaining in (2, 1, 0):
            self.mq.receive()
            self.assertEqual(self.mq.current_messages, expected_remaining)

        self.assertWriteToReadOnlyPropertyFails('current_messages', 42)

    def test_attribute_uid(self):
        """exercise MessageQueue.uid"""
        self.assertEqual(self.mq.uid, os.geteuid())

    def test_attribute_gid(self):
        """exercise MessageQueue.gid"""
        self.assertEqual(self.mq.gid, os.getgid())

    def test_attribute_cuid(self):
        """exercise MessageQueue.cuid"""
        self.assertEqual(self.mq.cuid, os.geteuid())
        self.assertWriteToReadOnlyPropertyFails('cuid', 42)

    def test_attribute_cgid(self):
        """exercise MessageQueue.cgid"""
        self.assertEqual(self.mq.cgid, os.getgid())
        self.assertWriteToReadOnlyPropertyFails('cgid', 42)


# Run every test in this module through unittest's command-line runner when the file is
# executed rather than imported.
# NOTE(review): this module uses a relative import (from .base import ...), so it presumably
# has to be executed as part of its package (e.g. with python -m) for this to work -- confirm.
if __name__ == '__main__':
    unittest.main()