File: mockpeer.py

package info (click to toggle)
cockpit 337-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 36,232 kB
  • sloc: javascript: 47,090; python: 38,766; ansic: 35,470; xml: 6,048; sh: 3,413; makefile: 614
file content (46 lines) | stat: -rw-r--r-- 1,465 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
import os
import sys

from cockpit._vendor import systemd_ctypes
from cockpit.router import Router
from cockpit.transports import StdioTransport


class MockPeer(Router):
    """Router subclass whose opening message is selected by ``INIT_TYPE``.

    Depending on that environment variable the peer sends a regular ``init``
    control message, one of several deviating openers, nothing at all, or
    exits the process outright.
    """

    def do_send_init(self):
        """Emit the opening message chosen by the ``INIT_TYPE`` variable.

        Any value not handled below (including an unset variable) results in
        the regular ``init`` control message with ``version=1``.
        """
        mode = os.environ.get('INIT_TYPE')

        # Send nothing at all.
        if mode == 'silence':
            return

        # Terminate the process instead of sending anything.
        if mode == 'exit':
            sys.exit()
        if mode == 'exit-not-found':
            # shell error code for "command not found"
            sys.exit(127)

        # Openers that are not control messages.
        if mode == 'data':
            self.write_channel_data('x', b'123')
            return
        if mode == 'break-protocol':
            # Plain text printed to stdout in place of a framed message.
            print('i like printf debugging', flush=True)
            return

        # Control-message variants; unrecognised modes get the regular init.
        control_fields = {
            'wrong-command': {'command': 'xnit', 'version': 1},
            'wrong-version': {'command': 'init', 'version': 2},
            'channel-control': {'command': 'init', 'channel': 'x'},
        }
        regular_init = {'command': 'init', 'version': 1}
        self.write_control(**control_fields.get(mode, regular_init))

    def channel_control_received(self, channel, command, message):
        """Answer an ``open`` command with ``ready`` on the same channel.

        Every other command is ignored.
        """
        if command != 'open':
            return
        self.write_control(command='ready', channel=channel)

    def channel_data_received(self, channel, data):
        """Discard incoming channel data."""


async def run():
    """Attach a ``MockPeer`` to a ``StdioTransport`` and await ``communicate()``."""
    peer = MockPeer([])
    loop = asyncio.get_running_loop()
    # The transport is constructed with the peer as its protocol; the
    # returned object is not kept, exactly as in the one-line form.
    StdioTransport(loop, peer)
    await peer.communicate()


# Script entry point: only when executed directly (not on import), hand the
# run() coroutine to systemd_ctypes.run_async().
if __name__ == '__main__':
    systemd_ctypes.run_async(run())