1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
import signal
import socket
import threading
try:
from time import monotonic
except ImportError:
from time import time as monotonic
import time
import pytest
from urllib3.util.wait import (
_have_working_poll,
poll_wait_for_socket,
select_wait_for_socket,
wait_for_read,
wait_for_socket,
wait_for_write,
)
from .socketpair_helper import socketpair
@pytest.fixture
def spair():
a, b = socketpair()
yield a, b
a.close()
b.close()
variants = [wait_for_socket, select_wait_for_socket]
if _have_working_poll():
variants.append(poll_wait_for_socket)
@pytest.mark.parametrize("wfs", variants)
def test_wait_for_socket(wfs, spair):
a, b = spair
with pytest.raises(RuntimeError):
wfs(a, read=False, write=False)
assert not wfs(a, read=True, timeout=0)
assert wfs(a, write=True, timeout=0)
b.send(b"x")
assert wfs(a, read=True, timeout=0)
assert wfs(a, read=True, timeout=10)
assert wfs(a, read=True, timeout=None)
# Fill up the socket with data
a.setblocking(False)
try:
while True:
a.send(b"x" * 999999)
except (OSError, socket.error):
pass
# Now it's not writable anymore
assert not wfs(a, write=True, timeout=0)
# But if we ask for read-or-write, that succeeds
assert wfs(a, read=True, write=True, timeout=0)
# Unless we read from it
assert a.recv(1) == b"x"
assert not wfs(a, read=True, write=True, timeout=0)
# But if the remote peer closes the socket, then it becomes readable
b.close()
assert wfs(a, read=True, timeout=0)
# Waiting for a socket that's actually been closed is just a bug, and
# raises some kind of helpful exception (exact details depend on the
# platform).
with pytest.raises(Exception):
wfs(b, read=True)
def test_wait_for_read_write(spair):
a, b = spair
assert not wait_for_read(a, 0)
assert wait_for_write(a, 0)
b.send(b"x")
assert wait_for_read(a, 0)
assert wait_for_write(a, 0)
# Fill up the socket with data
a.setblocking(False)
try:
while True:
a.send(b"x" * 999999)
except (OSError, socket.error):
pass
# Now it's not writable anymore
assert not wait_for_write(a, 0)
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr(wfs, spair):
a, b = spair
interrupt_count = [0]
def handler(sig, frame):
assert sig == signal.SIGALRM
interrupt_count[0] += 1
old_handler = signal.signal(signal.SIGALRM, handler)
try:
assert not wfs(a, read=True, timeout=0)
start = monotonic()
try:
# Start delivering SIGALRM 10 times per second
signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
# Sleep for 1 second (we hope!)
wfs(a, read=True, timeout=1)
finally:
# Stop delivering SIGALRM
signal.setitimer(signal.ITIMER_REAL, 0)
end = monotonic()
dur = end - start
assert 0.9 < dur < 3
finally:
signal.signal(signal.SIGALRM, old_handler)
assert interrupt_count[0] > 0
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr_zero_timeout(wfs, spair):
a, b = spair
interrupt_count = [0]
def handler(sig, frame):
assert sig == signal.SIGALRM
interrupt_count[0] += 1
old_handler = signal.signal(signal.SIGALRM, handler)
try:
assert not wfs(a, read=True, timeout=0)
try:
# Start delivering SIGALRM 1000 times per second,
# to trigger race conditions such as
# https://github.com/urllib3/urllib3/issues/1396.
signal.setitimer(signal.ITIMER_REAL, 0.001, 0.001)
# Hammer the system call for a while to trigger the
# race.
for i in range(100000):
wfs(a, read=True, timeout=0)
finally:
# Stop delivering SIGALRM
signal.setitimer(signal.ITIMER_REAL, 0)
finally:
signal.signal(signal.SIGALRM, old_handler)
assert interrupt_count[0] > 0
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr_infinite_timeout(wfs, spair):
a, b = spair
interrupt_count = [0]
def handler(sig, frame):
assert sig == signal.SIGALRM
interrupt_count[0] += 1
def make_a_readable_after_one_second():
time.sleep(1)
b.send(b"x")
old_handler = signal.signal(signal.SIGALRM, handler)
try:
assert not wfs(a, read=True, timeout=0)
start = monotonic()
try:
# Start delivering SIGALRM 10 times per second
signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
# Sleep for 1 second (we hope!)
thread = threading.Thread(target=make_a_readable_after_one_second)
thread.start()
wfs(a, read=True)
finally:
# Stop delivering SIGALRM
signal.setitimer(signal.ITIMER_REAL, 0)
thread.join()
end = monotonic()
dur = end - start
assert 0.9 < dur < 3
finally:
signal.signal(signal.SIGALRM, old_handler)
assert interrupt_count[0] > 0
|