1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
#!/usr/bin/env python3
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later
"""
Simple power supply "hardware" simulator, communicating with plain
text commands over TCP. For uses with the ps1*.py and ps2*.py device
servers.
Requires gevent to be installed.
"""
import time
import logging
import gevent.server
DEFAULT_BIND = ""
DEFAULT_PORT = 45000
class Attr:
def __init__(
self,
*,
initial_value=0.0,
encode=lambda x: bytes(str(x), "ascii"),
decode=float,
):
self.value = initial_value
self.encode = encode
self.decode = decode
def get(self):
return self.encode(self.value)
def set(self, value):
self.value = self.decode(value)
class Calibrate(Attr):
def set(self, value):
self.ts = time.time()
super().set(value)
class State(Attr):
def __init__(self, calib, *args, **kwargs):
kwargs["initial_value"] = 0
kwargs["decode"] = int
super().__init__(*args, **kwargs)
self.calib = calib
calib.ts = 0
def get(self):
self.value = 0
if time.time() - self.calib.ts < 2:
self.value = 1
return super().get()
class PSSimulator(gevent.server.StreamServer):
class Error(Exception):
pass
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.log = logging.getLogger(f"simulator.{self.server_port}")
calib = Calibrate(initial_value=0)
self.attrs = {
b"stat": State(calib),
b"vol": Attr(initial_value=0.1),
b"curr": Attr(initial_value=0.0),
b"calib": calib,
}
def __getitem__(self, name):
return self.attrs[name].get()
def __setitem__(self, name, value):
self.attrs[name].set(value)
def handle(self, sock, addr):
log = self.log
log.info("new connection from %r", addr)
fileobj = sock.makefile(mode="rb")
while True:
request = fileobj.readline()
if not request:
log.info("disconnected %r", addr)
break
log.info("request %r", request)
reply = b"ERROR"
try:
reply = self.handle_request(request)
except PSSimulator.Error:
pass
except Exception:
log.exception("Unforeseen error")
gevent.sleep(1e-1)
sock.sendall(reply + b"\n")
log.info("replyed %r", reply)
fileobj.close()
def handle_request(self, request):
req_lower = request.strip().lower()
is_query = b"?" in req_lower
pars = req_lower.split()
name = pars[0]
if is_query:
name = name[:-1] # take out '?'
if is_query:
return self[name]
self[name] = pars[1]
return b"OK"
def main(number=1, bind=DEFAULT_BIND, port=DEFAULT_PORT, **kwargs):
servers = []
logging.info("starting simulator...")
for i in range(number):
address = bind, port + i
server = PSSimulator(address)
server.start()
servers.append(server)
server.log.info("simulator listenning on %r!", address)
try:
while True:
gevent.sleep(1)
except KeyboardInterrupt:
logging.info("Ctrl-C pressed. Bailing out!")
for server in servers:
server.stop()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=DEFAULT_PORT)
parser.add_argument("--bind", default=DEFAULT_BIND)
parser.add_argument("--log-level", default="info")
parser.add_argument("--number", type=int, default=1)
args = parser.parse_args()
logging.basicConfig(level=args.log_level.upper())
main(**vars(args))
|