File: ps-simulator.py

package info (click to toggle)
pytango 10.1.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,304 kB
  • sloc: python: 27,795; cpp: 16,150; sql: 252; sh: 152; makefile: 43
file content (145 lines) | stat: -rwxr-xr-x 3,942 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later

"""
Simple power supply "hardware" simulator, communicating with plain
text commands over TCP.  For uses with the ps1*.py and ps2*.py device
servers.

Requires gevent to be installed.
"""

import time
import logging
import gevent.server

DEFAULT_BIND = ""
DEFAULT_PORT = 45000


class Attr:
    def __init__(
        self,
        *,
        initial_value=0.0,
        encode=lambda x: bytes(str(x), "ascii"),
        decode=float,
    ):
        self.value = initial_value
        self.encode = encode
        self.decode = decode

    def get(self):
        return self.encode(self.value)

    def set(self, value):
        self.value = self.decode(value)


class Calibrate(Attr):
    def set(self, value):
        self.ts = time.time()
        super().set(value)


class State(Attr):
    def __init__(self, calib, *args, **kwargs):
        kwargs["initial_value"] = 0
        kwargs["decode"] = int
        super().__init__(*args, **kwargs)
        self.calib = calib
        calib.ts = 0

    def get(self):
        self.value = 0
        if time.time() - self.calib.ts < 2:
            self.value = 1
        return super().get()


class PSSimulator(gevent.server.StreamServer):
    class Error(Exception):
        pass

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log = logging.getLogger(f"simulator.{self.server_port}")
        calib = Calibrate(initial_value=0)
        self.attrs = {
            b"stat": State(calib),
            b"vol": Attr(initial_value=0.1),
            b"curr": Attr(initial_value=0.0),
            b"calib": calib,
        }

    def __getitem__(self, name):
        return self.attrs[name].get()

    def __setitem__(self, name, value):
        self.attrs[name].set(value)

    def handle(self, sock, addr):
        log = self.log
        log.info("new connection from %r", addr)
        fileobj = sock.makefile(mode="rb")
        while True:
            request = fileobj.readline()
            if not request:
                log.info("disconnected %r", addr)
                break
            log.info("request %r", request)
            reply = b"ERROR"
            try:
                reply = self.handle_request(request)
            except PSSimulator.Error:
                pass
            except Exception:
                log.exception("Unforeseen error")
            gevent.sleep(1e-1)
            sock.sendall(reply + b"\n")
            log.info("replyed %r", reply)
        fileobj.close()

    def handle_request(self, request):
        req_lower = request.strip().lower()
        is_query = b"?" in req_lower
        pars = req_lower.split()
        name = pars[0]
        if is_query:
            name = name[:-1]  # take out '?'
        if is_query:
            return self[name]
        self[name] = pars[1]
        return b"OK"


def main(number=1, bind=DEFAULT_BIND, port=DEFAULT_PORT, **kwargs):
    servers = []
    logging.info("starting simulator...")
    for i in range(number):
        address = bind, port + i
        server = PSSimulator(address)
        server.start()
        servers.append(server)
        server.log.info("simulator listenning on %r!", address)
    try:
        while True:
            gevent.sleep(1)
    except KeyboardInterrupt:
        logging.info("Ctrl-C pressed. Bailing out!")
        for server in servers:
            server.stop()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    parser.add_argument("--bind", default=DEFAULT_BIND)
    parser.add_argument("--log-level", default="info")
    parser.add_argument("--number", type=int, default=1)
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level.upper())
    main(**vars(args))