File: test__socket_timeout.py

package info (click to toggle)
python-gevent 1.0.1-2
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 9,948 kB
  • ctags: 12,954
  • sloc: python: 39,061; ansic: 26,289; sh: 13,582; makefile: 833; awk: 18
file content (44 lines) | stat: -rw-r--r-- 1,258 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import sys
import gevent
from gevent import socket
import greentest


class Test(greentest.TestCase):
    """Check that recv() on a socket with a timeout set fails with 'timed out'.

    A local listening socket accepts the client connection but never sends
    anything, so the client's recv() has nothing to return.
    """

    def start(self):
        """Open a listening socket on an ephemeral port and accept in the background."""
        self.server = socket.socket()
        # Port 0 lets the OS pick a free port; the real one is read back below.
        self.server.bind(('127.0.0.1', 0))
        self.server.listen(1)
        self.server_port = self.server.getsockname()[1]
        self.conn = None
        self.acceptor = gevent.spawn(self._accept)

    def _accept(self):
        """Accept one connection and keep a reference so stop() can close it."""
        self.conn = self.server.accept()[0]

    def stop(self):
        """Close the sockets opened by start() and kill the acceptor greenlet."""
        self.server.close()
        self.acceptor.kill()
        # The accepted connection was previously never closed explicitly.
        if self.conn is not None:
            self.conn.close()
        del self.conn
        del self.acceptor
        del self.server

    def test(self):
        """recv() with a 0.1s timeout must raise socket.error('timed out')."""
        self.start()
        try:
            sock = socket.socket()
            try:
                # connect() is inside the try so the socket is closed even
                # when the connection attempt itself fails.
                sock.connect(('127.0.0.1', self.server_port))
                sock.settimeout(0.1)
                try:
                    result = sock.recv(1024)
                except socket.error:
                    # sys.exc_info() instead of "except ... as ex" keeps the
                    # syntax valid on old and new interpreters alike.
                    ex = sys.exc_info()[1]
                    self.assertEqual(ex.args, ('timed out',))
                    self.assertEqual(str(ex), 'timed out')
                    # Exceptions are not subscriptable on Python 3, so read
                    # the message through .args rather than ex[0].
                    self.assertEqual(ex.args[0], 'timed out')
                else:
                    raise AssertionError('Expected timeout to be raised, instead recv() returned %r' % (result, ))
            finally:
                sock.close()
        finally:
            self.stop()


# When executed directly as a script, hand control to greentest's entry point.
if __name__ == '__main__':
    greentest.main()