File: xtest__server_close.py

package info (click to toggle)
python-gevent 1.0.1-2
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 9,948 kB
  • ctags: 12,954
  • sloc: python: 39,061; ansic: 26,289; sh: 13,582; makefile: 833; awk: 18
file content (69 lines) | stat: -rw-r--r-- 2,090 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import unittest
from gevent import socket
import gevent
import errno
import sys
import os
from test__server import SimpleStreamServer


class Test(unittest.TestCase):

    ServerSubClass = SimpleStreamServer

    def makefile(self, timeout=0.1, bufsize=1):
        sock = socket.create_connection((self.server.server_host, self.server.server_port))
        sock.settimeout(timeout)
        return sock.makefile(bufsize=bufsize)

    def assertConnectionRefused(self):
        try:
            conn = self.makefile()
            raise AssertionError('Connection was not refused: %r' % (conn._sock, ))
        except socket.error:
            ex = sys.exc_info()[1]
            if ex.args[0] != errno.ECONNREFUSED:
                raise

    def assertRequestSucceeded(self):
        conn = self.makefile()
        conn.write('GET /ping HTTP/1.0\r\n\r\n')
        result = conn.read()
        assert result.endswith('\r\n\r\nPONG'), repr(result)

    def init_server(self):
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        gevent.sleep(0.01)

    def test_socket_shutdown(self):
        self.init_server()
        self.server.socket.shutdown(socket.SHUT_RDWR)
        self.assertConnectionRefused()
        assert not self.server.started, self.server

    def test_socket_close(self):
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        self.server.socket.close()
        self.assertConnectionRefused()
        #assert not self.server.started

    def test_socket_close_fileno(self):
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        os.close(self.server.socket.fileno())
        self.assertConnectionRefused()
        #assert not self.server.started

    def test_socket_file(self):
        self.server = self.ServerSubClass(('127.0.0.1', 0))
        self.server.start()
        os.close(self.server.socket.fileno())
        f = open("/dev/zero", "r")
        self.assertConnectionRefused()
        del f


if __name__ == '__main__':
    unittest.main()