File: test_select.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (405 lines) | stat: -rw-r--r-- 14,513 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
import sys
import py

from pypy.interpreter.error import OperationError


class _AppTestSelect:
    def test_sleep(self):
        """
        The timeout parameter to select.select specifies the approximate
        maximum amount of time for that function to block before it returns
        to report that no results are available.
        """
        import time, select
        readend, writeend = self.getpair()
        try:
            start = time.time()
            iwtd, owtd, ewtd = select.select([readend], [], [], 0.3)
            end = time.time()
            assert iwtd == owtd == ewtd == []
            assert end - start > 0.25
        finally:
            readend.close()
            writeend.close()
        raises(ValueError, select.select, [], [], [], -1)

    def test_list_tuple(self):
        import time, select
        readend, writeend = self.getpair()
        try:
            iwtd, owtd, ewtd = select.select([readend], (), (), .3)
        finally:
            readend.close()
            writeend.close()

    def test_readable(self):
        """
        select.select returns elements from the "read list" (the first
        parameter) which may have data available to be read.
        """
        import select
        readend, writeend = self.getpair()
        try:
            iwtd, owtd, ewtd = select.select([readend], [], [], 0)
            assert iwtd == owtd == ewtd == []
            writeend.send(b'X')
            iwtd, owtd, ewtd = select.select([readend], [], [])
            assert iwtd == [readend]
            assert owtd == ewtd == []
        finally:
            writeend.close()
            readend.close()

    def test_writable(self):
        """
        select.select returns elements from the "write list" (the second
        parameter) on which a write/send may be possible.
        """
        import select
        readend, writeend = self.getpair()
        try:
            iwtd, owtd, ewtd = select.select([], [writeend], [], 0)
            assert iwtd == ewtd == []
            assert owtd == [writeend]
        finally:
            writeend.close()
            readend.close()

    def test_write_read(self):
        """
        select.select returns elements from the "write list" (the second
        parameter) on which a write/send may be possible.  select.select
        returns elements from the "read list" (the first parameter) which
        may have data available to be read. (the second part of this test
        overlaps significantly with test_readable. -exarkun)
        """
        import select
        readend, writeend = self.getpair()
        try:
            total_out = 0
            while True:
                iwtd, owtd, ewtd = select.select([], [writeend], [], 0)
                assert iwtd == ewtd == []
                if owtd == []:
                    break
                assert owtd == [writeend]
                total_out += writeend.send(b'x' * 512)
            total_in = 0
            while total_in < total_out:
                iwtd, owtd, ewtd = select.select([readend], [], [], 5)
                assert owtd == ewtd == []
                assert iwtd == [readend]    # there is more expected
                data = readend.recv(4096)
                assert len(data) > 0
                assert data == b'x' * len(data)
                total_in += len(data)
            assert total_in == total_out
            iwtd, owtd, ewtd = select.select([readend], [], [], 0)
            assert owtd == ewtd == []
            assert iwtd == []    # there is not more expected
        finally:
            writeend.close()
            readend.close()

    def test_write_close(self):
        """
        select.select returns elements from the "read list" (the first
        parameter) which have no data to be read but which have been closed.
        """
        import select, sys
        readend, writeend = self.getpair()
        try:
            try:
                total_out = writeend.send(b'x' * 512)
            finally:
                # win32 sends the 'closed' event immediately, even when
                # more data is available
                if sys.platform != 'win32':
                    writeend.close()
                    import gc; gc.collect()
            assert 1 <= total_out <= 512
            total_in = 0
            while True:
                iwtd, owtd, ewtd = select.select([readend], [], [])
                assert iwtd == [readend]
                assert owtd == ewtd == []
                data = readend.recv(4096)
                if len(data) == 0:
                    break
                assert data == b'x' * len(data)
                total_in += len(data)
                # win32: check that closing the socket exits the loop
                if sys.platform == 'win32' and total_in == total_out:
                    writeend.close()
            assert total_in == total_out
        finally:
            readend.close()

    def test_read_closed(self):
        """
        select.select returns elements from the "read list" (the first
        parameter) which are at eof (even if they are the write end of a
        pipe).
        """
        import select
        readend, writeend = self.getpair()
        try:
            readend.close()
            import gc; gc.collect()
            iwtd, owtd, ewtd = select.select([writeend], [], [], 0)
            assert iwtd == [writeend]
            assert owtd == ewtd == []
        finally:
            writeend.close()

    def test_read_many(self):
        """
        select.select returns only the elements from the "read list" (the
        first parameter) which may have data available to be read.
        (test_readable has a lot of overlap with this test. -exarkun)
        """
        import select
        readends = []
        writeends = []
        try:
            for i in range(10):
                fd1, fd2 = self.getpair()
                readends.append(fd1)
                writeends.append(fd2)
            iwtd, owtd, ewtd = select.select(readends, [], [], 0)
            assert iwtd == owtd == ewtd == []

            for i in range(50):
                n = (i*3) % 10
                writeends[n].send(b'X')
                iwtd, owtd, ewtd = select.select(readends, [], [])
                assert iwtd == [readends[n]]
                assert owtd == ewtd == []
                data = readends[n].recv(1)
                assert data == b'X'

        finally:
            for fd in readends + writeends:
                fd.close()

    def test_read_end_closed(self):
        """
        select.select returns elements from the "write list" (the second
        parameter) when they are not writable but when the corresponding
        read end has been closed. (this test currently doesn't make the
        write end non-writable before testing its selectability. -exarkun)
        """
        import select
        readend, writeend = self.getpair()
        readend.close()
        try:
            iwtd, owtd, ewtd = select.select([writeend], [writeend], [writeend])
            assert iwtd == owtd == [writeend]
            assert ewtd == []
        finally:
            writeend.close()

    def test_select_descr_out_of_bounds(self):
        import sys, select
        raises(ValueError, select.select, [-1], [], [])
        raises(ValueError, select.select, [], [-2], [])
        raises(ValueError, select.select, [], [], [-3])
        if sys.platform != 'win32':
            raises(ValueError, select.select, [2000000000], [], [])
            raises(ValueError, select.select, [], [2000000000], [])
            raises(ValueError, select.select, [], [], [2000000000])

    def test_poll_arguments(self):
        import select
        if not hasattr(select, 'poll'):
            skip("no select.poll() on this platform")
        pollster = select.poll()
        pollster.register(1)
        raises(ValueError, pollster.register, 0, -1)
        raises(OverflowError, pollster.register, 0, 1 << 64)
        pollster.register(0, 32768) # SHRT_MAX + 1
        exc = raises(ValueError, pollster.register, 0, -32768 - 1)
        assert "positive" in str(exc.value)
        exc = raises(OverflowError, pollster.register, 0, 1000000)
        assert "unsigned" in str(exc.value)
        pollster.register(0, 65535) # USHRT_MAX
        raises(OverflowError, pollster.register, 0, 65536) # USHRT_MAX + 1
        raises(OverflowError, pollster.poll, 2147483648) # INT_MAX +  1
        raises(OverflowError, pollster.poll, 4294967296) # UINT_MAX + 1
        exc = raises(TypeError, pollster.poll, '123')
        assert str(exc.value) == 'timeout must be an integer or None'

        raises(ValueError, pollster.modify, 1, -1)
        raises(OverflowError, pollster.modify, 1, 1 << 64)

    def test_select_new_forbidden(self):
        import select
        if not hasattr(select, 'poll'):
            skip("no select.poll() on this platform")
        pollster = select.poll()
        Poll = type(select.poll())
        with raises(TypeError) as info:
            Poll()
        assert "cannot create 'select.poll' instances" in str(info.value)


class AppTestSelectWithPipes(_AppTestSelect):
    "Use a pipe to get pairs of file descriptors"
    spaceconfig = {
        "usemodules": ["select", "time", "thread"]
    }

    def setup_class(cls):
        if sys.platform == 'win32':
            py.test.skip("select() doesn't work with pipes on win32")

    def w_getpair(self):
        # Wraps a file descriptor in an socket-like object
        import os
        class FileAsSocket:
            def __init__(self, fd):
                self.fd = fd
            def fileno(self):
                return self.fd
            def send(self, data):
                return os.write(self.fd, data)
            def recv(self, length):
                return os.read(self.fd, length)
            def close(self):
                return os.close(self.fd)
        s1, s2 = os.pipe()
        return FileAsSocket(s1), FileAsSocket(s2)

    def test_poll(self):
        import select
        if not hasattr(select, 'poll'):
            skip("no select.poll() on this platform")
        readend, writeend = self.getpair()
        try:
            class A(object):
                def fileno(self):
                    return readend.fileno()
            poll = select.poll()
            poll.register(A())

            res = poll.poll(10) # timeout in ms
            assert res == []
            res = poll.poll(1.1) # check floats
            assert res == []

            writeend.send(b"foo!")
            # can't easily test actual blocking, is done in lib-python tests
            res = poll.poll()
            assert res == [(readend.fileno(), 1)]

            # check negative timeout
            # proper test in lib-python, test_poll_blocks_with_negative_ms
            res = poll.poll(-0.001)
            assert res == [(readend.fileno(), 1)]
        finally:
            readend.close()
            writeend.close()

    def test_poll_threaded(self):
        import os, select, _thread as thread, time
        if not hasattr(select, 'poll'):
            skip("no select.poll() on this platform")
        r, w = os.pipe()
        rfds = [os.dup(r) for _ in range(10)]
        try:
            pollster = select.poll()
            for fd in rfds:
                pollster.register(fd, select.POLLIN)

            t = thread.start_new_thread(pollster.poll, ())
            try:
                time.sleep(0.3)
                for i in range(100): print(''),  # to release GIL untranslated
                # trigger ufds array reallocation
                for fd in rfds:
                    pollster.unregister(fd)
                pollster.register(w, select.POLLOUT)
                exc = raises(RuntimeError, pollster.poll)
                assert str(exc.value) == 'concurrent poll() invocation'
            finally:
                # and make the call to poll() from the thread return
                os.write(w, b'spam')
                time.sleep(0.3)
                for i in range(100): print(''),  # to release GIL untranslated
        finally:
            os.close(r)
            os.close(w)
            for fd in rfds:
                os.close(fd)

    def test_resize_list_in_select(self):
        import select
        class Foo(object):
            def fileno(self):
                if len(l) < 100:
                    l.append(Foo())
                return 0
        l = [Foo()]
        select.select(l, (), (), 0)
        assert 1 <= len(l) <= 100    
        # ^^^ CPython gives 100, PyPy gives 1.  I think both are OK as
        # long as there is no crash.

    def test_PIPE_BUF(self):
        # no PIPE_BUF on Windows; this test class is skipped on Windows.
        import select
        assert isinstance(select.PIPE_BUF, int)


class AppTestSelectWithSockets(_AppTestSelect):
    """Same tests with connected sockets.
    socket.socketpair() does not exists on win32,
    so we start our own server.
    """
    spaceconfig = {
        "usemodules": ["select", "_socket", "time", "thread"],
    }

    import os
    if hasattr(os, 'uname') and os.uname()[4] == 's390x':
        py.test.skip("build bot for s390x cannot open sockets")

    def w_make_server(self):
        import _socket
        if hasattr(self, 'sock'):
            return self.sock
        self.sock = _socket.socket()
        try_ports = [1023] + list(range(20000, 30000, 437))
        for port in try_ports:
            print('binding to port %d:' % (port,))
            self.sockaddress = ('127.0.0.1', port)
            try:
                self.sock.bind(self.sockaddress)
                break
            except _socket.error as e:   # should get a "Permission denied"
                print(e)
            else:
                raise(e)

    def w_getpair(self):
        """Helper method which returns a pair of connected sockets."""
        import _socket
        import _thread

        self.make_server()

        self.make_server()

        self.sock.listen(1)
        s2 = _socket.socket()
        _thread.start_new_thread(s2.connect, (self.sockaddress,))
        fd, addr2 = self.sock._accept()
        s1 = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM,
                              proto=0, fileno=fd)

        # speed up the tests that want to fill the buffers
        s1.setsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF, 4096)
        s2.setsockopt(_socket.SOL_SOCKET, _socket.SO_SNDBUF, 4096)

        return s1, s2