1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
|
# Copyright (c) 2014 Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from __future__ import print_function
import os
import codecs
import struct
import collections
import hashlib
import time
from array import array
from .errors import CompletionCodeError, HpmError, IpmiTimeoutError
from .msgs import create_request_by_name
from .msgs import constants
from .utils import check_completion_code, bcd_search, chunks
from .utils import py3dec_unic_bytes_fix, py3_array_tobytes
from .state import State
from .fields import VersionField
PROPERTY_GENERAL_PROPERTIES = 0
PROPERTY_CURRENT_VERSION = 1
PROPERTY_DESCRIPTION_STRING = 2
PROPERTY_ROLLBACK_VERSION = 3
PROPERTY_DEFERRED_VERSION = 4
PROPERTY_OEM = list(range(192, 255))
ACTION_BACKUP_COMPONENT = 0x00
ACTION_PREPARE_COMPONENT = 0x01
ACTION_UPLOAD_FOR_UPGRADE = 0x02
ACTION_UPLOAD_FOR_COMPARE = 0x03
CC_LONG_DURATION_CMD_IN_PROGRESS = 0x80
CC_GET_COMP_PROP_UPGRADE_NOT_SUPPORTED_OVER_INTF = 0x81
CC_GET_COMP_PROP_INVALID_COMPONENT = 0x82
CC_GET_COMP_PROP_INVALID_PROPERTIES_SELECTOR = 0x83
CC_INITIATE_UPGRADE_CMD_IN_PROGRESS = 0x80
CC_INITIATE_UPGRADE_INVALID_COMPONENT = 0x81
CC_QUERY_SELFTEST_COMPLETED = 0x00
CC_QUERY_SELFTEST_IN_PROGRESS = 0x80
CC_QUERY_SELFTEST_UPGRADE_NOT_SUPPORTED_OVER_INTF = 0x81
CC_QUERY_SELFTEST_NO_RESULTS_AVAILABLE = 0xD5
CC_ABORT_UPGRADE_CANNOT_ABORT = 0x80
CC_ABORT_UPGRADE_CANNOT_RESUME_OPERATION = 0x81
class Hpm(object):
@staticmethod
def _get_component_count(components):
"""Return the number of components."""
return bin(components).count('1')
def get_target_upgrade_capabilities(self):
rsp = self.send_message_with_name('GetTargetUpgradeCapabilities')
return TargetUpgradeCapabilities(rsp)
def get_component_property(self, component_id, property_id):
rsp = self.send_message_with_name('GetComponentProperties',
id=component_id,
selector=property_id)
return ComponentProperty.from_data(property_id, rsp.data)
def get_component_properties(self, component_id):
properties = []
for p in (PROPERTY_GENERAL_PROPERTIES, PROPERTY_CURRENT_VERSION,
PROPERTY_DESCRIPTION_STRING, PROPERTY_ROLLBACK_VERSION,
PROPERTY_DEFERRED_VERSION):
try:
prop = self.get_component_property(component_id, p)
if prop is not None:
properties.append(prop)
except CompletionCodeError as e:
if e.cc == CC_GET_COMP_PROP_INVALID_PROPERTIES_SELECTOR:
continue
return properties
def find_component_id_by_descriptor(self, descriptor):
caps = self.get_target_upgrade_capabilities()
for component_id in caps.components:
prop = self.get_component_property(component_id,
PROPERTY_DESCRIPTION_STRING)
if prop is not None:
if prop.description == descriptor:
return component_id
return None
def abort_firmware_upgrade(self):
self.send_message_with_name('AbortFirmwareUpgrade')
def initiate_upgrade_action(self, components_mask, action):
"""Initiate Upgrade Action.
components:
action:
ACTION_BACKUP_COMPONENT = 0x00
ACTION_PREPARE_COMPONENT = 0x01
ACTION_UPLOAD_FOR_UPGRADE = 0x02
ACTION_UPLOAD_FOR_COMPARE = 0x03
"""
if action in (ACTION_UPLOAD_FOR_UPGRADE, ACTION_UPLOAD_FOR_COMPARE):
if self._get_component_count(components_mask) != 1:
raise HpmError("more than 1 component not support for action")
self.send_message_with_name('InitiateUpgradeAction',
components=components_mask, action=action)
def initiate_upgrade_action_and_wait(self, components_mask, action,
timeout=2, interval=0.1):
"""Initiate Upgrade Action and wait for long running command."""
try:
self.initiate_upgrade_action(components_mask, action)
except CompletionCodeError as e:
if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS:
self.wait_for_long_duration_command(
constants.CMDID_HPM_INITIATE_UPGRADE_ACTION,
timeout, interval)
else:
raise HpmError('initiate_upgrade_action CC=0x%02x' % e.cc)
def upload_firmware_block(self, block_number, data):
if isinstance(data, str):
data = [ord(c) for c in data]
self.send_message_with_name('UploadFirmwareBlock', number=block_number,
data=data)
@staticmethod
def _determine_max_block_size():
return 22
def upload_binary(self, binary, timeout=2, interval=0.1, retry=3):
"""Upload all firmware blocks from a binary."""
block_number = 0
block_size = self._determine_max_block_size()
for chunk in chunks(binary, block_size):
try:
self.upload_firmware_block(block_number, chunk)
except CompletionCodeError as e:
if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS:
self.wait_for_long_duration_command(
constants.CMDID_HPM_UPLOAD_FIRMWARE_BLOCK,
timeout, interval)
else:
raise HpmError('upload_firmware_block CC=0x%02x' % e.cc)
except IpmiTimeoutError:
retry -= 1
if retry == 0:
raise IpmiTimeoutError()
block_number += 1
block_number &= 0xff
def finish_firmware_upload(self, component, length):
return self.send_message_with_name('FinishFirmwareUpload',
component_id=component,
image_length=length)
def finish_upload_and_wait(self, component, length,
timeout=2, interval=0.1):
"""Finish, upload and for the firmware."""
try:
rsp = self.finish_firmware_upload(component, length)
check_completion_code(rsp.completion_code)
except CompletionCodeError as e:
if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS:
self.wait_for_long_duration_command(
constants.CMDID_HPM_FINISH_FIRMWARE_UPLOAD,
timeout, interval)
else:
raise HpmError('finish_firmware_upload CC=0x%02x' % e.cc)
def get_upgrade_status(self):
return UpgradeStatus(self.send_message_with_name('GetUpgradeStatus'))
def wait_for_long_duration_command(self, expected_cmd, timeout, interval):
start_time = time.time()
while time.time() < start_time + timeout:
try:
status = self.get_upgrade_status()
if status.command_in_progress is not expected_cmd \
and status.command_in_progress != 0x34:
pass
if status.last_completion_code \
== CC_LONG_DURATION_CMD_IN_PROGRESS:
time.sleep(interval)
else:
return
except IpmiTimeoutError:
time.sleep(interval)
except IOError:
time.sleep(interval)
def activate_firmware(self, rollback_override=None):
req = create_request_by_name('ActivateFirmware')
if rollback_override is not None:
req.rollback_override_policy = rollback_override
rsp = self.send_message(req)
check_completion_code(rsp.completion_code)
def activate_firmware_and_wait(self, rollback_override=None,
timeout=2, interval=1):
"""Activate and wait for the new uploaded firmware."""
try:
self.activate_firmware(rollback_override)
except CompletionCodeError as e:
if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS:
self.wait_for_long_duration_command(
constants.CMDID_HPM_ACTIVATE_FIRMWARE,
timeout, interval)
else:
raise HpmError('activate_firmware CC=0x%02x' % e.cc)
except IpmiTimeoutError:
# controller is in reset and flashed new firmware
pass
def query_selftest_results(self):
return SelfTestResult(
self.send_message_with_name('QuerySelftestResults'))
def query_rollback_status(self):
return RollbackStatus(
self.send_message_with_name('QueryRollbackStatus'))
def initiate_manual_rollback(self):
return RollbackStatus(
self.send_message_with_name('InitiateManualRollback'))
def initiate_manual_rollback_and_wait(self, timeout=2, interval=0.1):
try:
self.initiate_manual_rollback()
except CompletionCodeError as e:
if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS:
self.wait_for_long_duration_command(
constants.CMDID_HPM_INITIATE_MANUAL_ROLLBACK,
60, interval)
else:
raise HpmError('activate_firmware CC=0x%02x' % e.cc)
except IpmiTimeoutError:
# controller is in reset and flashed new firmware
pass
@staticmethod
def open_upgrade_image(filename):
return UpgradeImage(filename)
@staticmethod
def get_upgrade_version_from_file(filename):
image = UpgradeImage(filename)
for action in image.actions:
if isinstance(action, UpgradeActionRecordUploadForUpgrade):
return action.firmware_version
return None
@staticmethod
def _do_upgrade_action_backup(image):
for action in image.actions:
if isinstance(action, UpgradeActionRecordBackup):
pass
@staticmethod
def _do_upgrade_action_prepare(image):
for action in image.actions:
if isinstance(action, UpgradeActionRecordPrepare):
print("do ACTION_PREPARE_COMPONENT")
@staticmethod
def _do_upgrade_action_upload(image):
for action in image.actions:
if isinstance(action, UpgradeActionRecordUploadForUpgrade):
print("do ACTION_UPLOAD_FOR_UPGRADE")
def preparation_stage(self, image):
####################################################
# match device ID, manfuacturer ID, etc.
device_id = self.get_device_id()
header = image.header
if header.device_id != device_id.device_id:
raise HpmError('Device ID: image=0x%x device=0x%x'
% (header.device_id, device_id.device_id))
if header.manufacturer_id != device_id.manufacturer_id:
raise HpmError('Manufacturer ID: image=0x%x device=0x%x'
% (header.manufacturer_id,
device_id.manufacturer_id))
if header.product_id != device_id.product_id:
raise HpmError('Product ID: image=0x%x device=0x%x'
% (header.product_id, device_id.product_id))
# tbd check version
####################################################
# compare current revision with upgrade image earlist comp rev
targetCap = self.get_target_upgrade_capabilities()
# tbd check version
####################################################
# Match IPM Controller capabilities with Upgrade Image capabilities
support = False
for imageComponent in header.components:
if imageComponent in targetCap.components:
support = True
if support is not True:
raise HpmError('no supported component in image')
def upgrade_stage(self, image, component):
for action in image.actions:
if action.components & (1 << component) == 0:
continue
self.initiate_upgrade_action_and_wait(1 << component,
action.action_type)
if isinstance(action, UpgradeActionRecordUploadForUpgrade):
self.upload_binary(action.firmware_image_data)
self.finish_upload_and_wait(component, action.firmware_length)
def _activation_state_do_self_testing(self):
pass
def wait_until_new_firmware_comes_up(self, timeout, interval):
start_time = time.time()
while time.time() < start_time + timeout:
try:
self.get_upgrade_status()
self.get_device_id()
except IpmiTimeoutError:
time.sleep(interval)
except IOError:
time.sleep(interval)
time.sleep(5)
def activation_stage(self, image, component):
self.activate_firmware_and_wait(
image.header.inaccessibility_timeout, 1)
self.wait_until_new_firmware_comes_up(
image.header.inaccessibility_timeout, 1)
self._activation_state_do_self_testing()
def install_component_from_image(self, image, component):
self.abort_firmware_upgrade()
if component not in image.header.components:
raise HpmError('component=%d not in image' % component)
self.preparation_stage(image)
self.upgrade_stage(image, component)
self.activation_stage(image, component)
def install_component_from_file(self, filename, component):
image = UpgradeImage(filename)
self.install_component_from_image(image, component)
class UpgradeStatus(State):
def _from_response(self, rsp):
self.command_in_progress = rsp.command_in_progress
self.last_completion_code = rsp.last_completion_code
def __str__(self):
string = []
string.append("cmd=0x%02x cc=0x%02x" %
(self.command_in_progress, self.last_completion_code))
return "\n".join(string)
class TargetUpgradeCapabilities(State):
def _from_response(self, rsp):
self.version = rsp.hpm_1_version
self.components = []
for i in range(8):
if rsp.component_present & (1 << i):
self.components.append(i)
def __str__(self):
string = []
string.append("Target Upgrade Capabilities")
string.append(" HPM.1 version: %s" % self.version)
string.append(" Components: %s" % self.components)
return "\n".join(string)
codecs.register(bcd_search)
class ComponentProperty(object):
def __init__(self, data=None):
if (data):
self._from_rsp_data(data)
@staticmethod
def from_data(component_id, data):
if isinstance(data, str):
data = [ord(c) for c in data]
if component_id is PROPERTY_GENERAL_PROPERTIES:
return ComponentPropertyGeneral(data)
elif component_id is PROPERTY_CURRENT_VERSION:
return ComponentPropertyCurrentVersion(data)
elif component_id is PROPERTY_DESCRIPTION_STRING:
return ComponentPropertyDescriptionString(data)
elif component_id is PROPERTY_ROLLBACK_VERSION:
return ComponentPropertyRollbackVersion(data)
elif component_id is PROPERTY_DEFERRED_VERSION:
return ComponentPropertyDeferredVersion(data)
elif component_id in PROPERTY_OEM:
raise NotImplementedError
class ComponentPropertyGeneral(ComponentProperty):
ROLLBACK_SUPPORT_MASK = 0x03
PREPARATION_SUPPORT_MASK = 0x04
COMPARISON_SUPPORT_MASK = 0x08
DEFERRED_ACTIVATION_SUPPORT_MASK = 0x10
PAYLOAD_COLD_RESET_REQ_SUPPORT_MASK = 0x20
def _from_rsp_data(self, data):
support = []
cap = data[0]
if cap & self.ROLLBACK_SUPPORT_MASK == 0:
support.append('rollback_backup_not_supported')
elif cap & self.ROLLBACK_SUPPORT_MASK == 1:
support.append('rollback_is_supported')
elif cap & self.ROLLBACK_SUPPORT_MASK == 2:
support.append('rollback_is_supported')
elif cap & self.ROLLBACK_SUPPORT_MASK == 3:
support.append('reserved')
if cap & self.PREPARATION_SUPPORT_MASK:
support.append('prepartion')
if cap & self.COMPARISON_SUPPORT_MASK:
support.append('comparison')
if cap & self.DEFERRED_ACTIVATION_SUPPORT_MASK:
support.append('deferred_activation')
if cap & self.PAYLOAD_COLD_RESET_REQ_SUPPORT_MASK:
support.append('payload_cold_reset_required')
self.general = support
class ComponentPropertyCurrentVersion(ComponentProperty):
def _from_rsp_data(self, data):
self.version = VersionField(data)
class ComponentPropertyDescriptionString(ComponentProperty):
def _from_rsp_data(self, data):
descr = py3_array_tobytes(array('B', data))
descr = py3dec_unic_bytes_fix(descr)
# strip '\x00'
descr = descr.replace('\0', '')
self.description = descr
class ComponentPropertyRollbackVersion(ComponentProperty):
def _from_rsp_data(self, data):
self.version = VersionField(data)
class ComponentPropertyDeferredVersion(ComponentProperty):
def _from_rsp_data(self, data):
self.version = VersionField(data)
class ComponentPropertyOem(ComponentProperty):
def _from_rsp_data(self, data):
self.oem_data = data
class SelfTestResult(State):
CORRUPTED_OR_INACCESSIBLE_DATA_OR_DEVICES = 0x57
def _from_response(self, rsp):
self.status = rsp.selftest_result_1
result2 = rsp.selftest_result_2
if self.status != self.CORRUPTED_OR_INACCESSIBLE_DATA_OR_DEVICES:
self.fail_sel = (result2 & 0x80) >> 7
self.fail_sdrr = (result2 & 0x40) >> 6
self.fail_bmc_fru = (result2 & 0x20) >> 5
self.fail_ipmb = (result2 & 0x10) >> 4
self.fail_sdrr_empty = (result2 & 0x08) >> 3
self.fail_bmc_fru_interanl_area = (result2 & 0x04) >> 2
self.fail_bootblock = (result2 & 0x02) >> 1
self.fail_mc = (result2 & 0x01) >> 0
class RollbackStatus(object):
def __init__(self, rsp=None):
if rsp:
self._from_rsp(rsp)
def _from_rsp(self, rsp):
if rsp.completion_estimate:
self.percent_complete = rsp.completion_estimate
image_header = collections.namedtuple('image_header',
['field_name', 'format', 'start', 'len'])
class UpgradeImageHeaderRecord(object):
FORMAT = [
image_header('format_version', 'B', 8, 1),
image_header('device_id', 'B', 9, 1),
image_header('product_id', '<H', 13, 2),
image_header('time', '<L', 15, 4),
image_header('capabilities', 'B', 19, 1),
image_header('selftest_timeout', 'B', 21, 1),
image_header('rollback_timeout', 'B', 22, 1),
image_header('inaccessibility_timeout', 'B', 23, 1),
image_header('earliest_compatible_revision', '<H', 24, 2),
image_header('oem_data_length', '<H', 32, 2),
]
def __init__(self, data=None):
for a in self.FORMAT:
setattr(self, a.field_name, None)
if data:
self._from_data(data)
def _from_data(self, data):
self.signature = data[0:8]
for a in self.FORMAT:
setattr(self, a.field_name, struct.unpack(
a.format, data[a.start:a.start+a.len])[0])
if isinstance(data, str):
data = [ord(c) for c in data]
self.manufacturer_id = data[10] | data[11] << 8 | data[12] << 16
self.components = []
for i in range(8):
if data[20] & (1 << i):
self.components.append(i)
self.earliest_compatible_revision = \
VersionField(data[24:24 + VersionField.VERSION_FIELD_LEN])
self.firmware_revision = \
VersionField(data[26:26 + VersionField.VERSION_WITH_AUX_FIELD_LEN])
if self.oem_data_length:
self.oem_data = data[34:-1]
# XXX checksum check
self.checksum = data[34 + self.oem_data_length]
self.length = 34 + self.oem_data_length+1
def __str__(self):
str = []
str.append("HPM Upgrade Image header")
str.append(" Signature: %s" % self.signature)
str.append(" Format Version: %s" % self.format_version)
str.append(" Device ID: %s" % self.device_id)
str.append(" Manufacturer: %s" % self.manufacturer_id)
str.append(" Product ID: %s" % self.product_id)
str.append(" Time: %s" % self.time)
str.append(" Image Cap: 0x%02x" % self.capabilities)
str.append(" Components: %s" % self.components)
str.append(" Selftest Timeout: %s" % self.selftest_timeout)
str.append(" Rollback Timeout: %s" % self.rollback_timeout)
str.append(" Inacc. Timeout: %s" % self.inaccessibility_timeout)
str.append(" Earliest comp.: %s" % self.earliest_compatible_revision)
str.append(" firmware Revision:%s" % self.firmware_revision)
str.append(" OEM data len: %s" % self.oem_data_length)
return "\n".join(str)
class UpgradeActionRecord(object):
ACTIONS = (
"Backup",
"Prepare",
"Upload for Upgrade",
"Upload for Compare"
)
def __init__(self, data=None):
self.action_type = array('B', data)[0]
if data:
(self.action, self.components, self.checksum) \
= struct.unpack('BBB', data[0:3])
self.length = 3
@staticmethod
def create_from_data(data):
action_type = array('B', data)[0]
if action_type == ACTION_BACKUP_COMPONENT:
return UpgradeActionRecordBackup(data)
elif action_type == ACTION_PREPARE_COMPONENT:
return UpgradeActionRecordPrepare(data)
elif action_type == ACTION_UPLOAD_FOR_UPGRADE:
return UpgradeActionRecordUploadForUpgrade(data)
elif action_type == ACTION_UPLOAD_FOR_COMPARE:
return UpgradeActionRecordUploadForCompare(data)
else:
raise HpmError('unsupported ActionRecord')
def __str__(self):
str = []
str.append("Action Record Type: 0x%x (%s) " %
(self.action, self.ACTIONS[self.action]))
str.append(" Components: 0x%02x" % self.components)
return "\n".join(str)
class UpgradeActionRecordBackup(UpgradeActionRecord):
pass
class UpgradeActionRecordPrepare(UpgradeActionRecord):
pass
class UpgradeActionRecordUploadForUpgrade(UpgradeActionRecord):
def __init__(self, data=None):
UpgradeActionRecord.__init__(self, data)
if data:
self.firmware_version = \
VersionField(
data[3:3 + VersionField.VERSION_WITH_AUX_FIELD_LEN])
self.firmware_description_string \
= py3dec_unic_bytes_fix(data[9:30])
self.firmware_length = struct.unpack('<L', data[30:34])[0]
self.firmware_image_data = data[34:(34 + self.firmware_length)]
self.length += 31 + self.firmware_length
class UpgradeActionRecordUploadForCompare(UpgradeActionRecord):
pass
class ImageChecksumRecord(object):
def __init__(self, data=None):
if data:
self._from_data(data)
def _from_data(self, data):
self.data = data[0:16]
HPM_IMAGE_CHECKSUM_SIZE = 16
class UpgradeImage(object):
def __init__(self, filename=None):
self.actions = None
if filename:
self._from_file(filename)
def __str__(self):
str = []
return "\n".join(str)
def _check_md5_sum(self, filedata):
summer = hashlib.md5()
self.checksum_actual \
= summer.update(filedata[:-HPM_IMAGE_CHECKSUM_SIZE])
self.checksum_expected = filedata[-HPM_IMAGE_CHECKSUM_SIZE:]
def _from_file(self, filename):
try:
file = open(filename, "rb")
except IOError:
print('Error open file "%s"' % filename)
################################
# get file size
file_size = os.stat(filename).st_size
file_data = file.read(file_size)
################################
# get image checksum
self._check_md5_sum(file_data)
# XXX verify checksum
################################
# Upgrade Image Header
self.header = UpgradeImageHeaderRecord(file_data)
off = self.header.length
################################
# Upgrade Actions
self.actions = []
while (off + HPM_IMAGE_CHECKSUM_SIZE) < len(file_data):
action = UpgradeActionRecord.create_from_data(file_data[off:])
self.actions.append(action)
off += action.length
################################
# Image checksum
self.checksum = ImageChecksumRecord(file_data[off:file_size])
file.close()
|