1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
#!/usr/bin/env python
"""
Pymodbus Asynchronous Client Examples
--------------------------------------------------------------------------
The following is an example of how to use the asynchronous modbus
client implementation from pymodbus.
"""
# --------------------------------------------------------------------------- #
# import needed libraries
# --------------------------------------------------------------------------- #
from twisted.internet import reactor
from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient
# from pymodbus.client.asynchronous.udp import AsyncModbusUDPClient
from pymodbus.client.asynchronous import schedulers
# --------------------------------------------------------------------------- #
# choose the requested modbus protocol
# --------------------------------------------------------------------------- #
from twisted.internet import reactor, protocol
# --------------------------------------------------------------------------- #
# configure the client logging
# --------------------------------------------------------------------------- #
import logging
FORMAT = ('%(asctime)-15s %(threadName)-15s'
' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
logging.basicConfig(format=FORMAT)
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# --------------------------------------------------------------------------- #
# helper method to test deferred callbacks
# --------------------------------------------------------------------------- #
def err(*args, **kwargs):
logging.error("Err-{}-{}".format(args, kwargs))
def dassert(deferred, callback):
def _assertor(value):
assert value
deferred.addCallback(lambda r: _assertor(callback(r)))
deferred.addErrback(err)
# --------------------------------------------------------------------------- #
# specify slave to query
# --------------------------------------------------------------------------- #
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
# --------------------------------------------------------------------------- #
UNIT = 0x01
def processResponse(result):
log.debug(result)
def exampleRequests(client):
rr = client.read_coils(1, 1, unit=0x02)
rr.addCallback(processResponse)
rr = client.read_holding_registers(1, 1, unit=0x02)
rr.addCallback(processResponse)
rr = client.read_discrete_inputs(1, 1, unit=0x02)
rr.addCallback(processResponse)
rr = client.read_input_registers(1, 1, unit=0x02)
rr.addCallback(processResponse)
stopAsynchronousTest(client)
# --------------------------------------------------------------------------- #
# example requests
# --------------------------------------------------------------------------- #
# simply call the methods that you would like to use. An example session
# is displayed below along with some assert checks. Note that unlike the
# synchronous version of the client, the asynchronous version returns
# deferreds which can be thought of as a handle to the callback to send
# the result of the operation. We are handling the result using the
# deferred assert helper(dassert).
# --------------------------------------------------------------------------- #
def stopAsynchronousTest(client):
# ----------------------------------------------------------------------- #
# close the client at some time later
# ----------------------------------------------------------------------- #
reactor.callLater(1, client.transport.loseConnection)
reactor.callLater(2, reactor.stop)
def beginAsynchronousTest(client):
rq = client.write_coil(1, True, unit=UNIT)
rr = client.read_coils(1, 1, unit=UNIT)
dassert(rq, lambda r: not r.isError()) # test for no error
dassert(rr, lambda r: r.bits[0] == True) # test the expected value
rq = client.write_coils(1, [True]*8, unit=UNIT)
rr = client.read_coils(1, 8, unit=UNIT)
dassert(rq, lambda r: not r.isError()) # test for no error
dassert(rr, lambda r: r.bits == [True]*8) # test the expected value
rq = client.write_coils(1, [False]*8, unit=UNIT)
rr = client.read_discrete_inputs(1, 8, unit=UNIT)
dassert(rq, lambda r: not r.isError()) # test for no error
dassert(rr, lambda r: r.bits == [True]*8) # test the expected value
rq = client.write_register(1, 10, unit=UNIT)
rr = client.read_holding_registers(1, 1, unit=UNIT)
dassert(rq, lambda r: not r.isError()) # test for no error
dassert(rr, lambda r: r.registers[0] == 10) # test the expected value
rq = client.write_registers(1, [10]*8, unit=UNIT)
rr = client.read_input_registers(1, 8, unit=UNIT)
dassert(rq, lambda r: not r.isError()) # test for no error
dassert(rr, lambda r: r.registers == [17]*8) # test the expected value
arguments = {
'read_address': 1,
'read_count': 8,
'write_address': 1,
'write_registers': [20]*8,
}
rq = client.readwrite_registers(arguments, unit=UNIT)
rr = client.read_input_registers(1, 8, unit=UNIT)
dassert(rq, lambda r: r.registers == [20]*8) # test the expected value
dassert(rr, lambda r: r.registers == [17]*8) # test the expected value
stopAsynchronousTest(client)
# ----------------------------------------------------------------------- #
# close the client at some time later
# ----------------------------------------------------------------------- #
# reactor.callLater(1, client.transport.loseConnection)
reactor.callLater(2, reactor.stop)
# --------------------------------------------------------------------------- #
# extra requests
# --------------------------------------------------------------------------- #
# If you are performing a request that is not available in the client
# mixin, you have to perform the request like this instead::
#
# from pymodbus.diag_message import ClearCountersRequest
# from pymodbus.diag_message import ClearCountersResponse
#
# request = ClearCountersRequest()
# response = client.execute(request)
# if isinstance(response, ClearCountersResponse):
# ... do something with the response
#
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# choose the client you want
# --------------------------------------------------------------------------- #
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
protocol, deferred = AsyncModbusTCPClient(schedulers.REACTOR, port=5020)
# protocol, deferred = AsyncModbusUDPClient(schedulers.REACTOR, port=5020)
# callback=beginAsynchronousTest,
# errback=err)
deferred.addCallback(beginAsynchronousTest)
deferred.addErrback(err)
|