
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import array
import pytest
from pyipmi.session import Session
from pyipmi.interfaces.rmcp import (AsfMsg, AsfPing, AsfPong, IpmiMsg, RmcpMsg)
from pyipmi.utils import py3_array_tobytes
from pyipmi.errors import DecodingError
class TestRmcpMsg:
def test_rmcpmsg_pack(self):
m = RmcpMsg(0x7)
pdu = m.pack(None, 0xff)
assert pdu == b'\x06\x00\xff\x07'
m = RmcpMsg(0x7)
pdu = m.pack(b'\x11\x22\x33\x44', 0xff)
assert pdu == b'\x06\x00\xff\x07\x11\x22\x33\x44'
def test_rmcpmsg_unpack(self):
pdu = b'\x06\x00\xee\x07\x44\x33\x22\x11'
m = RmcpMsg()
sdu = m.unpack(pdu)
assert m.version == 6
assert m.seq_number == 0xee
assert m.class_of_msg == 0x7
assert sdu == b'\x44\x33\x22\x11'
class TestAsfMsg:
def test_pack(self):
m = AsfMsg()
pdu = m.pack()
assert pdu == b'\x00\x00\x11\xbe\x00\x00\x00\x00'
def test_unpack(self):
pdu = b'\x00\x00\x11\xbe\x00\x00\x00\x00'
msg = AsfMsg()
msg.unpack(pdu)
def test_tostr(self):
m = AsfMsg()
m.data = b'\xaa\xbb\xcc'
assert str(m) == 'aa bb cc'
class TestAsfPing():
def test_pack(self):
m = AsfPing()
pdu = m.pack()
assert pdu == b'\x00\x00\x11\xbe\x80\x00\x00\x00'
class TestAsfPong():
def test_unpack(self):
pdu = b'\x00\x00\x11\xbe\x40\x00\x00\x10\x00\x00\x11\xbe\x00\x00\x00\x00\x81\x00\x00\x00\x00\x00\x00\x00'
m = AsfPong()
m.unpack(pdu)
class TestIpmiMsg:
def test_ipmimsg_pack(self):
m = IpmiMsg()
pdu = m.pack(None)
assert pdu == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
def test_ipmimsg_pack_password(self):
s = Session()
s.set_auth_type_user('admin', 'admin')
m = IpmiMsg(session=s)
psw = m._padd_password()
assert psw == b'admin\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
s = Session()
s.set_auth_type_user(b'admin', b'admin')
m = IpmiMsg(session=s)
psw = m._padd_password()
assert psw == b'admin\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
def test_ipmimsg_pack_with_data(self):
data = py3_array_tobytes(array.array('B', (1, 2, 3, 4)))
m = IpmiMsg()
pdu = m.pack(data)
assert pdu == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x01\x02\x03\x04'
def test_ipmimsg_pack_with_session(self):
s = Session()
s.set_auth_type_user('admin', 'admin')
s.sequence_number = 0x14131211
s.sid = 0x18171615
m = IpmiMsg(session=s)
pdu = m.pack(None)
assert pdu == b'\x04\x11\x12\x13\x14\x15\x16\x17\x18admin\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
def test_ipmimsg_pack_auth_md5(self):
s = Session()
s.set_auth_type_user('admin', 'admin')
s.sid = 0x02f99b85
m = IpmiMsg(session=s)
sdu = b'\x20\x18\xc8\x81\x0c\x3a\x02\x04\xe1\x2c\xb4\xd3\x17\xdc\x40\xdf\xe9\x78\x1e\x6d\x8e\x10\xad\xeb\x2c\xe8\x5c\xa0\x5b'
auth = m._pack_auth_code_md5(sdu)
assert auth == b'\x40\x46\xb1\x51\x4c\x89\x7f\x73\xc2\xfb\xa7\x4d\xf8\x03\x73\x8c'
def test_ipmimsg_unpack(self):
pdu = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x00'
m = IpmiMsg()
m.unpack(pdu)
assert m.auth_type == 0
assert m.sequence_number == 0x11223344
assert m.session_id == 0x55667788
def test_ipmimsg_unpack_auth(self):
pdu = b'\x01\x11\x22\x33\x44\x55\x66\x77\x88\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x00'
m = IpmiMsg()
m.unpack(pdu)
assert m.auth_type == 1
assert m.sequence_number == 0x11223344
assert m.session_id == 0x55667788
assert m.auth_code == [1, 2, 3, 4, 5, 6, 7, 8,
9, 10, 11, 12, 13, 14, 15, 16]
def test_ipmimsg_unpack_check_sdu_length(self):
pdu = b'\x01\x11\x22\x33\x44\x55\x66\x77\x88\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x02\x00\x00\x00'
m = IpmiMsg()
with pytest.raises(DecodingError):
# data len is 2 ( byte 25 = \x02) but actual payload length is 3
# (\x00\x00\x00) so we have len(pdu) != header_len + data_len
m.unpack(pdu)
def test_ipmimsg_unpack_no_check_sdu_length(self):
pdu = b'\x01\x11\x22\x33\x44\x55\x66\x77\x88\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x02\x00\x00\x00'
m = IpmiMsg(ignore_sdu_length=True)
# Same PDU here, we have len(pdu) != header_len + data_len
sdu = m.unpack(pdu)
assert m.auth_type == 1
assert m.sequence_number == 0x11223344
assert m.session_id == 0x55667788
assert m.auth_code == [1, 2, 3, 4, 5, 6, 7, 8,
9, 10, 11, 12, 13, 14, 15, 16]
assert sdu == b'\x00\x00\x00'
def tests_ipmimsg_unpack_no_check_sdu_length_empty_sdu(self):
pdu = b'\x01\x11\x22\x33\x44\x55\x66\x77\x88\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x02'
m = IpmiMsg(ignore_sdu_length=True)
# We have len(pdu) != header_len + data_len
sdu = m.unpack(pdu)
assert m.auth_type == 1
assert m.sequence_number == 0x11223344
assert m.session_id == 0x55667788
assert m.auth_code == [1, 2, 3, 4, 5, 6, 7, 8,
9, 10, 11, 12, 13, 14, 15, 16]
assert sdu is None
class TestRmcp:
# def test_send_and_receive_raw(self):
# mock_send = MagicMock()
# mock_recv = MagicMock()
# mock_recv.return_value = (b'\x06\x00\xee\x07\x00\x00\x00\x00\x00\x00'
# b'\x00\x00\x00\x06'
# b'\x01\x02\x03\x04\x05\x06', 0)
# target = Target()
# target.ipmb_address = 0x20
# rmcp = Rmcp()
# rmcp.host = '10.10.10.10'
# rmcp.port = 637
# rmcp._sock.sendto = mock_send
# rmcp._sock.recvfrom = mock_recv
# rmcp.send_and_receive_raw(target, 0, 0, b'\x00')
# rmcp._send_ipmi_msg.assert_called_with(1)
def test_send_and_receive(self):
pass
|