1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import argparse
from importlib.metadata import packages_distributions
import inspect
import types
import typing as ty
from stevedore import extension
from cliff import _argparse
if ty.TYPE_CHECKING:
from . import app as _app
# Generic type variable: ``Command._run_after_hooks`` returns the same type
# of return code that it is given.
_T = ty.TypeVar('_T')
# Module-level cache of the package-name -> distribution-name mapping.
# Stays ``None`` until ``_get_distributions_by_modules`` fills it in.
_dists_by_mods: dict[str, str] | None = None
def _get_distributions_by_modules() -> dict[str, str]:
"""Return dict mapping module name to distribution names.
The python package name (the name used for importing) and the
distribution name (the name used with pip and PyPI) do not
always match. We want to report which distribution caused the
command to be installed, so we need to look up the values.
"""
global _dists_by_mods
if _dists_by_mods is None:
# There can be multiple distribution in the case of namespace packages
# so we'll just grab the first one
_dists_by_mods = {k: v[0] for k, v in packages_distributions().items()}
return _dists_by_mods
def _get_distribution_for_module(
module: types.ModuleType | None,
) -> str | None:
"Return the distribution containing the module."
dist_name = None
if module:
pkg_name = module.__name__.partition('.')[0]
dist_name = _get_distributions_by_modules().get(pkg_name)
return dist_name
class Command(metaclass=abc.ABCMeta):
    """Abstract base class for command plugins.

    Instantiating a command with a name also loads the extensions
    ("hooks") registered under a namespace built from the application's
    command manager namespace and the command name::

        app.command_manager.namespace + '.' + cmd_name.replace(' ', '_')

    :param app: Application instance invoking the command.
    :param app_args: Parsed arguments from options associated with the
        application instance.
    :param cmd_name: The name of the command.
    """

    app_dist_name: str | None
    deprecated = False
    conflict_handler = 'ignore'
    _description = ''
    _epilog = None

    def __init__(
        self,
        app: '_app.App',
        app_args: argparse.Namespace | None,
        cmd_name: str | None = None,
    ) -> None:
        """Record the invocation context and load the command's hooks."""
        self.app = app
        self.app_args = app_args
        self.cmd_name = cmd_name
        # Hook loading reads ``app`` and ``cmd_name``, so it goes last.
        self._load_hooks()

    def _load_hooks(self) -> None:
        """Populate ``self._hooks`` with the extensions for this command.

        Nothing is looked up when the command has no name; ``_hooks`` is
        then an empty list.
        """
        if self.cmd_name:
            manager_namespace = self.app.command_manager.namespace
            command_suffix = self.cmd_name.replace(' ', '_')
            self._hooks = extension.ExtensionManager(
                namespace=f'{manager_namespace}.{command_suffix}',
                invoke_on_load=True,
                invoke_kwds={'command': self},
            )
        else:
            # An empty list lets every ``for hook in self._hooks`` loop
            # run without first checking whether any hooks exist.
            self._hooks = []

    def get_description(self) -> str:
        """Return the command description.

        ``_description`` is used when it is set; otherwise the class
        docstring (as returned by :func:`inspect.getdoc`) is used. Set
        the ``_description`` class attribute to use a different value,
        for example a string wrapped with a gettext translation marker.

        An empty string is returned when no description is available or
        when the result is just the docstring of this base class.
        """
        description = self._description
        if not description:
            # ``inspect.getdoc`` may return None; always return a string.
            description = inspect.getdoc(self.__class__) or ''
        # The base class docstring says nothing about a real command.
        base_doc = inspect.getdoc(Command)
        return '' if description == base_doc else description

    def get_epilog(self) -> str:
        """Return the command epilog.

        The epilog is assembled from the command's own ``_epilog`` (an
        empty string when unset), every non-``None`` epilog returned by
        a hook, and a note naming the providing distribution when that
        distribution differs from the application's. The parts are
        joined with blank lines.
        """
        sections = [self._epilog or '']
        for hook in self._hooks:
            hook_epilog = hook.obj.get_epilog()
            if hook_epilog is not None:
                sections.append(hook_epilog)

        # The application's own distribution is the fallback when no
        # ``app_dist_name`` attribute has been set on the command.
        app_module = inspect.getmodule(self.app)
        fallback_dist = _get_distribution_for_module(app_module)
        app_dist = getattr(self, 'app_dist_name', fallback_dist)

        command_dist = _get_distribution_for_module(inspect.getmodule(self))
        if command_dist and command_dist != app_dist:
            sections.append(
                f'This command is provided by the {command_dist} plugin.'
            )
        return '\n\n'.join(sections)

    def get_parser(self, prog_name: str) -> _argparse.ArgumentParser:
        """Return an :class:`argparse.ArgumentParser`.

        The parser is created with this command's description, epilog
        and conflict handler, then passed to each hook's
        ``get_parser()`` so the hook can work with it.
        """
        description = self.get_description()
        epilog = self.get_epilog()
        parser = _argparse.ArgumentParser(
            description=description,
            epilog=epilog,
            prog=prog_name,
            formatter_class=_argparse.SmartHelpFormatter,
            conflict_handler=self.conflict_handler,
        )
        for hook in self._hooks:
            hook.obj.get_parser(parser)
        return parser

    @abc.abstractmethod
    def take_action(self, parsed_args: argparse.Namespace) -> ty.Any:
        """Override to do something useful.

        The returned value will be returned by the program.
        """

    def run(self, parsed_args: argparse.Namespace) -> int:
        """Invoked by the application when the command is run.

        Developers implementing commands should override
        :meth:`take_action`.

        Developers creating new command base classes (such as
        :class:`Lister` and :class:`ShowOne`) should override this
        method to wrap :meth:`take_action`.

        The before-hooks run first, then :meth:`take_action`, then the
        after-hooks. The result is the value from :meth:`take_action`
        (or 0 when that value is falsy), unless an after-hook replaces
        it.
        """
        parsed_args = self._run_before_hooks(parsed_args)
        result = self.take_action(parsed_args) or 0
        return self._run_after_hooks(parsed_args, result)

    def _run_before_hooks(
        self, parsed_args: argparse.Namespace
    ) -> argparse.Namespace:
        """Call the ``before()`` method of each hook.

        Intended to be called from :meth:`run` before
        :meth:`take_action`. Override it only when creating a new
        command base class that needs different hook processing.

        :returns: The parsed arguments after all hooks have seen them.
        """
        current_args = parsed_args
        for hook in self._hooks:
            replacement = hook.obj.before(current_args)
            if replacement is None:
                # The hook left the arguments unchanged.
                continue
            # Later hooks receive the replaced arguments.
            current_args = replacement
        return current_args

    def _run_after_hooks(
        self, parsed_args: argparse.Namespace, return_code: _T
    ) -> _T:
        """Call the ``after()`` method of each hook.

        Intended to be called from :meth:`run` after
        :meth:`take_action`. Override it only when creating a new
        command base class that needs different hook processing.

        :returns: The return code after all hooks have seen it.
        """
        current_code = return_code
        for hook in self._hooks:
            replacement = hook.obj.after(parsed_args, current_code)
            if replacement is None:
                # The hook left the return code unchanged.
                continue
            # Later hooks receive the replaced return code.
            current_code = replacement
        return current_code
|