File: command.py

package info (click to toggle)
python-os-ken 3.1.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,320 kB
  • sloc: python: 100,703; erlang: 14,517; ansic: 594; sh: 338; makefile: 136
file content (271 lines) | stat: -rw-r--r-- 9,381 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
from collections import namedtuple
import json
import logging
import pprint
import re


(STATUS_OK, STATUS_ERROR) = range(2)

CommandsResponse = namedtuple('CommandsResponse', ['status', 'value'])

LOG = logging.getLogger('bgpspeaker.operator.command')


def default_help_formatter(quick_helps):
    """Apply default formatting for help messages

        :param quick_helps: list of tuples containing help info
     """
    ret = ''
    for line in quick_helps:
        cmd_path, param_hlp, cmd_hlp = line
        ret += ' '.join(cmd_path) + ' '
        if param_hlp:
            ret += param_hlp + ' '
        ret += '- ' + cmd_hlp + '\n'
    return ret


class Command(object):
    """Command class is used as a node in tree of commands.

    Each command can do some action or have some sub-commands, just like in IOS
    Command with it's sub-commands form tree.
    Each command can have one or more parameters. Parameters have to be
    distinguishable from sub-commands.
        One can inject dependency into command Cmd(api=my_object).
    This dependency will be injected to every sub-command. And can be used
    to interact with model/data etc.
        Example of path in command tree `show count all`.
    """

    help_msg = ''
    param_help_msg = ''
    command = ''
    cli_resp_line_template = '{0}: {1}\n'

    def __init__(self, api=None, parent=None,
                 help_formatter=default_help_formatter,
                 resp_formatter_name='cli'):
        """:param api: object which is saved as self.api
                 and re-injected to every sub-command. You can use it to
                 manipulate your model from inside Commands'
           :param parent: parent command instance.
           :param help_formatter: function used to format
                output of '?'command. Is re-injected to every
                sub-command as well.
           :param resp_formatter_name: used to select function to format
                output of _action. cli_resp_formatter and json_resp_formatter
                are defined by default, but you can define your own formatters.
                If you use custom formatter(not cli nor json) remember to
                implement it for every sub-command.
        """

        self.resp_formatter_name = resp_formatter_name

        if hasattr(self, resp_formatter_name + '_resp_formatter'):
            self.resp_formatter = \
                getattr(self, resp_formatter_name + '_resp_formatter')
        else:
            self.resp_formatter = self.cli_resp_formatter

        self.api = api
        self.parent_cmd = parent
        self.help_formatter = help_formatter
        if not hasattr(self, 'subcommands'):
            self.subcommands = {}

    def __call__(self, params):
        """You run command by calling it.

        :param params: As params you give list of subcommand names
            and params to final subcommand. Kind of like in
            cisco ios cli, ie. show int eth1 / 1, where show is command,
            int subcommand and eth1 / 1 is param for subcommand.
        :return: returns tuple of CommandsResponse and class of
            sub - command on which _action was called. (last sub - command)
            CommandsResponse.status is action status,
            and CommandsResponse.value is formatted response.
        """
        if len(params) == 0:
            return self._action_wrapper([])

        first_param = params[0]

        if first_param == '?':
            return self.question_mark()

        if first_param in self.subcommands:
            return self._instantiate_subcommand(first_param)(params[1:])

        return self._action_wrapper(params)

    @classmethod
    def cli_resp_formatter(cls, resp):
        """Override this method to provide custom formatting of cli response.
        """
        if not resp.value:
            return ''

        if resp.status == STATUS_OK:

            if type(resp.value) in (str, bool, int, float, str):
                return str(resp.value)

            ret = ''
            val = resp.value
            if not isinstance(val, list):
                val = [val]
            for line in val:
                for k, v in line.items():
                    if isinstance(v, dict):
                        ret += cls.cli_resp_line_template.format(
                            k, '\n' + pprint.pformat(v)
                        )
                    else:
                        ret += cls.cli_resp_line_template.format(k, v)
            return ret
        else:
            return "Error: {0}".format(resp.value)

    @classmethod
    def json_resp_formatter(cls, resp):
        """Override this method to provide custom formatting of json response.
        """
        return json.dumps(resp.value)

    @classmethod
    def dict_resp_formatter(cls, resp):
        return resp.value

    def _action_wrapper(self, params):
        filter_params = []
        if '|' in params:
            ind = params.index('|')
            new_params = params[:ind]
            filter_params = params[ind:]
            params = new_params

        action_resp = self.action(params)
        if len(filter_params) > 1:
            # we don't pass '|' around so filter_params[1:]
            action_resp = self.filter_resp(action_resp, filter_params[1:])
        action_resp = CommandsResponse(
            action_resp.status,
            self.resp_formatter(action_resp)
        )
        return action_resp, self.__class__

    def action(self, params):
        """Override this method to define what command should do.

        :param params: list of text parameters applied to this command.
        :return: returns CommandsResponse instance.
                 CommandsResponse.status can be STATUS_OK or STATUS_ERROR
                 CommandsResponse.value should be dict or str
        """
        return CommandsResponse(STATUS_ERROR, 'Not implemented')

    def filter_resp(self, action_resp, filter_params):
        """Filter response of action. Used to make printed results more
        specific

        :param action_resp: named tuple (CommandsResponse)
            containing response from action.
        :param filter_params: params used after '|' specific for given filter
        :return: filtered response.
        """
        if action_resp.status == STATUS_OK:
            try:
                return CommandsResponse(
                    STATUS_OK,
                    TextFilter.filter(action_resp.value, filter_params)
                )
            except FilterError as e:
                return CommandsResponse(STATUS_ERROR, str(e))
        else:
            return action_resp

    def question_mark(self):
        """Shows help for this command and it's sub-commands.
        """
        ret = []
        if self.param_help_msg or len(self.subcommands) == 0:
            ret.append(self._quick_help())

        if len(self.subcommands) > 0:
            for k, _ in sorted(self.subcommands.items()):
                command_path, param_help, cmd_help = \
                    self._instantiate_subcommand(k)._quick_help(nested=True)
                if command_path or param_help or cmd_help:
                    ret.append((command_path, param_help, cmd_help))

        return (
            CommandsResponse(STATUS_OK, self.help_formatter(ret)),
            self.__class__
        )

    def _quick_help(self, nested=False):
        """:param nested: True if help is requested directly for this command
                    and False when help is requested for a list of possible
                    completions.
        """
        if nested:
            return self.command_path(), None, self.help_msg
        else:
            return self.command_path(), self.param_help_msg, self.help_msg

    def command_path(self):
        if self.parent_cmd:
            return self.parent_cmd.command_path() + [self.command]
        else:
            return [self.command]

    def _instantiate_subcommand(self, key):
        return self.subcommands[key](
            api=self.api,
            parent=self,
            help_formatter=self.help_formatter,
            resp_formatter_name=self.resp_formatter_name
        )


class TextFilter(object):

    @classmethod
    def filter(cls, action_resp_value, filter_params):
        try:
            action, expected_value = filter_params
        except ValueError:
            raise FilterError('Wrong number of filter parameters')
        if action == 'regexp':

            if isinstance(action_resp_value, list):
                resp = list(action_resp_value)
                iterator = enumerate(action_resp_value)
            else:
                resp = dict(action_resp_value)
                iterator = iter(action_resp_value.items())

            remove = []

            for key, value in iterator:
                if not re.search(expected_value, str(value)):
                    remove.append(key)

            if isinstance(resp, list):
                resp = [resp[key] for key, value in enumerate(resp)
                        if key not in remove]
            else:
                resp = dict([(key, value)
                             for key, value in resp.items()
                             if key not in remove])

            return resp
        else:
            raise FilterError('Unknown filter')


class FilterError(Exception):
    pass