1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import array
import pytest
from pyipmi.session import Session
from pyipmi.interfaces.rmcp import (AsfMsg, AsfPing, AsfPong, IpmiMsg, RmcpMsg)
from pyipmi.utils import py3_array_tobytes
from pyipmi.errors import DecodingError
class TestRmcpMsg:
    """Pack and unpack checks for the RMCP message wrapper (``RmcpMsg``)."""

    def test_rmcpmsg_pack(self):
        """Packing yields the 4-byte header, followed by the SDU when given."""
        header = b'\x06\x00\xff\x07'
        payload = b'\x11\x22\x33\x44'

        # Without an SDU only the header bytes are produced.
        assert RmcpMsg(0x7).pack(None, 0xff) == header

        # With an SDU the payload is appended right after the header.
        assert RmcpMsg(0x7).pack(payload, 0xff) == header + payload

    def test_rmcpmsg_unpack(self):
        """Unpacking fills the header fields and returns the trailing SDU."""
        raw = b'\x06\x00\xee\x07\x44\x33\x22\x11'
        msg = RmcpMsg()

        payload = msg.unpack(raw)

        assert (msg.version, msg.seq_number, msg.class_of_msg) == (6, 0xee, 0x7)
        assert payload == b'\x44\x33\x22\x11'
class TestAsfMsg:
    """Checks for the generic ASF message (``AsfMsg``)."""

    def test_pack(self):
        """A default-constructed message packs to the expected eight bytes."""
        packed = AsfMsg().pack()
        assert packed == b'\x00\x00\x11\xbe\x00\x00\x00\x00'

    def test_unpack(self):
        """Smoke test: unpacking must not raise; no fields are checked."""
        raw = b'\x00\x00\x11\xbe\x00\x00\x00\x00'
        AsfMsg().unpack(raw)

    def test_tostr(self):
        """``str()`` renders the data bytes as space-separated hex pairs."""
        msg = AsfMsg()
        msg.data = b'\xaa\xbb\xcc'

        rendered = str(msg)

        assert rendered == 'aa bb cc'
class TestAsfPing:
    """Checks for the ASF ping message (``AsfPing``)."""

    def test_pack(self):
        """A default-constructed ping packs to the expected eight bytes."""
        assert AsfPing().pack() == b'\x00\x00\x11\xbe\x80\x00\x00\x00'
class TestAsfPong:
    """Checks for the ASF pong message (``AsfPong``)."""

    def test_unpack(self):
        """Smoke test: unpacking a pong PDU must not raise; nothing is checked."""
        # Split only for readability; the concatenated value is the same
        # 24-byte PDU. Presumably 8 header bytes followed by 0x10 data bytes
        # -- confirm against AsfPong.
        raw = (b'\x00\x00\x11\xbe\x40\x00\x00\x10'
               b'\x00\x00\x11\xbe\x00\x00\x00\x00'
               b'\x81\x00\x00\x00\x00\x00\x00\x00')
        AsfPong().unpack(raw)
class TestIpmiMsg:
    """Pack and unpack checks for the IPMI session message (``IpmiMsg``)."""

    # First 25 bytes shared by the authenticated unpack tests below: the byte
    # 0x01, then 0x11..0x44, then 0x55..0x88, then the 16 bytes 0x01..0x10.
    # Each test appends its own data-length byte and payload.
    _AUTH_HEADER = (b'\x01\x11\x22\x33\x44\x55\x66\x77\x88'
                    b'\x01\x02\x03\x04\x05\x06\x07\x08'
                    b'\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10')

    def _assert_auth_header(self, msg):
        """Assert the fields expected after unpacking ``_AUTH_HEADER``."""
        assert msg.auth_type == 1
        assert msg.sequence_number == 0x11223344
        assert msg.session_id == 0x55667788
        assert msg.auth_code == list(range(1, 17))

    def test_ipmimsg_pack(self):
        """Without session or data the packed message is ten zero bytes."""
        packed = IpmiMsg().pack(None)
        assert packed == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

    def test_ipmimsg_pack_password(self):
        """The padded password is the same for str and bytes credentials."""
        expected = b'admin\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        credentials = (('admin', 'admin'), (b'admin', b'admin'))

        for user, password in credentials:
            session = Session()
            session.set_auth_type_user(user, password)
            padded = IpmiMsg(session=session)._padd_password()
            assert padded == expected

    def test_ipmimsg_pack_with_data(self):
        """The payload is appended after the header, preceded by its length."""
        payload = py3_array_tobytes(array.array('B', (1, 2, 3, 4)))

        packed = IpmiMsg().pack(payload)

        assert packed == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x01\x02\x03\x04'

    def test_ipmimsg_pack_with_session(self):
        """Session sequence number, id and password show up in the header."""
        session = Session()
        session.set_auth_type_user('admin', 'admin')
        session.sequence_number = 0x14131211
        session.sid = 0x18171615

        packed = IpmiMsg(session=session).pack(None)

        assert packed == b'\x04\x11\x12\x13\x14\x15\x16\x17\x18admin\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

    def test_ipmimsg_pack_auth_md5(self):
        """The MD5 auth code of a fixed SDU matches the reference digest."""
        session = Session()
        session.set_auth_type_user('admin', 'admin')
        session.sid = 0x02f99b85
        msg = IpmiMsg(session=session)
        # Split only for readability; the concatenated value is unchanged.
        sdu = (b'\x20\x18\xc8\x81\x0c\x3a\x02\x04\xe1\x2c\xb4\xd3\x17\xdc\x40\xdf'
               b'\xe9\x78\x1e\x6d\x8e\x10\xad\xeb\x2c\xe8\x5c\xa0\x5b')

        digest = msg._pack_auth_code_md5(sdu)

        assert digest == b'\x40\x46\xb1\x51\x4c\x89\x7f\x73\xc2\xfb\xa7\x4d\xf8\x03\x73\x8c'

    def test_ipmimsg_unpack(self):
        """An unauthenticated header yields auth type, sequence and session."""
        msg = IpmiMsg()

        msg.unpack(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x00')

        assert msg.auth_type == 0
        assert (msg.sequence_number, msg.session_id) == (0x11223344, 0x55667788)

    def test_ipmimsg_unpack_auth(self):
        """An authenticated header additionally yields the 16-byte auth code."""
        msg = IpmiMsg()

        msg.unpack(self._AUTH_HEADER + b'\x00')

        self._assert_auth_header(msg)

    def test_ipmimsg_unpack_check_sdu_length(self):
        """A data length that disagrees with the payload raises DecodingError."""
        # The length byte (index 25) announces 2 bytes, but 3 payload bytes
        # follow, so len(pdu) != header_len + data_len.
        raw = self._AUTH_HEADER + b'\x02\x00\x00\x00'
        msg = IpmiMsg()

        with pytest.raises(DecodingError):
            msg.unpack(raw)

    def test_ipmimsg_unpack_no_check_sdu_length(self):
        """With ``ignore_sdu_length`` the mismatching PDU is still unpacked."""
        # Same PDU as the length-check test: announces 2 bytes, carries 3.
        raw = self._AUTH_HEADER + b'\x02\x00\x00\x00'
        msg = IpmiMsg(ignore_sdu_length=True)

        payload = msg.unpack(raw)

        self._assert_auth_header(msg)
        assert payload == b'\x00\x00\x00'

    # NOTE(review): the ``tests_`` prefix differs from the sibling ``test_``
    # names; presumably still collected because it starts with ``test`` --
    # confirm against the project's pytest configuration before renaming.
    def tests_ipmimsg_unpack_no_check_sdu_length_empty_sdu(self):
        """With ``ignore_sdu_length`` a PDU without payload unpacks to None."""
        # The length byte announces 2 bytes, but no payload follows at all.
        raw = self._AUTH_HEADER + b'\x02'
        msg = IpmiMsg(ignore_sdu_length=True)

        payload = msg.unpack(raw)

        self._assert_auth_header(msg)
        assert payload is None
class TestRmcp:
    """Placeholder suite for the ``Rmcp`` interface; nothing is exercised yet."""

    # Disabled draft of a raw send/receive test, kept for reference. It uses
    # MagicMock, Target and Rmcp, none of which appear in this module's
    # visible imports.
    #
    # def test_send_and_receive_raw(self):
    #     mock_send = MagicMock()
    #     mock_recv = MagicMock()
    #     mock_recv.return_value = (b'\x06\x00\xee\x07\x00\x00\x00\x00\x00\x00'
    #                               b'\x00\x00\x00\x06'
    #                               b'\x01\x02\x03\x04\x05\x06', 0)
    #     target = Target()
    #     target.ipmb_address = 0x20
    #     rmcp = Rmcp()
    #     rmcp.host = '10.10.10.10'
    #     rmcp.port = 637
    #     rmcp._sock.sendto = mock_send
    #     rmcp._sock.recvfrom = mock_recv
    #     rmcp.send_and_receive_raw(target, 0, 0, b'\x00')
    #     rmcp._send_ipmi_msg.assert_called_with(1)

    def test_send_and_receive(self):
        """Not implemented: passes trivially until a real test is written."""
|