1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
|
"""
Modbus BCD Payload Builder
-----------------------------------------------------------
This is an example of building a custom payload builder
that can be used in the pymodbus library. Below is a
simple binary coded decimal builder and decoder.
"""
from struct import pack, unpack
from pymodbus.constants import Endian
from pymodbus.interfaces import IPayloadBuilder
from pymodbus.utilities import pack_bitstring
from pymodbus.utilities import unpack_bitstring
from pymodbus.exceptions import ParameterException
from pymodbus.payload import BinaryPayloadDecoder
def convert_to_bcd(decimal):
    """ Converts a decimal value to a bcd value

    Every decimal digit is stored in its own 4-bit nibble, with the
    least significant digit in the lowest nibble (123 -> 0x123).

    :param decimal: The non-negative integer to pack into bcd
    :returns: The number in bcd form (0 when decimal is <= 0)
    """
    place, bcd = 0, 0
    while decimal > 0:
        # divmod keeps the arithmetic in integers; true division ("/")
        # yields a float on Python 3, which cannot be bit-shifted below.
        decimal, nibble = divmod(decimal, 10)
        bcd += nibble << place
        place += 4
    return bcd
def convert_from_bcd(bcd):
    """ Converts a bcd value to a decimal value

    The lowest nibble of the input is treated as the ones digit, the
    next nibble as the tens digit, and so on.

    :param bcd: The integer value to unpack from bcd
    :returns: The number in decimal form (0 when bcd is <= 0)
    """
    # Peel the nibbles off from least to most significant.
    nibbles = []
    remaining = bcd
    while remaining > 0:
        nibbles.append(remaining & 0xf)
        remaining >>= 4
    # Weight each nibble by the power of ten matching its position.
    return sum(nibble * 10 ** index for index, nibble in enumerate(nibbles))
def count_bcd_digits(bcd):
    """ Count the number of digits in a bcd value

    One digit is counted for every 4-bit nibble up to and including the
    highest non-zero one.

    :param bcd: The bcd number to count the digits of
    :returns: The number of digits in the bcd value (0 when bcd is <= 0)
    """
    digits = 0
    remaining = bcd
    while True:
        if not remaining > 0:
            return digits
        # Drop the lowest nibble and account for it.
        remaining >>= 4
        digits += 1
class BcdPayloadBuilder(IPayloadBuilder):
    """
    A utility that helps build binary coded decimal payload
    messages to be written with the various modbus messages.

    Numbers are stored one decimal digit per byte, least significant
    digit first. Example::

        builder = BcdPayloadBuilder()
        builder.add_number(1)
        builder.add_number(int(2.234 * 1000))
        payload = builder.build()
    """

    def __init__(self, payload=None, endian=Endian.Little):
        """ Initialize a new instance of the payload builder

        :param payload: A list of raw payload chunks to initialize with
        :param endian: The endianness of the payload (stored, but not
            consulted by any method of this class)
        """
        self._payload = payload or []
        self._endian = endian

    def _raw_payload(self):
        """ Join the buffered chunks into a single byte string

        Text chunks are encoded as latin-1 so that every code point
        below 256 maps to the byte with the same value.

        :returns: The payload buffer as a byte string
        """
        return b''.join(
            chunk if isinstance(chunk, bytes) else chunk.encode('latin-1')
            for chunk in self._payload
        )

    def __str__(self):
        """ Return the payload buffer as a string

        :returns: The payload buffer as a string
        """
        raw = self._raw_payload()
        # On Python 2 bytes is str, so the buffer is returned unchanged.
        # On Python 3 __str__ must return text, so decode it byte-for-byte.
        if isinstance(raw, str):
            return raw
        return raw.decode('latin-1')

    def reset(self):
        """ Reset the payload buffer
        """
        self._payload = []

    def build(self):
        """ Return the payload buffer as a list

        This list is two bytes per element and can
        thus be treated as a list of registers. A buffer with an
        odd number of bytes is padded with one trailing zero byte.

        :returns: The payload buffer as a list of 2-byte strings
        """
        raw = self._raw_payload()
        if len(raw) % 2:
            raw += b'\x00'
        return [raw[i:i + 2] for i in range(0, len(raw), 2)]

    def add_bits(self, values):
        """ Adds a collection of bits to be encoded

        The bits are packed with pack_bitstring and the result is
        appended to the buffer as one chunk.

        :param values: The collection of bits to add to the buffer
        """
        self._payload.append(pack_bitstring(values))

    def add_number(self, value, size=None):
        """ Adds a number to the buffer as bcd digits

        The number is written one decimal digit per byte, least
        significant digit first.

        :param value: The non-negative integer to add to the buffer
        :param size: The number of digits to write; defaults to the
            number of digits in value. Higher digits are written as
            zero when size is larger, and dropped when it is smaller.
        """
        bcd = convert_to_bcd(value)
        # Zero has no counted digits, but must still occupy one byte;
        # otherwise add_number(0) would silently add nothing.
        size = size or count_bcd_digits(bcd) or 1
        self._payload.extend(
            pack('B', (bcd >> (4 * place)) & 0xf) for place in range(size)
        )

    def add_string(self, value):
        """ Adds a string to the buffer

        :param value: The string to append to the buffer as one chunk
        """
        self._payload.append(value)
class BcdPayloadDecoder(object):
    """
    A utility that helps decode binary coded decimal payload
    messages from a modbus response message. What follows is
    a simple example::

        decoder = BcdPayloadDecoder(payload)
        first = decoder.decode_int(2)
        second = decoder.decode_int(5) / 100
    """

    def __init__(self, payload):
        """ Initialize a new payload decoder

        :param payload: The payload (a sliceable string) to decode with
        """
        self._payload = payload
        self._pointer = 0x00

    @staticmethod
    def fromRegisters(registers, endian=Endian.Little):
        """ Initialize a payload decoder with the result of
        reading a collection of registers from a modbus device.

        The registers are treated as a list of 2 byte values.
        We have to do this because of how the data has already
        been decoded by the rest of the library.

        :param registers: The register results to initialize with
        :param endian: The endianness of the payload
        :returns: An initialized BinaryPayloadDecoder
        :raises ParameterException: If registers is not a list
        """
        # NOTE(review): this returns a BinaryPayloadDecoder rather than a
        # BcdPayloadDecoder - confirm that is intended.
        if isinstance(registers, list):  # repack into flat binary
            # pack() returns bytes on Python 3, so join with a bytes separator.
            payload = b''.join(pack('>H', x) for x in registers)
            return BinaryPayloadDecoder(payload, endian)
        raise ParameterException('Invalid collection of registers supplied')

    @staticmethod
    def fromCoils(coils, endian=Endian.Little):
        """ Initialize a payload decoder with the result of
        reading a collection of coils from a modbus device.

        The coils are treated as a list of bit(boolean) values.

        :param coils: The coil results to initialize with
        :param endian: The endianness of the payload
        :returns: An initialized BinaryPayloadDecoder
        :raises ParameterException: If coils is not a list
        """
        # NOTE(review): this returns a BinaryPayloadDecoder rather than a
        # BcdPayloadDecoder - confirm that is intended.
        if isinstance(coils, list):
            payload = pack_bitstring(coils)
            return BinaryPayloadDecoder(payload, endian)
        raise ParameterException('Invalid collection of coils supplied')

    def reset(self):
        """ Reset the decoder pointer back to the start
        """
        self._pointer = 0x00

    def decode_int(self, size=1):
        """ Decodes a int or long from the buffer

        Reads size bytes, each holding one decimal digit in its low
        nibble, least significant digit first. This mirrors the layout
        written by BcdPayloadBuilder.add_number.

        :param size: The number of digits (bytes) to decode
        :returns: The decoded decimal number
        """
        self._pointer += size
        handle = self._payload[self._pointer - size:self._pointer]
        if not isinstance(handle, bytes):
            # Text payload: map each code point below 256 to its byte.
            handle = handle.encode('latin-1')
        # Reassemble the per-byte digits into one packed bcd integer;
        # convert_from_bcd works on integers, not on byte strings.
        bcd = 0
        for place, digit in enumerate(bytearray(handle)):
            bcd |= (digit & 0xf) << (4 * place)
        return convert_from_bcd(bcd)

    def decode_bits(self):
        """ Decodes a byte worth of bits from the buffer

        :returns: The result of unpack_bitstring on the next byte
        """
        self._pointer += 1
        handle = self._payload[self._pointer - 1:self._pointer]
        return unpack_bitstring(handle)

    def decode_string(self, size=1):
        """ Decodes a string from the buffer

        :param size: The size of the string to decode
        :returns: The next size elements of the payload
        """
        self._pointer += size
        return self._payload[self._pointer - size:self._pointer]
# --------------------------------------------------------------------------- #
# Exported Identifiers
# --------------------------------------------------------------------------- #
# The names exported by a wildcard import of this module.
__all__ = ["BcdPayloadBuilder", "BcdPayloadDecoder"]
|