File: modbus_mapper.py

package info (click to toggle)
pymodbus 2.1.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,704 kB
  • sloc: python: 17,594; makefile: 83; sh: 8
file content (313 lines) | stat: -rw-r--r-- 11,120 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""
Given a modbus mapping file, this is used to generate
decoder blocks so that non-programmers can define the
register values and then decode a modbus device all
without having to write a line of code for decoding.

Currently supported formats are:

* csv
* json
* xml

Here is an example of generating and using a mapping decoder
(note that this is still in the works and will be greatly
simplified in the final api; it is just an example of the
requested functionality)::

    from modbus_mapper import csv_mapping_parser
    from modbus_mapper import mapping_decoder
    from pymodbus.client.sync import ModbusTcpClient
    from pymodbus.payload import BinaryPayloadDecoder

    template = ['address', 'size', 'function', 'name', 'description']
    raw_mapping = csv_mapping_parser('input.csv', template)
    mapping = mapping_decoder(raw_mapping)

    index, size = 1, 100
    client = ModbusTcpClient('localhost')
    response = client.read_holding_registers(index, size)
    decoder = BinaryPayloadDecoder.fromRegisters(response.registers)
    while index < size:
        print("[{}]\t{}".format(index, mapping[index]['type'](decoder)))
        index += mapping[index]['size']

Also, using the same input mapping parsers, we can generate
populated slave contexts that can be run behind a modbus server::

    from modbus_mapper import csv_mapping_parser
    from modbus_mapper import modbus_context_decoder
    from pymodbus.server.sync import StartTcpServer
    from pymodbus.datastore.context import ModbusServerContext

    template = ['address', 'value', 'function', 'name', 'description']
    raw_mapping = csv_mapping_parser('input.csv', template)
    slave_context = modbus_context_decoder(raw_mapping)
    context = ModbusServerContext(slaves=slave_context, single=True)
    StartTcpServer(context)
"""
import csv
import json
from collections import defaultdict

from tokenize import generate_tokens
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.datastore.store import ModbusSparseDataBlock
from pymodbus.compat import IS_PYTHON3
from pymodbus.datastore.context import ModbusSlaveContext
if IS_PYTHON3:
    from io import StringIO
else:
    from StringIO import StringIO

# --------------------------------------------------------------------------- # 
# raw mapping input parsers
# --------------------------------------------------------------------------- # 
# These generate the raw mapping_blocks from some form of input
# which can then be passed to the decoder in question to supply
# the requested output result.
# --------------------------------------------------------------------------- #


def csv_mapping_parser(path, template):
    """ Given a csv file of the mapping data for
    a modbus device, return a mapping layout that can
    be used to decode a new block.

    .. note:: This parser itself requires the template to define
       ``address`` and ``function``; the ``function`` column is removed
       from each stored row. ``mapping_decoder`` additionally reads
       ``size`` and ``type``. All the remaining values will be stored,
       but not formatted by the application. So for example::

           template = ['address', 'type', 'size', 'name', 'function']
           mappings = csv_mapping_parser('mapping.csv', template)

    :param path: The path to the csv input file
    :param template: The ordered column names applied to each row
    :returns: A dictionary of integer address to the decoded row
    :raises KeyError: If a row has no ``function`` or ``address`` column
    :raises ValueError: If an address is not an integer
    """
    # NOTE(review): rows are stored keyed by address only, while
    # modbus_context_decoder and mapping_decoder iterate two levels of
    # nesting; the flat shape is kept here for backward compatibility --
    # confirm which layout callers actually expect.
    mapping_blocks = defaultdict(dict)
    with open(path, 'r') as handle:
        reader = csv.reader(handle)
        # Skip the csv header. The builtin next() works on both python 2
        # and 3, and the default keeps an empty file from raising.
        next(reader, None)
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing one)
                continue
            mapping = dict(zip(template, row))
            mapping.pop('function')
            mapping_blocks[int(mapping['address'])] = mapping
    return mapping_blocks


def json_mapping_parser(path, template):
    """ Given a json file of the mapping data for
    a modbus device, return a mapping layout that can
    be used to decode a new block.

    The file is expected to hold an object of blocks, each of which
    is an object of address to row values.

    .. note:: For the template, a few values are required
       to be mapped: address, size, and type. All the remaining
       values will be stored, but not formatted by the application.
       So for example::

           template = {
               'Start': 'address',
               'DataType': 'type',
               'Length': 'size'
               # the remaining keys will just pass through
           }
           mappings = json_mapping_parser('mapping.json', template)

    :param path: The path to the json input file
    :param template: A dictionary of input key to mapping key
    :returns: A dictionary of block id to {integer address: row mapping}
    :raises ValueError: If a row key is not an integer address
    """
    with open(path, 'r') as handle:
        document = json.load(handle)

    mapping_blocks = {}
    # dict.items() is available on both python 2 and 3 (iteritems is not)
    for tid, rows in document.items():
        mapping_blocks[tid] = {
            # rename the keys listed in the template, pass the rest through
            int(key): {template.get(k, k): v for k, v in values.items()}
            for key, values in rows.items()
        }
    return mapping_blocks


def xml_mapping_parser(path):
    """ Placeholder for the xml mapping parser.

    Parsing of xml mapping files is not implemented: the supplied
    path is ignored, no file is read, and nothing is returned.

    :param path: The path to the xml input file (currently unused)
    :returns: None
    """
    return None


# --------------------------------------------------------------------------- # 
# modbus context decoders
# --------------------------------------------------------------------------- # 
# These are used to decode a raw mapping_block into a slave context with
# populated function data blocks.
# --------------------------------------------------------------------------- # 
def modbus_context_decoder(mapping_blocks):
    """ Given a mapping block input, generate a backing
    slave context with initialized data blocks.

    .. note:: This expects the following for each mapping:
       address, value, and function where function is one of
       di (discretes), co (coils), hr (holding registers), or
       ir (input registers).

    :param mapping_blocks: A dictionary of blocks, each of which is
                           a dictionary of mappings
    :returns: The initialized modbus slave context
    :raises KeyError: If a mapping lacks address, value, or function
    :raises ValueError: If an address or value is not an integer
    """
    blocks = defaultdict(dict)
    # dict.values() is available on both python 2 and 3 (itervalues is not)
    for block in mapping_blocks.values():
        for mapping in block.values():
            value = int(mapping['value'])
            address = int(mapping['address'])
            function = mapping['function']
            blocks[function][address] = value
    # The slave context stores whatever it is given and later calls data
    # block methods on it, so wrap each address->value dictionary in a
    # sparse data block instead of passing the raw dictionary.
    datablocks = {
        function: ModbusSparseDataBlock(values)
        for function, values in blocks.items()
    }
    return ModbusSlaveContext(**datablocks)


# --------------------------------------------------------------------------- # 
# modbus mapping decoder
# --------------------------------------------------------------------------- # 
# These are used to decode a raw mapping_block into a request decoder.
# So this allows one to simply grab a number of registers, and then
# pass them to this decoder which will do the rest.
# --------------------------------------------------------------------------- # 
class ModbusTypeDecoder(object):
    """ This is a utility to determine the correct
    decoder to use given a type name. By default this
    supports all the types available in the default modbus
    decoder, however this can easily be extended by subclassing
    and adding new types to the parsers::

        class CustomTypeDecoder(ModbusTypeDecoder):
            def __init__(self):
                ModbusTypeDecoder.__init__(self)
                self.parsers['bitfield'] = self.parse_my_bitfield

            def parse_my_bitfield(self, tokens):
                return lambda d: d.decode_my_type()

    Every parser takes the iterator of the tokens that follow the
    type name and returns a callable which, given a payload decoder,
    decodes the next value of that type.
    """
    def __init__(self):
        """ Initializes a new instance of the decoder
        """
        # Parser used for unknown type names. It must have the same
        # signature as the entries in ``parsers`` because ``parse``
        # calls it with the remaining tokens.
        self.default = self.parse_16bit_uint
        self.parsers = {
            'uint':    self.parse_16bit_uint,
            'uint8':   self.parse_8bit_uint,
            'uint16':  self.parse_16bit_uint,
            'uint32':  self.parse_32bit_uint,
            'uint64':  self.parse_64bit_uint,
            'int':     self.parse_16bit_int,
            'int8':    self.parse_8bit_int,
            'int16':   self.parse_16bit_int,
            'int32':   self.parse_32bit_int,
            'int64':   self.parse_64bit_int,
            'float':   self.parse_32bit_float,
            'float32': self.parse_32bit_float,
            'float64': self.parse_64bit_float,
            'string':  self.parse_string,
            'bits':    self.parse_bits,
        }

    # ------------------------------------------------------------ #
    # Type parsers
    # ------------------------------------------------------------ #
    @staticmethod
    def parse_string(tokens):
        """ Parse a sized string type such as ``string[16]``.

        :param tokens: The tokens following the type name
        :returns: A callable decoding a string of the parsed size
        :raises ValueError: If no integer size follows the type name
        """
        try:
            next(tokens)  # skip the token before the size (the bracket)
            size = int(next(tokens))
        except (StopIteration, ValueError):
            raise ValueError(
                "string type requires a size, e.g. 'string[16]'")
        return lambda d: d.decode_string(size=size)

    @staticmethod
    def parse_bits(tokens):
        """ Return a callable decoding a bit field. """
        return lambda d: d.decode_bits()

    @staticmethod
    def parse_8bit_uint(tokens):
        """ Return a callable decoding an 8 bit unsigned int. """
        return lambda d: d.decode_8bit_uint()

    @staticmethod
    def parse_16bit_uint(tokens):
        """ Return a callable decoding a 16 bit unsigned int. """
        return lambda d: d.decode_16bit_uint()

    @staticmethod
    def parse_32bit_uint(tokens):
        """ Return a callable decoding a 32 bit unsigned int. """
        return lambda d: d.decode_32bit_uint()

    @staticmethod
    def parse_64bit_uint(tokens):
        """ Return a callable decoding a 64 bit unsigned int. """
        return lambda d: d.decode_64bit_uint()

    @staticmethod
    def parse_8bit_int(tokens):
        """ Return a callable decoding an 8 bit signed int. """
        return lambda d: d.decode_8bit_int()

    @staticmethod
    def parse_16bit_int(tokens):
        """ Return a callable decoding a 16 bit signed int. """
        return lambda d: d.decode_16bit_int()

    @staticmethod
    def parse_32bit_int(tokens):
        """ Return a callable decoding a 32 bit signed int. """
        return lambda d: d.decode_32bit_int()

    @staticmethod
    def parse_64bit_int(tokens):
        """ Return a callable decoding a 64 bit signed int. """
        return lambda d: d.decode_64bit_int()

    @staticmethod
    def parse_32bit_float(tokens):
        """ Return a callable decoding a 32 bit float. """
        return lambda d: d.decode_32bit_float()

    @staticmethod
    def parse_64bit_float(tokens):
        """ Return a callable decoding a 64 bit float. """
        return lambda d: d.decode_64bit_float()

    #------------------------------------------------------------
    # Public Interface
    #------------------------------------------------------------
    def tokenize(self, value):
        """ Given a value, return the tokens

        :param value: The value to tokenize
        :returns: A generator of the token strings
        """
        for token in generate_tokens(StringIO(value).readline):
            # each token is a tuple whose second field is the token text
            yield token[1]

    def parse(self, value):
        """ Given a type value, return a function
        that supplied with a decoder, will decode
        the correct value.

        The type name is matched case-insensitively; unknown names
        fall back to ``self.default``.

        :param value: The type of value to parse
        :returns: The decoder method to use
        """
        tokens = self.tokenize(value)
        # the builtin next() works on both python 2 and 3 generators
        token = next(tokens).lower()
        parser = self.parsers.get(token, self.default)
        return parser(tokens)


def mapping_decoder(mapping_blocks, decoder=None):
    """ Given the raw mapping blocks, convert
    them into modbus value decoder map.

    Each mapping is updated in place: ``address`` and ``size`` are
    converted to integers and ``type`` is replaced by the callable
    returned from the type decoder's ``parse`` method.

    :param mapping_blocks: A dictionary of blocks, each of which is
                           a dictionary of mappings
    :param decoder: The type decoder to use (defaults to a new
                    ModbusTypeDecoder)
    :returns: The same mapping_blocks, after conversion
    :raises KeyError: If a mapping lacks address, size, or type
    """
    if decoder is None:
        decoder = ModbusTypeDecoder()
    # dict.values() is available on both python 2 and 3 (itervalues is not)
    for block in mapping_blocks.values():
        for mapping in block.values():
            mapping['address'] = int(mapping['address'])
            mapping['size'] = int(mapping['size'])
            mapping['type'] = decoder.parse(mapping['type'])
    # hand the converted map back so the result can be assigned directly
    return mapping_blocks