# Mock cockpit peer for tests: what it sends at init time is chosen by $INIT_TYPE.
import asyncio
import os
import sys
from cockpit._vendor import systemd_ctypes
from cockpit.router import Router
from cockpit.transports import StdioTransport
class MockPeer(Router):
    """Router subclass whose opening message is chosen by ``$INIT_TYPE``.

    It answers channel ``open`` requests with ``ready`` and ignores all
    channel data.
    """

    def do_send_init(self):
        """Send the opening message, or misbehave, as ``$INIT_TYPE`` dictates.

        Recognised values:

        * ``wrong-command``   -- control message with command ``xnit``
        * ``wrong-version``   -- ``init`` with version 2
        * ``channel-control`` -- ``init`` carrying a channel instead of a version
        * ``data``            -- channel data on channel ``x``, no control message
        * ``break-protocol``  -- a plain line of text printed to stdout
        * ``exit``            -- ``sys.exit()`` with the default status
        * ``exit-not-found``  -- ``sys.exit(127)``
        * ``silence``         -- nothing is sent

        With the variable unset, or set to anything else, a regular
        ``init`` message with version 1 is written.
        """
        mode = os.environ.get('INIT_TYPE')

        # Modes that never produce a control message are handled up front.
        if mode == 'silence':
            return
        if mode == 'exit':
            sys.exit()
        if mode == 'exit-not-found':
            # shell error code for "command not found"
            sys.exit(127)
        if mode == 'break-protocol':
            print('i like printf debugging', flush=True)
            return
        if mode == 'data':
            self.write_channel_data('x', b'123')
            return

        # Everything left is a single control message; only its fields vary.
        malformed = {
            'wrong-command': {'command': 'xnit', 'version': 1},
            'wrong-version': {'command': 'init', 'version': 2},
            'channel-control': {'command': 'init', 'channel': 'x'},
        }
        regular = {'command': 'init', 'version': 1}
        self.write_control(**malformed.get(mode, regular))

    def channel_control_received(self, channel, command, message):
        """Reply ``ready`` on ``channel`` to an ``open``; ignore other commands.

        ``message`` is accepted but not inspected.
        """
        if command != 'open':
            return
        self.write_control(command='ready', channel=channel)

    def channel_data_received(self, channel, data):
        """Discard any data received on a channel."""
async def run():
    """Create a ``MockPeer``, hand it to a ``StdioTransport`` and run it.

    Returns once ``MockPeer.communicate()`` completes.
    """
    peer = MockPeer([])
    loop = asyncio.get_running_loop()
    # The transport object is not kept; presumably constructing it attaches
    # the peer to stdin/stdout -- confirm against cockpit.transports.
    StdioTransport(loop, peer)
    await peer.communicate()
if __name__ == '__main__':
    # Script entry point: drive run() to completion via systemd_ctypes.
    main_coroutine = run()
    systemd_ctypes.run_async(main_coroutine)
|