1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
import os
import errno
import py
from rpython.rlib.rsocket import *
from rpython.rlib.rpoll import *
from rpython.rtyper.test.test_llinterp import interpret
if os.name == 'nt':
has_poll = False
else:
has_poll = True
def setup_module(mod):
rsocket_startup()
def one_in_event(events, fd):
assert len(events) == 1
assert events[0][0] == fd
assert events[0][1] & POLLIN
def one_out_event(events, fd):
assert len(events) == 1
assert events[0][0] == fd
assert events[0][1] & POLLOUT
@py.test.mark.skipif('has_poll')
def test_no_poll():
try:
poll
except NameError:
pass
else:
assert False
@py.test.mark.skipif('not has_poll')
def test_simple():
serv = RSocket(AF_INET, SOCK_STREAM)
serv.bind(INETAddress('127.0.0.1', INADDR_ANY))
serv.listen(1)
servaddr = serv.getsockname()
events = poll({serv.fd: POLLIN}, timeout=100)
assert len(events) == 0
cli = RSocket(AF_INET, SOCK_STREAM)
cli.setblocking(False)
err = cli.connect_ex(servaddr)
assert err != 0
events = poll({serv.fd: POLLIN}, timeout=500)
one_in_event(events, serv.fd)
servconn_fd, cliaddr = serv.accept()
servconn = RSocket(AF_INET, fd=servconn_fd)
events = poll({serv.fd: POLLIN,
cli.fd: POLLOUT}, timeout=500)
one_out_event(events, cli.fd)
err = cli.connect_ex(servaddr)
# win32: returns WSAEISCONN when the connection finally succeed.
# Mac OS/X: returns EISCONN.
assert (err == 0 or err == 10056 or
err == getattr(errno, 'EISCONN', '???'))
events = poll({servconn.fd: POLLIN,
cli.fd: POLLIN}, timeout=100)
assert len(events) == 0
events = poll({servconn.fd: POLLOUT,
cli.fd: POLLOUT}, timeout=100)
assert len(events) >= 1
cli.close()
servconn.close()
serv.close()
@py.test.mark.skipif('not has_poll')
def test_exchange():
serv = RSocket(AF_INET, SOCK_STREAM)
serv.bind(INETAddress('127.0.0.1', INADDR_ANY))
serv.listen(1)
servaddr = serv.getsockname()
events = poll({serv.fd: POLLIN}, timeout=100)
assert len(events) == 0
cli = RSocket(AF_INET, SOCK_STREAM)
cli.setblocking(True)
err = cli.connect_ex(servaddr)
assert err == 0
events = poll({serv.fd: POLLIN}, timeout=500)
one_in_event(events, serv.fd)
servconn_fd, cliaddr = serv.accept()
servconn = RSocket(AF_INET, fd=servconn_fd)
events = poll({serv.fd: POLLIN,
cli.fd: POLLOUT}, timeout=500)
one_out_event(events, cli.fd)
#send some data
events = poll({cli.fd: POLLOUT}, timeout=500)
one_out_event(events, cli.fd)
cli.send("g'day, mate")
events = poll({servconn.fd: POLLIN}, timeout=500)
one_in_event(events, servconn.fd)
answer = servconn.recv(1024)
assert answer == "g'day, mate"
#send a reply
events = poll({servconn.fd: POLLOUT}, timeout=500)
one_out_event(events, servconn.fd)
servconn.send("you mean hello?")
events = poll({cli.fd: POLLIN}, timeout=500)
one_in_event(events, cli.fd)
answer = cli.recv(1024)
assert answer == "you mean hello?"
#send more data
events = poll({cli.fd: POLLOUT}, timeout=500)
one_out_event(events, cli.fd)
cli.send("sorry, wrong channel")
events = poll({servconn.fd: POLLIN}, timeout=500)
one_in_event(events, servconn.fd)
answer = servconn.recv(1024)
assert answer == "sorry, wrong channel"
events = poll({servconn.fd: POLLOUT}, timeout=500)
one_out_event(events, servconn.fd)
servconn.send("np bye")
events = poll({cli.fd: POLLIN}, timeout=500)
one_in_event(events, cli.fd)
answer = cli.recv(1024)
assert answer == "np bye"
cli.close()
servconn.close()
serv.close()
def test_select():
if os.name == 'nt':
py.test.skip('cannot select on file handles on windows')
def f():
readend, writeend = os.pipe()
try:
iwtd, owtd, ewtd = select([readend], [], [], 0.0)
assert iwtd == owtd == ewtd == []
os.write(writeend, 'X')
iwtd, owtd, ewtd = select([readend], [], [])
assert iwtd == [readend]
assert owtd == ewtd == []
finally:
os.close(readend)
os.close(writeend)
f()
interpret(f, [])
def test_select_timeout():
if os.name == 'nt':
py.test.skip('cannot select on file handles on windows')
from time import time
def f():
# once there was a bug where the sleeping time was doubled
a = time()
iwtd, owtd, ewtd = select([], [], [], 5.0)
diff = time() - a
assert 4.8 < diff < 9.0
interpret(f, [])
def test_translate_select():
from rpython.translator.c.test.test_genc import compile
def func():
select([], [], [], 0.0)
compile(func, [])
@py.test.mark.skipif('not has_poll')
def test_translate_poll():
from rpython.translator.c.test.test_genc import compile
def func():
poll({})
compile(func, [])
|