File: messaging.py

package info (click to toggle)
python-ipmi 0.5.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,132 kB
  • sloc: python: 12,645; makefile: 2
file content (198 lines) | stat: -rw-r--r-- 7,404 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Copyright (c) 2016  Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA

from enum import Enum

from .session import Session
from .msgs import create_request_by_name
from .utils import check_completion_code, check_rsp_completion_code
from .state import State


class PasswordOperation(int, Enum):
    DISABLE = 0b00
    ENABLE = 0b01
    SET_PASSWORD = 0b10
    TEST_PASSWORD = 0b11


class UserPrivilegeLevel(str, Enum):
    RESERVED = "reserved"
    CALLBACK = "callback"
    USER = "user"
    OPERATOR = "operator"
    ADMINISTRATOR = "administrator"
    OEM = "oem"
    NO_ACCESS = "no access"


CONVERT_RAW_TO_USER_PRIVILEGE = {
    0x00: UserPrivilegeLevel.RESERVED,
    0x01: UserPrivilegeLevel.CALLBACK,
    0x02: UserPrivilegeLevel.USER,
    0x03: UserPrivilegeLevel.OPERATOR,
    0x04: UserPrivilegeLevel.ADMINISTRATOR,
    0x05: UserPrivilegeLevel.OEM,
    0x0F: UserPrivilegeLevel.NO_ACCESS
}

CONVERT_USER_PRIVILEGE_TO_RAW = {
    UserPrivilegeLevel.RESERVED:      0x00,
    UserPrivilegeLevel.CALLBACK:      0x01,
    UserPrivilegeLevel.USER:          0x02,
    UserPrivilegeLevel.OPERATOR:      0x03,
    UserPrivilegeLevel.ADMINISTRATOR: 0x04,
    UserPrivilegeLevel.OEM:           0x05,
    UserPrivilegeLevel.NO_ACCESS:     0x0F
}


class Messaging(object):
    def get_channel_authentication_capabilities(self, channel, priv_lvl):
        req = create_request_by_name('GetChannelAuthenticationCapabilities')
        req.channel.number = channel
        req.privilege_level.requested = priv_lvl
        rsp = self.send_and_receive(req)
        check_completion_code(rsp.completion_code)
        caps = ChannelAuthenticationCapabilities(rsp)
        return caps

    def set_username(self, userid=0, username=''):
        req = create_request_by_name('SetUserName')
        req.userid.userid = userid
        req.user_name = username.ljust(16, '\x00')
        rsp = self.send_message(req)
        check_completion_code(rsp.completion_code)

    def get_username(self, userid=0):
        req = create_request_by_name('GetUserName')
        req.userid.userid = userid
        rsp = self.send_message(req)
        check_completion_code(rsp.completion_code)
        return rsp.user_name

    def get_user_access(self, userid=0, channel=0):
        req = create_request_by_name('GetUserAccess')
        req.userid.userid = userid
        req.channel.channel_number = channel
        rsp = self.send_message(req)
        check_completion_code(rsp.completion_code)
        return UserAccess(rsp)

    def set_user_access(self, userid, ipmi_msg, link_auth, callback_only,
                        priv_level, channel=0, enable_change=1, user_session_limit=0):
        req = create_request_by_name('SetUserAccess')
        req.channel_access.channel_number = channel
        req.channel_access.ipmi_msg = ipmi_msg
        req.channel_access.link_auth = link_auth
        req.channel_access.callback = callback_only
        req.channel_access.enable_change = enable_change
        req.userid.userid = userid
        req.privilege.privilege_level = CONVERT_USER_PRIVILEGE_TO_RAW.get(
            priv_level, 0x0F)
        req.session_limit.simultaneous_session_limit = user_session_limit
        rsp = self.send_message(req)
        check_completion_code(rsp.completion_code)

    def set_user_password(self, userid, password=''):
        if len(password) > 16:
            raise ValueError("Password length cannot be greater than 20.")
        req = create_request_by_name('SetUserPassword')
        req.userid.userid = userid
        req.operation.operation = PasswordOperation.SET_PASSWORD
        req.password = password.ljust(16, '\x00')
        rsp = self.send_message(req)
        check_rsp_completion_code(rsp)

    def enable_user(self, userid):
        req = create_request_by_name('SetUserPassword')
        req.userid.userid = userid
        req.operation.operation = PasswordOperation.ENABLE
        rsp = self.send_message(req)
        check_completion_code(rsp.completion_code)

    def disable_user(self, userid):
        req = create_request_by_name('SetUserPassword')
        req.userid.userid = userid
        req.operation.operation = PasswordOperation.DISABLE
        rsp = self.send_message(req)
        check_completion_code(rsp.completion_code)


class ChannelAuthenticationCapabilities(State):

    _functions = {
        'none': Session.AUTH_TYPE_NONE,
        'md2': Session.AUTH_TYPE_MD2,
        'md5': Session.AUTH_TYPE_MD5,
        'straight': Session.AUTH_TYPE_PASSWORD,
        'oem_proprietary': Session.AUTH_TYPE_OEM,
    }

    def _from_response(self, rsp):
        self.channel = rsp.channel_number
        self.auth_types = []

        self.ipmi_1_5 = False
        self.ipmi_2_0 = False

        if rsp.support.ipmi_2_0:
            self.ipmi_2_0 = True
        else:
            self.ipmi_1_5 = True

        for function in self._functions.keys():
            if hasattr(rsp.support, function):
                if getattr(rsp.support, function):
                    self.auth_types.append(function)

    def get_max_auth_type(self):
        for auth_type in ('md5', 'md2', 'straight', 'oem_proprietary', 'none'):
            if auth_type in self.auth_types:
                return self._functions[auth_type]
        return None

    def __str__(self):
        s = 'Authentication Capabilities:\n'
        s += '  IPMI v1.5: %s\n' % self.ipmi_1_5
        s += '  IPMI v2.0: %s\n' % self.ipmi_2_0
        s += '  Auth. types: %s\n' % ' '.join(self.auth_types)
        s += '  Max Auth. type: %s\n' % self.get_max_auth_type()
        return s


class UserAccess(State):

    def _from_response(self, rsp):
        self.user_count = rsp.max_user.max_user
        self.enabled_user_count = rsp.enabled_user.count
        self.enabled_status = rsp.enabled_user.status
        self.fixed_name_user_count = rsp.fixed_names.count
        self.privilege_level = CONVERT_RAW_TO_USER_PRIVILEGE.get(rsp.channel_access.privilege, UserPrivilegeLevel.RESERVED)
        self.ipmi_messaging = rsp.channel_access.ipmi_msg == 1
        self.link_auth = rsp.channel_access.link_auth == 1
        self.callback_only = rsp.channel_access.callback == 1

    def __str__(self):
        s = 'User Access:\n'
        s += '  Max user number: %i\n' % self.user_count
        s += '  Enabled user: %i\n' % self.enabled_user_count
        s += '  Enabled status: %i\n' % self.enabled_status
        s += '  Fixed name user: %i\n' % self.fixed_name_user_count
        s += '  Privilege level: %s\n' % self.privilege_level
        s += '  IPMI messaging: %s\n' % self.ipmi_messaging
        s += '  Link Auth.: %s\n' % self.link_auth
        s += '  Callback only: %s' % self.callback_only