File: test_select_new.py

package info (click to toggle)
jython 2.5.3-16%2Bdeb9u1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 43,772 kB
  • ctags: 106,434
  • sloc: python: 351,322; java: 216,349; xml: 1,584; sh: 330; perl: 114; ansic: 102; makefile: 45
file content (246 lines) | stat: -rw-r--r-- 7,698 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""
AMAK: 20050515: This module is a brand new test_select module, which gives much wider coverage.
"""

import errno
import time
from test import test_support
import unittest

import socket
import select

SERVER_ADDRESS = ("localhost", 54321)

DATA_CHUNK_SIZE = 1000
DATA_CHUNK = "." * DATA_CHUNK_SIZE

#
# The timing of these tests depends on the how the unerlying OS socket library
# handles buffering. These values may need tweaking for different platforms
#
# The fundamental problem is that there is no reliable way to fill a socket with bytes
#

if test_support.is_jython:
    SELECT_TIMEOUT = 0
else:
    # zero select timeout fails these tests on cpython (on windows 2003 anyway)
    SELECT_TIMEOUT = 0.001

READ_TIMEOUT = 5

class AsynchronousServer:

    def __init__(self):
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setblocking(0)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(SERVER_ADDRESS)
        self.server_socket.listen(5)
        try:
            self.server_socket.accept()
        except socket.error:
            pass

    def select_acceptable(self):
        return select.select([self.server_socket], [self.server_socket], [], SELECT_TIMEOUT)[0]

    def verify_acceptable(self):
        start = time.time()
        while True:
            if self.select_acceptable():
                return
            elif (time.time() - start) > READ_TIMEOUT:
                raise Exception('Server socket did not accept in time')
            time.sleep(0.1)

    def verify_not_acceptable(self):
        assert not self.select_acceptable(), "Server socket should not be acceptable"

    def accept(self):
        self.verify_acceptable()
        new_socket, address = self.server_socket.accept()
        return AsynchronousHandler(new_socket)

    def close(self):
        self.server_socket.close()


class AsynchronousHandler:

    def __init__(self, new_socket):
        self.socket = new_socket
        self.socket.setblocking(0)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def write(self):
        """
        Writes on this channel until select no longer reports it as writable.

        Returns the number of bytes written
        """
        total_bytes = 0
        while 1:
            try:
                if self.select_writable():
                    bytes_sent = self.socket.send(DATA_CHUNK)
                    total_bytes += bytes_sent
                else:
                    return total_bytes
            except socket.error, se:
                if se.value == 10035:
                    continue
                raise se

    def read(self, expected):
        """
        Reads expected bytes from this socket

        An Exception is raised if expected bytes aren't read before READ_TIMEOUT
        is reached.
        """
        results = ""
        start = time.time()
        while 1:
            if self.select_readable():
                recvd_bytes = self.socket.recv(expected - len(results))
                if len(recvd_bytes):
                    results += recvd_bytes
                if len(results) == expected:
                    return results
            else:
                stop = time.time()
                if (stop - start) > READ_TIMEOUT:
                    raise Exception("Got %d bytes but %d bytes were written."  %
                                    (len(results), expected))

    def select_readable(self):
        return select.select([self.socket], [], [], SELECT_TIMEOUT)[0]

    def verify_readable(self):
        assert self.select_readable(), "Socket should be ready for reading"

    def verify_not_readable(self):
        assert not self.select_readable(), "Socket should not be ready for reading"

    def select_writable(self):
        return select.select([], [self.socket], [], SELECT_TIMEOUT)[1]

    def verify_writable(self):
        assert self.select_writable(), "Socket should be ready for writing"

    def verify_not_writable(self):
        assert not self.select_writable(), "Socket should not be ready for writing"

    def verify_only_writable(self):
        self.verify_writable()
        self.verify_not_readable()

    def close(self):
        self.socket.close()

class AsynchronousClient(AsynchronousHandler):

    def __init__(self):
        AsynchronousHandler.__init__(self, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        self.connected = 0

    def start_connect(self):
        result = self.socket.connect_ex(SERVER_ADDRESS)
        if result == errno.EISCONN:
            self.connected = 1
        else:
            assert result == errno.EINPROGRESS

    def finish_connect(self):
        if self.connected:
            return
        start = time.time()
        while True:
            self.start_connect()
            if self.connected:
                break
            elif (time.time() - start) > READ_TIMEOUT:
                raise Exception('Client socket incomplete connect')
            time.sleep(0.1)

class TestSelectOnAccept(unittest.TestCase):
    def setUp(self):
        self.server = AsynchronousServer()
        self.client = AsynchronousClient()
        self.handler = None

    def testSelectOnAccept(self):
        self.server.verify_not_acceptable()
        self.client.start_connect()
        self.server.verify_acceptable()
        self.handler = self.server.accept()
        self.client.finish_connect()
        self.server.verify_not_acceptable()

    def tearDown(self):
        self.client.close()
        if self.handler:
            self.handler.close()
        self.server.close()

class TestSelect(unittest.TestCase):
    def setUp(self):
        self.server = AsynchronousServer()
        self.client = AsynchronousClient()
        self.client.start_connect()
        self.handler = self.server.accept()
        self.client.finish_connect()

    def tearDown(self):
        self.client.close()
        self.handler.close()
        self.server.close()

    def testClientOut(self):
        self.client.verify_only_writable()
        self.handler.verify_only_writable()

        written = self.client.write()
        self.handler.verify_readable()

        self.handler.read(written/2)
        self.handler.verify_readable()

        self.handler.read(written/2)
        self.handler.verify_not_readable()

    def testHandlerOut(self):
        written = self.handler.write()
        self.client.verify_readable()

        self.client.read(written/2)
        self.client.verify_readable()

        self.client.read(written/2)
        self.client.verify_not_readable()

    def testBothOut(self):
        client_written = self.client.write()
        handler_written = self.handler.write()
        self.client.verify_readable()
        self.handler.verify_readable()

        self.client.read(handler_written/2)
        self.handler.read(client_written/2)
        self.client.verify_readable()
        self.handler.verify_readable()

        self.client.read(handler_written/2)
        self.handler.read(client_written/2)
        self.client.verify_only_writable()
        self.handler.verify_only_writable()

def test_main():
    tests = [TestSelect, TestSelectOnAccept]
    suites = [unittest.makeSuite(klass, 'test') for klass in tests]
    test_support.run_suite(unittest.TestSuite(suites))    

if __name__ == "__main__":
    test_main()