File: drainage_sim.py

package info (click to toggle)
pymodbus 3.8.6-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,720 kB
  • sloc: python: 14,867; makefile: 27; sh: 17
file content (68 lines) | stat: -rwxr-xr-x 2,447 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3

# Simulates two Modbus TCP slave servers:
#
# Port 5020: Digital IO (DIO) with 8 discrete inputs and 8 coils. The first two coils each control
#            a simulated pump. Inputs are not used.
#
# Port 5021: Water level meter (WLM) returning the current water level in the input register. It
#            increases chronologically and decreases rapidly when one or two pumps are active.

import asyncio
import logging
from datetime import datetime

from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
from pymodbus.server import StartAsyncTcpServer

INITIAL_WATER_LEVEL = 300
WATER_INFLOW = 1
PUMP_OUTFLOW = 8

logging.basicConfig(level = logging.INFO)

dio_di = ModbusSequentialDataBlock(1, [False] * 8)
dio_co = ModbusSequentialDataBlock(1, [False] * 8)
dio_context = ModbusSlaveContext(di = dio_di, co = dio_co)
wlm_ir = ModbusSequentialDataBlock(1, [INITIAL_WATER_LEVEL])
wlm_context = ModbusSlaveContext(ir = wlm_ir)

async def update():
    while True:
        await asyncio.sleep(1)

        # Update water level based on DIO output values (simulating pumps)
        water_level = wlm_ir.getValues(1, 1)[0]
        dio_outputs = dio_co.getValues(1, 2)

        water_level += WATER_INFLOW
        water_level -= (int(dio_outputs[0]) + int(dio_outputs[1])) * PUMP_OUTFLOW
        water_level = max(0, min(INITIAL_WATER_LEVEL * 10, water_level))
        wlm_ir.setValues(1, [water_level])

async def log():
    while True:
        await asyncio.sleep(10)

        dio_outputs = dio_co.getValues(1, 8)
        wlm_level = wlm_ir.getValues(1, 1)[0]

        logging.info(f"{datetime.now()}: WLM water level: {wlm_level}, DIO outputs: {dio_outputs}")

async def run():
    ctx = ModbusServerContext(slaves = dio_context)
    dio_server = asyncio.create_task(StartAsyncTcpServer(context = ctx, address = ("0.0.0.0", 5020)))
    logging.info("Initialising slave server DIO on port 5020")

    ctx = ModbusServerContext(slaves = wlm_context)
    wlm_server = asyncio.create_task(StartAsyncTcpServer(context = ctx, address = ("0.0.0.0", 5021)))
    logging.info("Initialising slave server WLM on port 5021")

    update_task = asyncio.create_task(update())
    logging_task = asyncio.create_task(log())

    logging.info("Init complete")
    await asyncio.gather(dio_server, wlm_server, update_task, logging_task)

if __name__ == "__main__":
    asyncio.run(run(), debug=True)