1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
# Copyright (c) 2014 Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from array import array
from .errors import CompletionCodeError, DecodingError
from .utils import check_completion_code, ByteBuffer
from .msgs import create_request_by_name
from .msgs import constants
from .event import EVENT_ASSERTION, EVENT_DEASSERTION
from .helper import clear_repository_helper
from .state import State
class Sel(object):
    """System Event Log (SEL) commands.

    The methods rely on ``self.send_message_with_name(name, **kwargs)`` and
    ``self.send_message(req)``, which are not defined in this class and
    must be provided by the class this one is combined with.
    """

    def get_sel_entries_count(self):
        """Return the ``entries`` value of the 'GetSelInfo' response."""
        info = SelInfo(self.send_message_with_name('GetSelInfo'))
        return info.entries

    def get_sel_reservation_id(self):
        """Issue 'ReserveSel' and return the reservation ID of the response."""
        rsp = self.send_message_with_name('ReserveSel')
        return rsp.reservation_id

    def _clear_sel(self, cmd, reservation):
        """Send one 'ClearSel' request.

        :param cmd: value passed as the ``cmd`` field of the request
        :param reservation: reservation ID to use for the request
        :return: the ``status.erase_in_progress`` field of the response
        """
        rsp = self.send_message_with_name('ClearSel',
                                          reservation_id=reservation,
                                          cmd=cmd)
        return rsp.status.erase_in_progress

    def clear_sel(self, retry=5):
        """Clear the SEL through ``clear_repository_helper``.

        The helper is handed the reservation getter, the single-step clear
        function and ``retry``.

        :param retry: forwarded unchanged to ``clear_repository_helper``
        """
        clear_repository_helper(self.get_sel_reservation_id,
                                self._clear_sel, retry)

    def delete_sel_entry(self, record_id, reservation=0):
        """Send 'DeleteSelEntry' and return the record ID of the response.

        :param record_id: ID of the record to delete
        :param reservation: reservation ID put into the request (default 0)
        """
        rsp = self.send_message_with_name('DeleteSelEntry',
                                          reservation_id=reservation,
                                          record_id=record_id)
        return rsp.record_id

    def get_and_clear_sel_entry(self, record_id):
        """Atomically gets and clears the specified SEL record.

        A reservation is taken, the record is read and then deleted under
        that reservation.  When either step fails with CC_RES_CANCELED the
        whole sequence is restarted with a fresh reservation; any other
        CompletionCodeError is re-raised.

        :param record_id: ID of the record to read and delete
        :return: the SelEntry that was read
        """
        while True:
            reservation = self.get_sel_reservation_id()
            try:
                sel_entry, _ = self.get_sel_entry(record_id, reservation)
                self.delete_sel_entry(record_id, reservation)
            except CompletionCodeError as e:
                if e.cc == constants.CC_RES_CANCELED:
                    # Reservation was lost: start over with a new one.
                    continue
                raise
            return sel_entry

    def get_sel_entry(self, record_id, reservation=0):
        """Read one 16-byte SEL record, in several chunks if necessary.

        The first request asks for the entire record (length 0xff).  Each
        time the response carries CC_CANT_RET_NUM_REQ_BYTES the chunk size
        is reduced (first to 16, then by one byte per attempt) and the
        request is repeated.  The current chunk size is kept in
        ``self.max_req_len``.

        :param record_id: ID of the record to read
        :param reservation: reservation ID put into the request (default 0)
        :return: tuple ``(SelEntry, next_record_id)``, with
            ``next_record_id`` taken from the last response
        :raises CompletionCodeError: when even a one-byte read is answered
            with CC_CANT_RET_NUM_REQ_BYTES.  Every other completion code is
            handed to ``check_completion_code``.
        """
        ENTIRE_RECORD = 0xff
        RECORD_LEN = 16

        req = create_request_by_name('GetSelEntry')
        req.reservation_id = reservation
        req.record_id = record_id
        req.offset = 0
        self.max_req_len = ENTIRE_RECORD

        record_data = ByteBuffer()

        while True:
            req.length = self.max_req_len
            # Once reading in chunks, never ask past the end of the record.
            if (self.max_req_len != ENTIRE_RECORD
                    and (req.offset + req.length) > RECORD_LEN):
                req.length = RECORD_LEN - req.offset

            rsp = self.send_message(req)
            if rsp.completion_code == constants.CC_CANT_RET_NUM_REQ_BYTES:
                if self.max_req_len == ENTIRE_RECORD:
                    self.max_req_len = RECORD_LEN
                elif self.max_req_len > 1:
                    self.max_req_len -= 1
                else:
                    # A one-byte read was refused as well; shrinking further
                    # would request zero or negative lengths forever.
                    raise CompletionCodeError(rsp.completion_code)
                continue

            check_completion_code(rsp.completion_code)

            record_data.extend(rsp.record_data)
            req.offset = len(record_data)

            if len(record_data) >= RECORD_LEN:
                break

        return (SelEntry(record_data), rsp.next_record_id)

    def sel_entries(self):
        """Generator which returns all SEL entries.

        Nothing is yielded when the entry count is 0.  Otherwise a single
        reservation is taken and records are read starting at record ID 0,
        following ``next_record_id`` until it equals 0xffff.
        """
        START_SEL_RECORD_ID = 0
        END_SEL_RECORD_ID = 0xffff

        if self.get_sel_entries_count() == 0:
            return

        reservation_id = self.get_sel_reservation_id()
        next_record_id = START_SEL_RECORD_ID
        while True:
            (sel_entry, next_record_id) = self.get_sel_entry(next_record_id,
                                                             reservation_id)
            yield sel_entry
            if next_record_id == END_SEL_RECORD_ID:
                break

    def get_sel_entries(self):
        """Return all SEL entries as a list."""
        return list(self.sel_entries())
class SelInfo(State):
    """Attribute view of a 'GetSelInfo' response.

    NOTE(review): ``_from_response`` is presumably invoked by the ``State``
    base class when an instance is built from a response - confirm in
    ``.state``.
    """

    def _from_response(self, rsp):
        """Copy the fields of ``rsp`` onto this object.

        The scalar fields are taken over as they are.  ``operation_support``
        becomes a list holding the name of every flag that is truthy in
        ``rsp.operation_support``, in a fixed order.

        :param rsp: response object providing the attributes read below
        """
        self.version = rsp.version
        self.entries = rsp.entries
        self.free_bytes = rsp.free_bytes
        self.most_recent_addition = rsp.most_recent_addition
        self.most_recent_erase = rsp.most_recent_erase

        # Flag names double as attribute names on rsp.operation_support.
        flag_names = (
            'get_sel_allocation_info',
            'reserve_sel',
            'partial_add_sel_entry',
            'delete_sel',
            'overflow_flag',
        )
        self.operation_support = []
        for flag in flag_names:
            if getattr(rsp.operation_support, flag):
                self.operation_support.append(flag)
class SelEntry(State):
    """A single 16-byte SEL record decoded into attributes.

    NOTE(review): ``_from_response`` is presumably invoked by the ``State``
    base class when an instance is built from record data - confirm in
    ``.state``.
    """

    # Record type values accepted by _from_response.
    TYPE_SYSTEM_EVENT = 0x02
    TYPE_OEM_TIMESTAMPED_RANGE = list(range(0xc0, 0xe0))
    TYPE_OEM_NON_TIMESTAMPED_RANGE = list(range(0xe0, 0x100))

    def __str__(self):
        """Return a multi-line, human readable dump of the record."""
        raw = '[' + ' '.join('0x%02x' % octet for octet in self.data) + ']'
        lines = [
            'SEL Record ID 0x%04x' % self.record_id,
            ' Raw: %s' % raw,
            ' Type: %d' % self.type,
            ' Timestamp: %d' % self.timestamp,
            ' Generator: %d' % self.generator_id,
            ' EvM rev: %d' % self.evm_rev,
            ' Sensor Type: 0x%02x' % self.sensor_type,
            ' Sensor Number: %d' % self.sensor_number,
            ' Event Direction: %d' % self.event_direction,
            ' Event Type: 0x%02x' % self.event_type,
            ' Event Data: %s' % array('B', self.event_data).tolist(),
        ]
        return "\n".join(lines)

    @staticmethod
    def type_to_string(entry_type):
        """Return a description of ``entry_type``, or None if it is unknown.

        :param entry_type: numeric record type
        """
        if entry_type == SelEntry.TYPE_SYSTEM_EVENT:
            return 'System Event'
        if entry_type in SelEntry.TYPE_OEM_TIMESTAMPED_RANGE:
            return 'OEM timestamped (0x%02x)' % entry_type
        if entry_type in SelEntry.TYPE_OEM_NON_TIMESTAMPED_RANGE:
            return 'OEM non-timestamped (0x%02x)' % entry_type
        return None

    def _from_response(self, data):
        """Decode the 16 bytes of ``data`` into attributes.

        Fields are popped in this order (width in bytes): record_id (2),
        type (1), timestamp (4), generator_id (2), evm_rev (1),
        sensor_type (1), sensor_number (1), event descriptor (1) and
        event_data (3 x 1).

        :param data: the record bytes; kept unmodified as ``self.data``
        :raises DecodingError: if ``data`` is not 16 bytes long, or if the
            record type is neither a system event nor in one of the OEM
            ranges
        """
        size = len(data)
        if size != 16:
            raise DecodingError('Invalid SEL record length (%d)' % size)

        self.data = data

        # Popping consumes the buffer, so decode from a separate ByteBuffer
        # and leave self.data as it is.
        reader = ByteBuffer(data)

        self.record_id = reader.pop_unsigned_int(2)
        self.type = reader.pop_unsigned_int(1)

        known_type = (self.type == self.TYPE_SYSTEM_EVENT
                      or self.type in self.TYPE_OEM_TIMESTAMPED_RANGE
                      or self.type in self.TYPE_OEM_NON_TIMESTAMPED_RANGE)
        if not known_type:
            raise DecodingError('Unknown SEL type (0x%02x)' % self.type)

        self.timestamp = reader.pop_unsigned_int(4)
        self.generator_id = reader.pop_unsigned_int(2)
        self.evm_rev = reader.pop_unsigned_int(1)
        self.sensor_type = reader.pop_unsigned_int(1)
        self.sensor_number = reader.pop_unsigned_int(1)

        # Bit 7 of the descriptor selects the direction, bits 0-6 the type.
        descriptor = reader.pop_unsigned_int(1)
        self.event_direction = (EVENT_DEASSERTION if descriptor & 0x80
                                else EVENT_ASSERTION)
        self.event_type = descriptor & 0x7f

        self.event_data = [reader.pop_unsigned_int(1) for _ in range(3)]
|