1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
|
from collections import namedtuple
import json
import logging
import pprint
import re
(STATUS_OK, STATUS_ERROR) = range(2)
CommandsResponse = namedtuple('CommandsResponse', ['status', 'value'])
LOG = logging.getLogger('bgpspeaker.operator.command')
def default_help_formatter(quick_helps):
"""Apply default formatting for help messages
:param quick_helps: list of tuples containing help info
"""
ret = ''
for line in quick_helps:
cmd_path, param_hlp, cmd_hlp = line
ret += ' '.join(cmd_path) + ' '
if param_hlp:
ret += param_hlp + ' '
ret += '- ' + cmd_hlp + '\n'
return ret
class Command(object):
"""Command class is used as a node in tree of commands.
Each command can do some action or have some sub-commands, just like in IOS
Command with it's sub-commands form tree.
Each command can have one or more parameters. Parameters have to be
distinguishable from sub-commands.
One can inject dependency into command Cmd(api=my_object).
This dependency will be injected to every sub-command. And can be used
to interact with model/data etc.
Example of path in command tree `show count all`.
"""
help_msg = ''
param_help_msg = ''
command = ''
cli_resp_line_template = '{0}: {1}\n'
def __init__(self, api=None, parent=None,
help_formatter=default_help_formatter,
resp_formatter_name='cli'):
""":param api: object which is saved as self.api
and re-injected to every sub-command. You can use it to
manipulate your model from inside Commands'
:param parent: parent command instance.
:param help_formatter: function used to format
output of '?'command. Is re-injected to every
sub-command as well.
:param resp_formatter_name: used to select function to format
output of _action. cli_resp_formatter and json_resp_formatter
are defined by default, but you can define your own formatters.
If you use custom formatter(not cli nor json) remember to
implement it for every sub-command.
"""
self.resp_formatter_name = resp_formatter_name
if hasattr(self, resp_formatter_name + '_resp_formatter'):
self.resp_formatter = \
getattr(self, resp_formatter_name + '_resp_formatter')
else:
self.resp_formatter = self.cli_resp_formatter
self.api = api
self.parent_cmd = parent
self.help_formatter = help_formatter
if not hasattr(self, 'subcommands'):
self.subcommands = {}
def __call__(self, params):
"""You run command by calling it.
:param params: As params you give list of subcommand names
and params to final subcommand. Kind of like in
cisco ios cli, ie. show int eth1 / 1, where show is command,
int subcommand and eth1 / 1 is param for subcommand.
:return: returns tuple of CommandsResponse and class of
sub - command on which _action was called. (last sub - command)
CommandsResponse.status is action status,
and CommandsResponse.value is formatted response.
"""
if len(params) == 0:
return self._action_wrapper([])
first_param = params[0]
if first_param == '?':
return self.question_mark()
if first_param in self.subcommands:
return self._instantiate_subcommand(first_param)(params[1:])
return self._action_wrapper(params)
@classmethod
def cli_resp_formatter(cls, resp):
"""Override this method to provide custom formatting of cli response.
"""
if not resp.value:
return ''
if resp.status == STATUS_OK:
if type(resp.value) in (str, bool, int, float, str):
return str(resp.value)
ret = ''
val = resp.value
if not isinstance(val, list):
val = [val]
for line in val:
for k, v in line.items():
if isinstance(v, dict):
ret += cls.cli_resp_line_template.format(
k, '\n' + pprint.pformat(v)
)
else:
ret += cls.cli_resp_line_template.format(k, v)
return ret
else:
return "Error: {0}".format(resp.value)
@classmethod
def json_resp_formatter(cls, resp):
"""Override this method to provide custom formatting of json response.
"""
return json.dumps(resp.value)
@classmethod
def dict_resp_formatter(cls, resp):
return resp.value
def _action_wrapper(self, params):
filter_params = []
if '|' in params:
ind = params.index('|')
new_params = params[:ind]
filter_params = params[ind:]
params = new_params
action_resp = self.action(params)
if len(filter_params) > 1:
# we don't pass '|' around so filter_params[1:]
action_resp = self.filter_resp(action_resp, filter_params[1:])
action_resp = CommandsResponse(
action_resp.status,
self.resp_formatter(action_resp)
)
return action_resp, self.__class__
def action(self, params):
"""Override this method to define what command should do.
:param params: list of text parameters applied to this command.
:return: returns CommandsResponse instance.
CommandsResponse.status can be STATUS_OK or STATUS_ERROR
CommandsResponse.value should be dict or str
"""
return CommandsResponse(STATUS_ERROR, 'Not implemented')
def filter_resp(self, action_resp, filter_params):
"""Filter response of action. Used to make printed results more
specific
:param action_resp: named tuple (CommandsResponse)
containing response from action.
:param filter_params: params used after '|' specific for given filter
:return: filtered response.
"""
if action_resp.status == STATUS_OK:
try:
return CommandsResponse(
STATUS_OK,
TextFilter.filter(action_resp.value, filter_params)
)
except FilterError as e:
return CommandsResponse(STATUS_ERROR, str(e))
else:
return action_resp
def question_mark(self):
"""Shows help for this command and it's sub-commands.
"""
ret = []
if self.param_help_msg or len(self.subcommands) == 0:
ret.append(self._quick_help())
if len(self.subcommands) > 0:
for k, _ in sorted(self.subcommands.items()):
command_path, param_help, cmd_help = \
self._instantiate_subcommand(k)._quick_help(nested=True)
if command_path or param_help or cmd_help:
ret.append((command_path, param_help, cmd_help))
return (
CommandsResponse(STATUS_OK, self.help_formatter(ret)),
self.__class__
)
def _quick_help(self, nested=False):
""":param nested: True if help is requested directly for this command
and False when help is requested for a list of possible
completions.
"""
if nested:
return self.command_path(), None, self.help_msg
else:
return self.command_path(), self.param_help_msg, self.help_msg
def command_path(self):
if self.parent_cmd:
return self.parent_cmd.command_path() + [self.command]
else:
return [self.command]
def _instantiate_subcommand(self, key):
return self.subcommands[key](
api=self.api,
parent=self,
help_formatter=self.help_formatter,
resp_formatter_name=self.resp_formatter_name
)
class TextFilter(object):
@classmethod
def filter(cls, action_resp_value, filter_params):
try:
action, expected_value = filter_params
except ValueError:
raise FilterError('Wrong number of filter parameters')
if action == 'regexp':
if isinstance(action_resp_value, list):
resp = list(action_resp_value)
iterator = enumerate(action_resp_value)
else:
resp = dict(action_resp_value)
iterator = iter(action_resp_value.items())
remove = []
for key, value in iterator:
if not re.search(expected_value, str(value)):
remove.append(key)
if isinstance(resp, list):
resp = [resp[key] for key, value in enumerate(resp)
if key not in remove]
else:
resp = dict([(key, value)
for key, value in resp.items()
if key not in remove])
return resp
else:
raise FilterError('Unknown filter')
class FilterError(Exception):
pass
|