File: test__example_echoserver.py

package info (click to toggle)
python-gevent 1.0.1-2
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 9,948 kB
  • ctags: 12,954
  • sloc: python: 39,061; ansic: 26,289; sh: 13,582; makefile: 833; awk: 18
file content (28 lines) | stat: -rw-r--r-- 874 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from __future__ import with_statement
from gevent.socket import create_connection, timeout
from unittest import main
import gevent

import util


class Test(util.TestServer):
    """Exercise the echo server example with two concurrent clients.

    NOTE(review): ``server`` is presumably the example script that
    ``util.TestServer`` launches before calling ``_run_all_tests`` --
    confirm against ``util``.
    """

    server = 'echoserver.py'

    def _run_all_tests(self):
        """Run two echo clients in parallel greenlets and wait for both.

        Any exception raised inside a client greenlet is re-raised here
        because ``joinall`` is called with ``raise_error=True``.
        """

        def test_client(message):
            """Connect, check the welcome banner, and verify *message* is echoed.

            The socket and its file wrapper are always closed, even when an
            assertion fails, so a failing run does not leak connections.
            """
            sock = create_connection(('127.0.0.1', 6000))
            try:
                conn = sock.makefile(bufsize=1)
                try:
                    welcome = conn.readline()
                    # Use a unittest assertion rather than a bare ``assert``
                    # so the check is not stripped when running with -O.
                    self.assertTrue('Welcome' in welcome, repr(welcome))
                    conn.write(message)
                    received = conn.read(len(message))
                    self.assertEqual(received, message)
                    # After the echo nothing more should arrive: a further
                    # read must hit the short timeout set on the file
                    # object's underlying socket.
                    conn._sock.settimeout(0.1)
                    self.assertRaises(timeout, conn.read, 1)
                finally:
                    conn.close()
            finally:
                sock.close()

        client1 = gevent.spawn(test_client, 'hello\r\n')
        client2 = gevent.spawn(test_client, 'world\r\n')
        gevent.joinall([client1, client2], raise_error=True)


# When executed as a script, hand control to unittest's ``main`` entry point.
if __name__ == '__main__':
    main()