1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
|
import asyncio
import os
import sys
import time
import pytest
import pytest_asyncio
from cockpit.channel import ChannelError
from cockpit.packages import BridgeConfig
from cockpit.peer import ConfiguredPeer, PeerRoutingRule
from cockpit.protocol import CockpitProtocolError
from cockpit.router import Router
from cockpit.transports import SubprocessTransport
from . import mockpeer
from .mocktransport import MockTransport
# Bridge configuration shared by every test in this module: the peer is the
# mockpeer script run with the current interpreter, the current sys.path is
# handed to it through PYTHONPATH, and the match criteria is payload "test".
PEER_CONFIG = BridgeConfig(
    {
        "spawn": [sys.executable, mockpeer.__file__],
        "environ": [f"PYTHONPATH={':'.join(sys.path)}"],
        "match": {"payload": "test"},
    }
)
class Bridge(Router):
    """Router with a single routing rule built from PEER_CONFIG."""

    init_host = 'localhost'

    def __init__(self):
        # The rule needs a reference to this router, so it is created here
        # and passed to the base class as the only routing rule.
        super().__init__([PeerRoutingRule(self, PEER_CONFIG)])

    def do_send_init(self):
        """Do nothing: this test bridge sends no init message of its own."""
@pytest.fixture
def bridge():
    """Provide a freshly constructed Bridge router."""
    router = Bridge()
    return router
# Declared async because MockTransport's queue requires an event loop,
# even though nothing is awaited here.
@pytest_asyncio.fixture()
async def transport(bridge):  # noqa: RUF029
    """Provide a MockTransport attached to the ``bridge`` fixture."""
    mock = MockTransport(bridge)
    return mock
@pytest.fixture
def rule(bridge):
    """Provide the first (and, for Bridge, only) routing rule of the bridge."""
    first_rule = bridge.routing_rules[0]
    return first_rule
@pytest.mark.asyncio
async def test_shutdown(transport, rule):
    """Closing the peer terminates its channel, and a later open spawns it again."""

    async def open_both():
        # 'test' matches the rule; 'xest' does not and is rejected.
        await transport.check_open('test')
        await transport.check_open('xest', problem='not-supported')

    await open_both()

    # Force the Peer closed
    rule.peer.close()
    await transport.assert_msg('', command='close', channel='channel.1', problem='terminated')

    # But it should spawn again
    await open_both()
    rule.peer.close()
@pytest.mark.asyncio
@pytest.mark.parametrize('init_type', ['wrong-command', 'channel-control', 'data', 'break-protocol'])
async def test_init_failure(rule, init_type, monkeypatch, transport):
    """Each bad INIT_TYPE makes the channel open fail with 'protocol-error'."""
    # INIT_TYPE is set in the environment before the open triggers the spawn.
    monkeypatch.setenv('INIT_TYPE', init_type)
    await transport.check_open('test', problem='protocol-error')
# Nothing is awaited here, but the test still needs a running event loop.
@pytest.mark.asyncio
async def test_immediate_shutdown(rule):  # noqa: RUF029
    """A peer obtained from the rule can be closed right away."""
    spawned = rule.apply_rule({'payload': 'test'})
    assert spawned is not None
    spawned.close()
@pytest.mark.asyncio
async def test_shutdown_before_init(monkeypatch, transport, rule):
    """Closing a peer that has a transport but was never initialised reports 'terminated'."""
    monkeypatch.setenv('INIT_TYPE', 'silence')
    opened = transport.send_open('test')

    # Right after the open is sent the peer exists but has no transport yet.
    assert rule.peer is not None
    assert rule.peer.transport is None

    # Yield to the event loop until the transport shows up.
    while rule.peer.transport is None:
        await asyncio.sleep(0)

    rule.peer.close()
    await transport.assert_msg('', command='close', channel=opened, problem='terminated')
@pytest.mark.asyncio
async def test_exit_without_init(monkeypatch, transport):
    """With INIT_TYPE=exit the channel open fails with problem 'terminated'."""
    monkeypatch.setenv('INIT_TYPE', 'exit')
    await transport.check_open('test', problem='terminated')
@pytest.mark.asyncio
async def test_exit_not_found(monkeypatch, transport):
    """With INIT_TYPE=exit-not-found the channel open fails with problem 'no-cockpit'."""
    monkeypatch.setenv('INIT_TYPE', 'exit-not-found')
    await transport.check_open('test', problem='no-cockpit')
@pytest.mark.asyncio
async def test_killed(monkeypatch, transport, rule):
    """Killing the peer process closes its open channel with problem 'terminated'."""
    # Imported locally so this test does not depend on the module's import block.
    import signal

    channel = await transport.check_open('test')

    # Use the named constant rather than the bare signal number 9.
    os.kill(rule.peer.transport._process.pid, signal.SIGKILL)
    await transport.assert_msg('', command='close', channel=channel, problem='terminated')
@pytest.mark.asyncio
@pytest.mark.parametrize('init_type', ['wrong-command', 'channel-control', 'data', 'break-protocol'])
async def test_await_failure(init_type, monkeypatch, bridge):
    """Awaiting start() on a peer with a bad INIT_TYPE raises CockpitProtocolError."""
    monkeypatch.setenv('INIT_TYPE', init_type)

    failing = ConfiguredPeer(bridge, PEER_CONFIG)
    with pytest.raises(CockpitProtocolError):
        await failing.start()
    failing.close()
@pytest.mark.asyncio
async def test_await_broken_connect(bridge):
    """An exception raised at the start of do_connect_transport() surfaces from start()."""

    class BrokenConnect(ConfiguredPeer):
        async def do_connect_transport(self):
            # Fail before any transport is connected.
            _ = 42 / 0

    broken = BrokenConnect(bridge, PEER_CONFIG)
    with pytest.raises(ZeroDivisionError):
        await broken.start()
    broken.close()
@pytest.mark.asyncio
async def test_await_broken_after_connect(bridge):
    """An exception raised after the real connect step still surfaces from start()."""

    class BrokenConnect(ConfiguredPeer):
        async def do_connect_transport(self):
            # Run the real connect step first, then fail.
            await super().do_connect_transport()
            _ = 42 / 0

    broken = BrokenConnect(bridge, PEER_CONFIG)
    with pytest.raises(ZeroDivisionError):
        await broken.start()
    broken.close()
class CancellableConnect(ConfiguredPeer):
    """Peer whose connect step keeps sleeping and records whether it was cancelled."""

    # Set to True on the instance once do_connect_transport() sees CancelledError.
    was_cancelled = False

    async def do_connect_transport(self):
        """Connect as usual, then sleep until cancelled, recording the cancellation."""
        await super().do_connect_transport()
        try:
            # We should get cancelled here when the mockpeer sends "init"
            await asyncio.sleep(10000)
        except asyncio.CancelledError:
            # Record the cancellation, then let it propagate unchanged.
            self.was_cancelled = True
            raise
@pytest.mark.asyncio
async def test_await_cancellable_connect_init(bridge):
    """start() returns while the connect step is still sleeping, and that step gets cancelled."""
    sleeper = CancellableConnect(bridge, PEER_CONFIG)
    await sleeper.start()
    sleeper.close()

    # Wait until this test's own task is the only one left.
    while len(asyncio.all_tasks()) > 1:
        await asyncio.sleep(0.1)

    assert sleeper.was_cancelled
@pytest.mark.asyncio
async def test_await_cancellable_connect_close(monkeypatch, bridge):
    """Closing the peer while start() is pending cancels both start() and the connect step."""
    monkeypatch.setenv('INIT_TYPE', 'silence')  # make sure we never get "init"
    sleeper = CancellableConnect(bridge, PEER_CONFIG)

    # Schedule close() to fire after .start() is already running.
    loop = asyncio.get_running_loop()
    loop.call_later(0.1, sleeper.close)

    with pytest.raises(asyncio.CancelledError):
        await sleeper.start()

    # close() has already been called via call_later; just wait until this
    # test's own task is the only one left.
    while len(asyncio.all_tasks()) > 1:
        await asyncio.sleep(0.1)

    assert sleeper.was_cancelled
@pytest.mark.asyncio
async def test_spawn_broken_pipe(bridge):
    """Check which error start() raises when the spawned process dies while being written to."""

    class BrokenPipePeer(ConfiguredPeer):
        """Peer whose connect step spawns a shell that exits after reading one line."""

        def __init__(self, *, specific_error=False):
            super().__init__(bridge, PEER_CONFIG)
            # When true, do_connect_transport() raises its own ChannelError.
            self.specific_error = specific_error

        async def do_connect_transport(self) -> None:
            pipe = await self.spawn(['sh', '-c', 'read a; exit 9'], ())
            assert isinstance(pipe, SubprocessTransport)

            # Make the process exit by writing a newline (causing `read` to finish)
            pipe.write(b'\n')

            # The process will exit soon — try writing to it until a write fails.
            # NOTE(review): this is a blocking time.sleep(), not asyncio.sleep();
            # presumably deliberate so the event loop is not run between
            # writes — confirm before changing.
            while not pipe.is_closing():
                pipe.write(b'x')
                time.sleep(0.1)

            # Now wait (yielding to the loop) until the exit status is known.
            while pipe.get_returncode() is None:
                await asyncio.sleep(0.1)

            if self.specific_error:
                raise ChannelError('not-supported', message='kaputt')

    # BrokenPipe bubbles up without an error returned by do_connect_transport
    broken = BrokenPipePeer(specific_error=False)
    with pytest.raises(BrokenPipeError):
        await broken.start()
    broken.close()

    # BrokenPipe gets trumped by specific error returned by do_connect_transport
    broken = BrokenPipePeer(specific_error=True)
    with pytest.raises(ChannelError) as excinfo:
        await broken.start()
    assert excinfo.value.attrs == {'message': 'kaputt', 'problem': 'not-supported'}
    broken.close()
|