1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
"""A plugin structure"""
import dataclasses
import errno
import inspect
import os
import posix
import pkgutil
def err_msg(err: int) -> str:
"""Build the message to expect when an OS error occurs."""
raw = str(OSError(err, posix.strerror(err)))
return raw.replace("[", "\\[").replace("]", "\\]")
ERR_CONN_RESET = err_msg(errno.ECONNRESET)
@dataclasses.dataclass(frozen=True)
class LogEvent():
"""The base class for an event."""
etype: str
level: int
log: str
class Plugin():
"""Base class that each plugin must inherit from. Within this class
you must define the methods that all of your plugins must implement
"""
# pylint: disable=too-few-public-methods
def __init__(self):
self.description = 'UNKNOWN'
async def perform_operation(self, cfg, logger):
"""The method that we expect all plugins to implement. This is the
method that our framework will call
"""
raise NotImplementedError
class PluginCollection():
"""Upon creation, this class will read the plugins package for modules
that contain a class definition that is inheriting from the Plugin class
"""
def __init__(self, cfg, logger, plugin_package):
"""Constructor that initiates the reading of all available plugins
when an instance of the PluginCollection object is created
"""
self.plugin_package = plugin_package
self.cfg = cfg
self.logger = logger
self.plugins = []
self.seen_paths = []
def __await__(self):
return self.async_init().__await__()
async def async_init(self):
"""Constructor that initiates the reading of all available plugins
when an instance of the PluginCollection object is created
"""
await self.reload_plugins()
await self.apply_all_plugins()
async def reload_plugins(self):
"""Reset the list of all plugins and initiate the walk over the main
provided plugin package to load all available plugins
"""
await self.cfg.mainq.put(
LogEvent(
etype="log",
level=10,
log=f'[plugins] Looking for plugins under package {self.plugin_package}'
)
)
await self.walk_package(self.plugin_package)
async def apply_all_plugins(self):
"""Apply all of the plugins
"""
await self.cfg.mainq.put(
LogEvent(
etype="log",
level=10,
log='[plugins] ----- Applying all plugins -----'
)
)
for plugin in self.plugins:
await self.cfg.mainq.put(LogEvent(etype="log", level=20, log=""))
await self.cfg.mainq.put(
LogEvent(
etype="log",
level=20,
log=f'[plugins] >>>>> Applying \'{plugin.description}\' <<<<<'
)
)
await plugin.perform_operation(self.cfg, self.logger)
async def walk_package(self, package):
"""Recursively walk the supplied package to retrieve all plugins
"""
imported_package = __import__(package, fromlist=['blah'])
for _, pluginname, ispkg in pkgutil.iter_modules(
imported_package.__path__, imported_package.__name__ + '.'
):
if not ispkg:
plugin_module = __import__(pluginname, fromlist=['blah'])
clsmembers = inspect.getmembers(plugin_module, inspect.isclass)
for (_, cls) in clsmembers:
# Only add classes that are a sub class of Plugin, but NOT Plugin itself
if issubclass(cls, Plugin) & (cls is not Plugin):
await self.cfg.mainq.put(
LogEvent(
etype="log",
level=10,
log=f'[plugins] Found plugin class: {cls.__module__}.{cls.__name__}'
)
)
self.plugins.append(cls())
# Now that we have looked at all the modules in the current package, start looking
# recursively for additional modules in sub packages
all_current_paths = []
if isinstance(imported_package.__path__, str):
all_current_paths.append(imported_package.__path__)
else:
all_current_paths.extend(imported_package.__path__)
for pkg_path in all_current_paths:
if pkg_path not in self.seen_paths:
self.seen_paths.append(pkg_path)
# Get all sub directory of the current package path directory
child_pkgs = [
p for p in os.listdir(pkg_path)
if os.path.isdir(os.path.join(pkg_path, p))
]
# For each sub directory, apply the walk_package method recursively
for child_pkg in child_pkgs:
await self.walk_package(package + '.' + child_pkg)
|