File: async_twisted_client.py

package info (click to toggle)
pymodbus 2.1.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,704 kB
  • sloc: python: 17,594; makefile: 83; sh: 8
file content (172 lines) | stat: -rwxr-xr-x 7,207 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python
"""
Pymodbus Asynchronous Client Examples
--------------------------------------------------------------------------

The following is an example of how to use the asynchronous modbus
client implementation from pymodbus.
"""
# --------------------------------------------------------------------------- #
# import needed libraries
# --------------------------------------------------------------------------- #

from twisted.internet import reactor

from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient
# from pymodbus.client.asynchronous.udp import AsyncModbusUDPClient
from pymodbus.client.asynchronous import schedulers

# --------------------------------------------------------------------------- #
# choose the requested modbus protocol
# --------------------------------------------------------------------------- #

from twisted.internet import reactor, protocol

# --------------------------------------------------------------------------- #
# configure the client logging
# --------------------------------------------------------------------------- #
import logging
# Each record shows its timestamp, thread name, level and the module:line
# that emitted it, followed by the message itself.
FORMAT = (
    '%(asctime)-15s %(threadName)-15s'
    ' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s'
)

# The examples log through the root logger, opened up to DEBUG.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
logging.basicConfig(format=FORMAT)

# --------------------------------------------------------------------------- #
# helper method to test deferred callbacks
# --------------------------------------------------------------------------- #


def err(*args, **kwargs):
    """Log whatever was passed in as a single error record.

    Used as the errback of the example deferreds, so the positional
    arguments normally carry the failure being reported.

    :param args: positional values to include in the message
    :param kwargs: keyword values to include in the message
    """
    message = "Err-{}-{}".format(args, kwargs)
    logging.error(message)


def dassert(deferred, callback):
    """Attach a truthiness check on the deferred's result.

    When the deferred fires, ``callback`` is applied to the result and the
    outcome is asserted. The errback is added after the check, so both a
    failed request and a failed assertion end up in ``err``.

    :param deferred: deferred returned by a client request
    :param callback: predicate applied to the result of the deferred
    """
    def _check(result):
        # Evaluate the predicate outside the assert statement so that it
        # is still invoked when assertions are compiled out.
        outcome = callback(result)
        assert outcome

    deferred.addCallback(_check)
    deferred.addErrback(err)

# --------------------------------------------------------------------------- #
# specify slave to query
# --------------------------------------------------------------------------- #
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
# --------------------------------------------------------------------------- #


# Unit (slave) id passed as the `unit` argument of the requests issued by
# beginAsynchronousTest.
UNIT = 0x01


def processResponse(result):
    """Write the result of a finished request to the debug log.

    :param result: value the request's deferred fired with
    """
    log.log(logging.DEBUG, result)


def exampleRequests(client):
    """Issue one single-item read of every kind and log the responses.

    Coils, holding registers, discrete inputs and input registers are each
    read once (address 1, count 1) from unit 0x02. Every response is handed
    to ``processResponse``, after which the shutdown is scheduled.

    :param client: client the read requests are issued on
    """
    read_methods = (
        'read_coils',
        'read_holding_registers',
        'read_discrete_inputs',
        'read_input_registers',
    )
    for method_name in read_methods:
        read = getattr(client, method_name)
        pending = read(1, 1, unit=0x02)
        pending.addCallback(processResponse)
    stopAsynchronousTest(client)

# --------------------------------------------------------------------------- #
# example requests
# --------------------------------------------------------------------------- #
# Simply call the methods that you would like to use. An example session
# is displayed below along with some assert checks. Note that unlike the
# synchronous version of the client, the asynchronous version returns
# deferreds, which can be thought of as handles to which callbacks are
# attached to receive the result of the operation. We handle the results
# using the deferred assert helper (dassert).
# --------------------------------------------------------------------------- #


def stopAsynchronousTest(client):
    """Schedule the shutdown of the connection and of the reactor.

    The connection of ``client`` is dropped one second from now and the
    reactor is stopped one second after that.

    :param client: client whose transport is to be closed
    """
    # ----------------------------------------------------------------------- #
    # close the client first, then stop the reactor
    # ----------------------------------------------------------------------- #
    close_connection = client.transport.loseConnection
    reactor.callLater(1, close_connection)
    stop_reactor = reactor.stop
    reactor.callLater(2, stop_reactor)


def beginAsynchronousTest(client):
    """Run the example request session and check each response.

    Every client call returns a deferred; ``dassert`` attaches the expected
    outcome to it, so the checks run as the responses arrive. Once all
    requests have been issued, the shutdown of the connection and of the
    reactor is scheduled through ``stopAsynchronousTest``.

    :param client: connected client the requests are issued on
    """
    rq = client.write_coil(1, True, unit=UNIT)
    rr = client.read_coils(1, 1, unit=UNIT)
    dassert(rq, lambda r: not r.isError())     # test for no error
    dassert(rr, lambda r: r.bits[0] == True)          # test the expected value

    rq = client.write_coils(1, [True]*8, unit=UNIT)
    rr = client.read_coils(1, 8, unit=UNIT)
    dassert(rq, lambda r: not r.isError())     # test for no error
    dassert(rr, lambda r: r.bits == [True]*8)        # test the expected value

    # The read below targets the discrete inputs, not the coils that were
    # just cleared. The expected all-True value presumably reflects the
    # initial data of the server under test -- confirm against your server.
    rq = client.write_coils(1, [False]*8, unit=UNIT)
    rr = client.read_discrete_inputs(1, 8, unit=UNIT)
    dassert(rq, lambda r: not r.isError())     # test for no error
    dassert(rr, lambda r: r.bits == [True]*8)        # test the expected value

    rq = client.write_register(1, 10, unit=UNIT)
    rr = client.read_holding_registers(1, 1, unit=UNIT)
    dassert(rq, lambda r: not r.isError())     # test for no error
    dassert(rr, lambda r: r.registers[0] == 10)       # test the expected value

    # As above, the input registers are read rather than the holding
    # registers that were written; 17 is presumably the server's initial
    # register value -- confirm against your server.
    rq = client.write_registers(1, [10]*8, unit=UNIT)
    rr = client.read_input_registers(1, 8, unit=UNIT)
    dassert(rq, lambda r: not r.isError())     # test for no error
    dassert(rr, lambda r: r.registers == [17]*8)      # test the expected value

    arguments = {
        'read_address':    1,
        'read_count':      8,
        'write_address':   1,
        'write_registers': [20]*8,
    }
    # The read/write request takes its fields as keyword arguments, so the
    # dictionary is expanded instead of being passed positionally.
    rq = client.readwrite_registers(unit=UNIT, **arguments)
    rr = client.read_input_registers(1, 8, unit=UNIT)
    dassert(rq, lambda r: r.registers == [20]*8)      # test the expected value
    dassert(rr, lambda r: r.registers == [17]*8)      # test the expected value

    # ----------------------------------------------------------------------- #
    # close the client at some time later
    # ----------------------------------------------------------------------- #
    # stopAsynchronousTest schedules both the disconnect and the reactor
    # stop. Scheduling reactor.stop a second time here would call it on a
    # reactor that is already stopping, which raises ReactorNotRunning.
    stopAsynchronousTest(client)

# --------------------------------------------------------------------------- #
# extra requests
# --------------------------------------------------------------------------- #
# If you are performing a request that is not available in the client
# mixin, you have to perform the request like this instead::
#
# from pymodbus.diag_message import ClearCountersRequest
# from pymodbus.diag_message import ClearCountersResponse
#
# request  = ClearCountersRequest()
# response = client.execute(request)
# if isinstance(response, ClearCountersResponse):
#     ... do something with the response
#
# --------------------------------------------------------------------------- #

# --------------------------------------------------------------------------- #
# choose the client you want
# --------------------------------------------------------------------------- #
# Make sure a Modbus server is running for the client to connect to. For
# this you can use an existing device, the reference implementation in the
# tools directory, or a pymodbus server.
# --------------------------------------------------------------------------- #


if __name__ == "__main__":
    # Create the TCP client on the reactor scheduler, targeting port 5020.
    # The call returns a (protocol, deferred) pair; the deferred presumably
    # fires with the connected client once the connection is up -- confirm.
    # NOTE(review): this rebinds the module-level name `protocol` imported
    # from twisted.internet above.
    protocol, deferred = AsyncModbusTCPClient(schedulers.REACTOR, port=5020)
    # UDP alternative:
    # protocol, deferred = AsyncModbusUDPClient(schedulers.REACTOR, port=5020)
    # The callbacks could presumably also be handed over at creation time
    # through `callback=beginAsynchronousTest, errback=err` -- confirm.

    # Run the example session on connection; log any failure through err.
    deferred.addCallback(beginAsynchronousTest)
    deferred.addErrback(err)