File: test__socket.py

package info (click to toggle)
python-gevent 0.13.6-1%2Bnmu3
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 2,324 kB
  • sloc: python: 13,296; makefile: 95; ansic: 37
file content (159 lines) | stat: -rw-r--r-- 5,417 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import os
import gevent
from gevent import socket
import greentest
import time


class TestTCP(greentest.TestCase):

    TIMEOUT_ERROR = socket.timeout

    def setUp(self):
        greentest.TestCase.setUp(self)
        self.listener = socket.tcp_listener(('127.0.0.1', 0))

    def tearDown(self):
        del self.listener
        greentest.TestCase.tearDown(self)

    def create_connection(self):
        return socket.create_connection(('127.0.0.1', self.listener.getsockname()[1]))

    def test_fullduplex(self):

        def server():
            (client, addr) = self.listener.accept()
            # start reading, then, while reading, start writing. the reader should not hang forever
            N = 100000  # must be a big enough number so that sendall calls trampoline
            sender = gevent.spawn_link_exception(client.sendall, 't' * N)
            result = client.recv(1000)
            assert result == 'hello world', result
            sender.join(0.2)
            sender.kill()
            if client.__class__.__name__ == 'SSLObject':
                # if sslold.SSLObject is not closed then the other end will receive sslerror: (8, 'Unexpected EOF')
                # Not sure if it must be fixed but I don't want to waste time on that since
                # the preferred way to do ssl now is via gevent.ssl which works OK without explicit close
                client.close()

        #print '%s: client' % getcurrent()

        server_proc = gevent.spawn_link_exception(server)
        client = self.create_connection()
        client_reader = gevent.spawn_link_exception(client.makefile().read)
        gevent.sleep(0.001)
        client.send('hello world')

        # close() used to hang
        client.close()

        # this tests "full duplex" bug;
        server_proc.get()

        client_reader.get()

    def test_recv_timeout(self):
        acceptor = gevent.spawn_link_exception(self.listener.accept)
        try:
            client = self.create_connection()
            client.settimeout(0.1)
            start = time.time()
            try:
                data = client.recv(1024)
            except self.TIMEOUT_ERROR:
                assert 0.1 - 0.01 <= time.time() - start <= 0.1 + 0.1, (time.time() - start)
            else:
                raise AssertionError('%s should have been raised, instead recv returned %r' % (self.TIMEOUT_ERROR, data, ))
        finally:
            acceptor.get()

    def test_sendall_timeout(self):
        acceptor = gevent.spawn_link_exception(self.listener.accept)
        try:
            client = self.create_connection()
            client.settimeout(0.1)
            start = time.time()
            send_succeed = False
            data_sent = 'h' * 100000
            try:
                client.sendall(data_sent)
            except self.TIMEOUT_ERROR:
                assert 0.1 - 0.01 <= time.time() - start <= 0.1 + 0.1, (time.time() - start)
            else:
                assert time.time() - start <= 0.1 + 0.01, (time.time() - start)
                send_succeed = True
        finally:
            conn, addr = acceptor.get()
        if send_succeed:
            client.close()
            data_read = conn.makefile().read()
            self.assertEqual(len(data_sent), len(data_read))
            self.assertEqual(data_sent, data_read)
            print '%s: WARNING: read the data instead of failing with timeout' % self.__class__.__name__

    def test_makefile(self):
        def accept_once():
            conn, addr = self.listener.accept()
            fd = conn.makefile()
            conn.close()
            fd.write('hello\n')
            fd.close()

        acceptor = gevent.spawn(accept_once)
        try:
            client = self.create_connection()
            fd = client.makefile()
            client.close()
            assert fd.readline() == 'hello\n'
            assert fd.read() == ''
            fd.close()
        finally:
            acceptor.get()


if hasattr(socket, 'ssl'):

    class TestSSL(TestTCP):

        certfile = os.path.join(os.path.dirname(__file__), 'test_server.crt')
        privfile = os.path.join(os.path.dirname(__file__), 'test_server.key')
        TIMEOUT_ERROR = socket.sslerror

        def setUp(self):
            TestTCP.setUp(self)
            self.listener = ssl_listener(('127.0.0.1', 0), self.privfile, self.certfile)

        def create_connection(self):
            return socket.ssl(socket.create_connection(('127.0.0.1', self.listener.getsockname()[1])))

    def ssl_listener(address, private_key, certificate):
        import _socket
        r = _socket.socket()
        sock = socket.ssl(r, private_key, certificate)
        socket.bind_and_listen(sock, address)
        return sock


class TestCreateConnection(greentest.TestCase):

    def test(self):
        tempsock1 = socket.socket()
        tempsock1.bind(('', 0))
        source_port = tempsock1.getsockname()[1]
        tempsock2 = socket.socket()
        tempsock2.bind(('', 0))
        tempsock2.getsockname()[1]
        tempsock1.close()
        del tempsock1
        tempsock2.close()
        del tempsock2
        try:
            socket.create_connection(('localhost', 4), timeout=30, source_address=('', source_port))
        except socket.error, ex:
            if 'connection refused' not in str(ex).lower():
                raise


if __name__ == '__main__':
    greentest.main()