1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Application base class for displaying data."""
import abc
import argparse
import collections.abc
from itertools import compress
import typing as ty
import stevedore
from cliff import app
from cliff import _argparse
from cliff import command
from cliff.formatters import base as base_formatters
_T = ty.TypeVar("_T")
class DisplayCommandBase(
command.Command,
ty.Generic[base_formatters.FormatterT],
metaclass=abc.ABCMeta,
):
"""Command base class for displaying data about a single object."""
_formatter_plugins: stevedore.ExtensionManager[base_formatters.FormatterT]
formatter: base_formatters.FormatterT
def __init__(
self,
app: app.App,
app_args: argparse.Namespace | None,
cmd_name: str | None = None,
) -> None:
super().__init__(app, app_args, cmd_name=cmd_name)
self._formatter_plugins = self._load_formatter_plugins()
@property
@abc.abstractmethod
def formatter_namespace(self) -> str:
"""Namespace to use for loading formatter plugins."""
@property
@abc.abstractmethod
def formatter_default(self) -> str:
"""String specifying the name of the default formatter."""
def _load_formatter_plugins(
self,
) -> stevedore.ExtensionManager[base_formatters.FormatterT]:
# Here so tests can override
return stevedore.ExtensionManager(
self.formatter_namespace,
invoke_on_load=True,
)
def get_parser(self, prog_name: str) -> _argparse.ArgumentParser:
parser = super().get_parser(prog_name)
formatter_group = parser.add_argument_group(
title='output formatters',
description='output formatter options',
)
self._formatter_group = formatter_group
formatter_choices = sorted(self._formatter_plugins.names())
formatter_default = self.formatter_default
if formatter_default not in formatter_choices:
formatter_default = formatter_choices[0]
formatter_group.add_argument(
'-f',
'--format',
dest='formatter',
action='store',
choices=formatter_choices,
default=formatter_default,
help=f'the output format, defaults to {formatter_default}',
)
formatter_group.add_argument(
'-c',
'--column',
action='append',
default=[],
dest='columns',
metavar='COLUMN',
help=(
'specify the column(s) to include, can be '
'repeated to show multiple columns'
),
)
for formatter in self._formatter_plugins:
assert formatter.obj is not None # noqa
formatter.obj.add_argument_group(parser)
return parser
@abc.abstractmethod
def produce_output(
self,
parsed_args: argparse.Namespace,
column_names: collections.abc.Sequence[str],
data: ty.Any,
) -> int:
"""Use the formatter to generate the output.
:param parsed_args: argparse.Namespace instance with argument values
:param column_names: sequence of strings containing names
of output columns
:param data: iterable with values matching the column names
:returns: a status code
"""
def _generate_columns_and_selector(
self,
parsed_args: argparse.Namespace,
column_names: collections.abc.Sequence[str],
) -> tuple[list[str], list[bool] | None]:
"""Generate included columns and selector according to parsed args.
We normalize the column names so that someone can do e.g. '-c
server_name' when the output field is actually called 'Server Name'.
:param parsed_args: argparse.Namespace instance with argument values
:param column_names: sequence of strings containing names
of output columns
"""
if not parsed_args.columns:
return list(column_names), None
def normalize_column(column_name: str) -> str:
return column_name.lower().strip().replace(' ', '_')
requested_columns = [normalize_column(c) for c in parsed_args.columns]
columns_to_include = [
c for c in column_names if normalize_column(c) in requested_columns
]
if not columns_to_include:
raise ValueError(
f'No recognized column names in {str(parsed_args.columns)}. '
f'Recognized columns are {str(column_names)}.'
)
# Set up argument to compress()
selector = [(c in columns_to_include) for c in column_names]
return columns_to_include, selector
def run(self, parsed_args: argparse.Namespace) -> int:
parsed_args = self._run_before_hooks(parsed_args)
formatter = self._formatter_plugins[parsed_args.formatter].obj
assert formatter is not None # noqa
self.formatter = formatter
column_names, data = self.take_action(parsed_args)
column_names, data = self._run_after_hooks(
parsed_args, (column_names, data)
)
self.produce_output(parsed_args, column_names, data)
return 0
def _run_after_hooks( # type: ignore[override]
self,
parsed_args: argparse.Namespace,
data: tuple[
collections.abc.Sequence[str], collections.abc.Iterable[ty.Any]
],
) -> tuple[
collections.abc.Sequence[str], collections.abc.Iterable[ty.Any]
]:
"""Calls after() method of the hooks.
This method is intended to be called from the run() method after
take_action() is called.
This method should only be overridden by developers creating new
command base classes and only if it is necessary to have different
hook processing behavior.
"""
for hook in self._hooks:
# we need to ignore the types since CommandHook states that it
# should return an integer, but they'll actually return a tuple of
# column names and data when used with DisplayCommandBase
# subclasses
ret = hook.obj.after(parsed_args, data) # type: ignore
# If the return is None do not change return_code, otherwise
# set up to pass it to the next hook
if ret is not None:
data = ret # type: ignore
return data
@staticmethod
def _compress_iterable(
iterable: collections.abc.Iterable[_T],
selectors: collections.abc.Iterable[ty.Any],
) -> collections.abc.Iterator[_T]:
return compress(iterable, selectors)
|