# -*- encoding: utf-8 -*-
from __future__ import print_function, unicode_literals, division, absolute_import
import os
import logging
from sys import version_info
from collections import OrderedDict
from bs4 import BeautifulSoup
import enocean.utils
# Left as a helper
from enocean.protocol.constants import RORG # noqa: F401
class EEP(object):
    ''' Access to the EEP definitions stored in the EEP.xml shipped next to this module.

    On construction the XML is parsed with BeautifulSoup and indexed into
    ``self.telegrams`` as ``telegrams[rorg][func][type] -> <profile> tag``.
    The remaining methods use a profile's ``<data>`` description to decode a
    bit array into values (``get_values``) or to encode values into a bit
    array (``set_values``).

    ``self.init_ok`` is True only when EEP.xml was read and indexed.
    '''
    logger = logging.getLogger('enocean.protocol.eep')

    def __init__(self):
        ''' Read EEP.xml from this module's directory and index its profiles. '''
        self.init_ok = False
        self.telegrams = {}

        eep_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'EEP.xml')
        # The built-in open() only accepts `encoding` on Python 3.
        open_kwargs = {'encoding': 'UTF-8'} if version_info[0] > 2 else {}
        try:
            with open(eep_path, 'r', **open_kwargs) as xml_file:
                self.soup = BeautifulSoup(xml_file.read(), "html.parser")
            self.__load_xml()
            self.init_ok = True
        except IOError:
            # Impossible to test with the current structure?
            # To be honest, as the XML is included with the library,
            # there should be no possibility of ever reaching this...
            self.logger.warning('Cannot load protocol file!')
            self.init_ok = False

    def __load_xml(self):
        ''' Build the telegrams[rorg][func][type] -> <profile> tag lookup table.

        Keys are whatever enocean.utils.from_hex_string returns for the
        'rorg', 'func' and 'type' attributes of the XML elements.
        '''
        from_hex = enocean.utils.from_hex_string
        self.telegrams = {
            from_hex(telegram['rorg']): {
                from_hex(function['func']): {
                    from_hex(profile['type']): profile
                    for profile in function.find_all('profile')
                }
                for function in telegram.find_all('profiles')
            }
            for telegram in self.soup.find_all('telegram')
        }

    @staticmethod
    def _get_raw(source, bitarray):
        ''' Get raw data as integer, based on offset and size.

        `source` supplies 'offset' and 'size'; the selected bits are read
        with the first bit as the most significant one.
        '''
        offset = int(source['offset'])
        size = int(source['size'])
        bits = ''.join('1' if digit else '0' for digit in bitarray[offset:offset + size])
        return int(bits, 2)

    @staticmethod
    def _set_raw(target, raw_value, bitarray):
        ''' Put the lowest `size` bits of raw_value into the bit array.

        Bits are written as booleans, most significant first, starting at the
        target's 'offset'. The bit array is modified in place and returned.
        '''
        offset = int(target['offset'])
        size = int(target['size'])
        for digit in range(size):
            shift = size - digit - 1
            bitarray[offset + digit] = ((raw_value >> shift) & 0x01) != 0
        return bitarray

    @staticmethod
    def _get_rangeitem(source, raw_value):
        ''' Return the first <rangeitem> whose start..end (inclusive) contains raw_value.

        A missing 'start' or 'end' attribute is treated as -1.
        Returns None when no rangeitem matches.
        '''
        for rangeitem in source.find_all('rangeitem'):
            start = int(rangeitem.get('start', -1))
            end = int(rangeitem.get('end', -1))
            if start <= raw_value <= end:
                return rangeitem
        return None

    def _get_value(self, source, bitarray):
        ''' Get value, based on the data in XML.

        The raw integer is mapped linearly from the element's <range>
        (min..max) onto its <scale> (min..max).
        Returns a single-entry dict keyed by the element's 'shortcut'.
        '''
        raw_value = self._get_raw(source, bitarray)

        rng = source.find('range')
        rng_min = float(rng.find('min').text)
        rng_max = float(rng.find('max').text)

        scl = source.find('scale')
        scl_min = float(scl.find('min').text)
        scl_max = float(scl.find('max').text)

        scaled = (scl_max - scl_min) / (rng_max - rng_min) * (raw_value - rng_min) + scl_min
        return {
            source['shortcut']: {
                'description': source.get('description'),
                'unit': source['unit'],
                'value': scaled,
                'raw_value': raw_value,
            }
        }

    def _get_enum(self, source, bitarray):
        ''' Get enum value, based on the data in XML.

        The description comes from the <item> whose 'value' equals the raw
        value, or otherwise from the first matching <rangeitem>; it is passed
        through str.format with ``value=raw_value``.
        Returns a single-entry dict keyed by the element's 'shortcut'.
        '''
        raw_value = self._get_raw(source, bitarray)

        # Find value description.
        value_desc = source.find('item', {'value': str(raw_value)}) or self._get_rangeitem(source, raw_value)
        # NOTE(review): value_desc is None when the raw value matches neither an
        # item nor a rangeitem; the subscript below then raises TypeError.

        return {
            source['shortcut']: {
                'description': source.get('description'),
                'unit': source.get('unit', ''),
                'value': value_desc['description'].format(value=raw_value),
                'raw_value': raw_value,
            }
        }

    def _get_boolean(self, source, bitarray):
        ''' Get boolean value, based on the data in XML.

        Returns a single-entry dict keyed by the element's 'shortcut'.
        '''
        raw_value = self._get_raw(source, bitarray)
        return {
            source['shortcut']: {
                'description': source.get('description'),
                'unit': source.get('unit', ''),
                'value': bool(raw_value),
                'raw_value': raw_value,
            }
        }

    def _set_value(self, target, value, bitarray):
        ''' Set given numeric value to target field in bitarray.

        The value is mapped linearly from the element's <scale> onto its
        <range> and rounded to the nearest raw step before being stored.
        '''
        # derive raw value
        rng = target.find('range')
        rng_min = float(rng.find('min').text)
        rng_max = float(rng.find('max').text)

        scl = target.find('scale')
        scl_min = float(scl.find('min').text)
        scl_max = float(scl.find('max').text)

        raw_value = (value - scl_min) * (rng_max - rng_min) / (scl_max - scl_min) + rng_min
        # Round instead of truncating: float error can yield e.g. 84.99999999999997
        # for an exact 85, and truncation would then store 84.
        # store value in bitfield
        return self._set_raw(target, int(round(raw_value)), bitarray)

    def _set_enum(self, target, value, bitarray):
        ''' Set given enum value (by string or integer value) to target field in bitarray.

        An integer must match an <item> 'value' or fall inside a <rangeitem>;
        any other value is looked up as an <item> 'description'.
        Raises ValueError when the value is not defined for the target.
        '''
        # derive raw value
        if isinstance(value, int):
            # check whether this value exists
            if not (target.find('item', {'value': value}) or self._get_rangeitem(target, value)):
                raise ValueError('Enum value "%s" not found in EEP.' % (value))
            # set integer values directly
            raw_value = value
        else:
            value_item = target.find('item', {'description': value})
            if value_item is None:
                raise ValueError('Enum description for value "%s" not found in EEP.' % (value))
            raw_value = int(value_item['value'])
        return self._set_raw(target, raw_value, bitarray)

    @staticmethod
    def _set_boolean(target, data, bitarray):
        ''' Set given value to target bit in bitarray.

        `data` is stored as given at the target's 'offset'; the bit array is
        modified in place and returned.
        '''
        bitarray[int(target['offset'])] = data
        return bitarray

    def find_profile(self, bitarray, eep_rorg, rorg_func, rorg_type, direction=None, command=None):
        ''' Find profile and data description, matching RORG, FUNC and TYPE.

        Returns the matching <data> tag of the profile, or None when EEP.xml
        is not loaded, when RORG/FUNC/TYPE is unknown, or when no <data>
        matches. A truthy `command` takes precedence over `direction`.
        `bitarray` is accepted for interface compatibility and not used here.
        '''
        if not self.init_ok:
            self.logger.warning('EEP.xml not loaded!')
            return None

        if eep_rorg not in self.telegrams:
            self.logger.warning('Cannot find rorg in EEP!')
            return None

        if rorg_func not in self.telegrams[eep_rorg]:
            self.logger.warning('Cannot find func in EEP!')
            return None

        if rorg_type not in self.telegrams[eep_rorg][rorg_func]:
            self.logger.warning('Cannot find type in EEP!')
            return None

        profile = self.telegrams[eep_rorg][rorg_func][rorg_type]

        if command:
            # multiple commands can be defined, with the command id always in same location (per RORG-FUNC-TYPE).
            eep_command = profile.find('command', recursive=False)
            # If commands are not set in EEP, or command is None,
            # get the first data as a "best guess".
            if not eep_command:
                return profile.find('data', recursive=False)

            # If eep_command is defined, so should be data.command
            return profile.find('data', {'command': str(command)}, recursive=False)

        # extract data description
        # the direction tag is optional
        if direction is None:
            return profile.find('data', recursive=False)
        return profile.find('data', {'direction': direction}, recursive=False)

    def get_values(self, profile, bitarray, status):
        ''' Get keys and values from bitarray.

        <value> and <enum> children of `profile` are decoded from `bitarray`,
        <status> children from `status`.
        Returns (list of shortcuts, OrderedDict of decoded entries) in the
        order the children appear, or ([], {}) when nothing can be decoded.
        '''
        if not self.init_ok or profile is None:
            return [], {}

        output = OrderedDict()
        for source in profile.contents:
            # Children without a tag name (e.g. text between tags) carry no data.
            if not source.name:
                continue
            if source.name == 'value':
                output.update(self._get_value(source, bitarray))
            elif source.name == 'enum':
                output.update(self._get_enum(source, bitarray))
            elif source.name == 'status':
                output.update(self._get_boolean(source, status))
        # Always hand back a list, matching the early-exit path above.
        return list(output), output

    def set_values(self, profile, data, status, properties):
        ''' Update data based on data contained in properties.

        `properties` maps shortcuts to new values. <value> and <enum> targets
        are written into `data`, <status> targets into `status`. Unknown
        shortcuts are logged and skipped.
        Returns the (data, status) pair.
        '''
        if not self.init_ok or profile is None:
            return data, status

        for shortcut, value in properties.items():
            # find the given property from EEP
            target = profile.find(shortcut=shortcut)
            if not target:
                # TODO: Should we raise an error?
                self.logger.warning('Cannot find data description for shortcut %s', shortcut)
                continue

            # update bit_data
            if target.name == 'value':
                data = self._set_value(target, value, data)
            elif target.name == 'enum':
                data = self._set_enum(target, value, data)
            elif target.name == 'status':
                status = self._set_boolean(target, value, status)
        return data, status
|