File: test__socket_timeout.py

package info (click to toggle)
python-gevent 0.13.6-1+nmu3
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 2,324 kB
  • sloc: python: 13,296; makefile: 95; ansic: 37
file content (38 lines) | stat: -rw-r--r-- 1,082 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import sys
import unittest

import gevent
from gevent import socket


class Test(unittest.TestCase):
    """Check that recv() on a gevent socket with a timeout raises 'timed out'.

    Each test gets a fresh listening socket on an OS-chosen loopback port
    plus a greenlet running accept() on it.  Nothing in this fixture ever
    sends data to the client, so a recv() with a timeout has to expire.
    """

    def setUp(self):
        """Start a loopback listener and spawn a greenlet that accepts on it."""
        self.server = socket.socket()
        # Port 0 asks the OS for any free port; read the real one back so the
        # test knows where to connect.
        self.server.bind(('127.0.0.1', 0))
        self.server.listen(1)
        self.server_port = self.server.getsockname()[1]
        self.acceptor = gevent.spawn(self.server.accept)

    def tearDown(self):
        """Close the listener, kill the acceptor and drop both references."""
        self.server.close()
        self.acceptor.kill()
        del self.acceptor
        del self.server

    def test(self):
        """recv() with a 0.1 second timeout must raise socket.error('timed out')."""
        sock = socket.socket()
        try:
            # connect() lives inside the try so the client socket is closed
            # even when the connection attempt itself fails.
            sock.connect(('127.0.0.1', self.server_port))
            sock.settimeout(0.1)
            try:
                result = sock.recv(1024)
            except socket.error:
                # sys.exc_info() is used instead of "except E, ex" (Python 2
                # only) so this module parses on both Python 2 and Python 3.
                ex = sys.exc_info()[1]
                self.assertEqual(ex.args, ('timed out',))
                self.assertEqual(str(ex), 'timed out')
                if sys.version_info[0] < 3:
                    # Indexing an exception only exists on Python 2; keep
                    # checking it there, skip it where it was removed.
                    self.assertEqual(ex[0], 'timed out')
            else:
                # Only reached when recv() returned instead of timing out.
                self.fail('Expected timeout to be raised, instead recv() returned %r' % (result, ))
        finally:
            sock.close()


# Run the tests through unittest's command-line runner when this file is
# executed directly rather than imported.
if __name__ == '__main__':
    unittest.main()