File: display.py

package info (click to toggle)
python-cliff 4.13.2-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 720 kB
  • sloc: python: 5,401; makefile: 32; sh: 21
file content (203 lines) | stat: -rw-r--r-- 7,323 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#  Licensed under the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License. You may obtain
#  a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#  License for the specific language governing permissions and limitations
#  under the License.

"""Application base class for displaying data."""

import abc
import argparse
import collections.abc
from itertools import compress
import typing as ty

import stevedore

from cliff import app
from cliff import _argparse
from cliff import command
from cliff.formatters import base as base_formatters

# Element type of the iterable handed to
# DisplayCommandBase._compress_iterable(); it lets that helper's return
# annotation carry the same item type as its input.
_T = ty.TypeVar("_T")


class DisplayCommandBase(
    command.Command,
    ty.Generic[base_formatters.FormatterT],
    metaclass=abc.ABCMeta,
):
    """Command base class for displaying data about a single object.

    Subclasses must provide the ``formatter_namespace`` and
    ``formatter_default`` properties and implement ``produce_output()``.
    ``run()`` calls ``take_action()``, which is expected to return a
    ``(column_names, data)`` pair, and hands the result to
    ``produce_output()``.
    """

    # Formatter plugins loaded from ``formatter_namespace`` in __init__().
    _formatter_plugins: stevedore.ExtensionManager[base_formatters.FormatterT]
    # Formatter selected by the parsed ``--format`` option; set in run().
    formatter: base_formatters.FormatterT

    def __init__(
        self,
        app: app.App,
        app_args: argparse.Namespace | None,
        cmd_name: str | None = None,
    ) -> None:
        """Initialize the command and load its formatter plugins.

        :param app: application instance, passed to the parent class
        :param app_args: application-level arguments, passed to the parent
                         class
        :param cmd_name: command name, passed to the parent class
        """
        super().__init__(app, app_args, cmd_name=cmd_name)
        self._formatter_plugins = self._load_formatter_plugins()

    @property
    @abc.abstractmethod
    def formatter_namespace(self) -> str:
        """Namespace to use for loading formatter plugins."""

    @property
    @abc.abstractmethod
    def formatter_default(self) -> str:
        """String specifying the name of the default formatter."""

    def _load_formatter_plugins(
        self,
    ) -> stevedore.ExtensionManager[base_formatters.FormatterT]:
        """Load the formatter plugins registered in ``formatter_namespace``.

        :returns: an extension manager created with ``invoke_on_load=True``
        """
        # Here so tests can override
        return stevedore.ExtensionManager(
            self.formatter_namespace,
            invoke_on_load=True,
        )

    def get_parser(self, prog_name: str) -> _argparse.ArgumentParser:
        """Return the parser for this command with formatter options added.

        An 'output formatters' argument group is added containing
        ``-f/--format`` (stored as ``formatter``) and ``-c/--column``
        (appended to ``columns``). ``add_argument_group(parser)`` is then
        called on every loaded formatter plugin.

        :param prog_name: program name, passed to the parent ``get_parser()``
        :returns: the configured argument parser
        """
        parser = super().get_parser(prog_name)
        formatter_group = parser.add_argument_group(
            title='output formatters',
            description='output formatter options',
        )
        self._formatter_group = formatter_group
        formatter_choices = sorted(self._formatter_plugins.names())
        formatter_default = self.formatter_default
        # Fall back to the first available formatter when the declared
        # default is not among the loaded plugins. If no plugins were loaded
        # at all there is nothing to fall back to, so keep the declared
        # default instead of failing with an IndexError while building the
        # parser.
        if formatter_choices and formatter_default not in formatter_choices:
            formatter_default = formatter_choices[0]
        formatter_group.add_argument(
            '-f',
            '--format',
            dest='formatter',
            action='store',
            choices=formatter_choices,
            default=formatter_default,
            help=f'the output format, defaults to {formatter_default}',
        )
        formatter_group.add_argument(
            '-c',
            '--column',
            action='append',
            default=[],
            dest='columns',
            metavar='COLUMN',
            help=(
                'specify the column(s) to include, can be '
                'repeated to show multiple columns'
            ),
        )
        for formatter in self._formatter_plugins:
            assert formatter.obj is not None  # noqa
            formatter.obj.add_argument_group(parser)
        return parser

    @abc.abstractmethod
    def produce_output(
        self,
        parsed_args: argparse.Namespace,
        column_names: collections.abc.Sequence[str],
        data: ty.Any,
    ) -> int:
        """Use the formatter to generate the output.

        :param parsed_args: argparse.Namespace instance with argument values
        :param column_names: sequence of strings containing names
                             of output columns
        :param data: iterable with values matching the column names
        :returns: a status code
        """

    def _generate_columns_and_selector(
        self,
        parsed_args: argparse.Namespace,
        column_names: collections.abc.Sequence[str],
    ) -> tuple[list[str], list[bool] | None]:
        """Generate included columns and selector according to parsed args.

        We normalize the column names so that someone can do e.g. '-c
        server_name' when the output field is actually called 'Server Name'.

        :param parsed_args: argparse.Namespace instance with argument values
        :param column_names: sequence of strings containing names
                             of output columns
        :returns: a tuple of the column names to include (in their original
                  order) and a list of booleans, one per entry of
                  ``column_names``, suitable for ``itertools.compress()``;
                  the selector is ``None`` when no columns were requested
        :raises ValueError: if columns were requested but none of them
                            matches a known column name
        """
        if not parsed_args.columns:
            return list(column_names), None

        def normalize_column(column_name: str) -> str:
            return column_name.lower().strip().replace(' ', '_')

        # A set keeps the membership test below constant-time per column.
        requested_columns = {normalize_column(c) for c in parsed_args.columns}

        # Set up argument to compress(): one flag per original column.
        selector = [
            normalize_column(c) in requested_columns for c in column_names
        ]
        columns_to_include = list(compress(column_names, selector))
        if not columns_to_include:
            raise ValueError(
                f'No recognized column names in {str(parsed_args.columns)}. '
                f'Recognized columns are {str(column_names)}.'
            )

        return columns_to_include, selector

    def run(self, parsed_args: argparse.Namespace) -> int:
        """Run the command and render its result with the chosen formatter.

        The before hooks are run, the formatter named by
        ``parsed_args.formatter`` is stored on ``self.formatter``, and the
        ``(column_names, data)`` pair returned by ``take_action()`` is passed
        through the after hooks and on to ``produce_output()``.

        :param parsed_args: argparse.Namespace instance with argument values
        :returns: always 0
        """
        parsed_args = self._run_before_hooks(parsed_args)
        formatter = self._formatter_plugins[parsed_args.formatter].obj
        assert formatter is not None  # noqa
        self.formatter = formatter
        column_names, data = self.take_action(parsed_args)
        column_names, data = self._run_after_hooks(
            parsed_args, (column_names, data)
        )
        # NOTE(review): the status code returned by produce_output() is
        # discarded and 0 is returned unconditionally - confirm this is
        # intended.
        self.produce_output(parsed_args, column_names, data)
        return 0

    def _run_after_hooks(  # type: ignore[override]
        self,
        parsed_args: argparse.Namespace,
        data: tuple[
            collections.abc.Sequence[str], collections.abc.Iterable[ty.Any]
        ],
    ) -> tuple[
        collections.abc.Sequence[str], collections.abc.Iterable[ty.Any]
    ]:
        """Calls after() method of the hooks.

        This method is intended to be called from the run() method after
        take_action() is called.

        This method should only be overridden by developers creating new
        command base classes and only if it is necessary to have different
        hook processing behavior.

        :param parsed_args: argparse.Namespace instance with argument values
        :param data: tuple of column names and data, as passed in by run()
        :returns: the tuple as returned by the last hook that returned a
                  value other than ``None``, or the input tuple if no hook
                  did
        """
        for hook in self._hooks:
            # we need to ignore the types since CommandHook states that it
            # should return an integer, but they'll actually return a tuple of
            # column names and data when used with DisplayCommandBase
            # subclasses
            ret = hook.obj.after(parsed_args, data)  # type: ignore
            # If the return is None keep the current data unchanged,
            # otherwise set it up to be passed to the next hook
            if ret is not None:
                data = ret  # type: ignore
        return data

    @staticmethod
    def _compress_iterable(
        iterable: collections.abc.Iterable[_T],
        selectors: collections.abc.Iterable[ty.Any],
    ) -> collections.abc.Iterator[_T]:
        """Filter ``iterable`` down to the items whose selector is true.

        Thin wrapper around ``itertools.compress()``.

        :param iterable: the items to filter
        :param selectors: values evaluated for truth, paired positionally
                          with the items of ``iterable``
        :returns: an iterator over the selected items
        """
        return compress(iterable, selectors)