File: plugin_collection.py

package info (click to toggle)
stunnel4 3%3A5.76-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,396 kB
  • sloc: ansic: 19,572; sh: 4,958; python: 4,950; perl: 437; makefile: 227
file content (151 lines) | stat: -rw-r--r-- 5,122 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""A plugin structure"""

import dataclasses
import errno
import inspect
import os
import posix
import pkgutil


def err_msg(err: int) -> str:
    """Build the message to expect when an OS error occurs."""
    raw = str(OSError(err, posix.strerror(err)))
    return raw.replace("[", "\\[").replace("]", "\\]")


ERR_CONN_RESET = err_msg(errno.ECONNRESET)


@dataclasses.dataclass(frozen=True)
class LogEvent():
    """The base class for an event."""
    etype: str
    level: int
    log: str


class Plugin():
    """Base class that each plugin must inherit from. Within this class
    you must define the methods that all of your plugins must implement
    """
    # pylint: disable=too-few-public-methods

    def __init__(self):
        self.description = 'UNKNOWN'

    async def perform_operation(self, cfg, logger):
        """The method that we expect all plugins to implement. This is the
        method that our framework will call
        """
        raise NotImplementedError


class PluginCollection():
    """Upon creation, this class will read the plugins package for modules
    that contain a class definition that is inheriting from the Plugin class
    """

    def __init__(self, cfg, logger, plugin_package):
        """Constructor that initiates the reading of all available plugins
        when an instance of the PluginCollection object is created
        """
        self.plugin_package = plugin_package
        self.cfg = cfg
        self.logger = logger
        self.plugins = []
        self.seen_paths = []


    def __await__(self):
        return self.async_init().__await__()


    async def async_init(self):
        """Constructor that initiates the reading of all available plugins
        when an instance of the PluginCollection object is created
        """
        await self.reload_plugins()
        await self.apply_all_plugins()


    async def reload_plugins(self):
        """Reset the list of all plugins and initiate the walk over the main
        provided plugin package to load all available plugins
        """
        await self.cfg.mainq.put(
            LogEvent(
                etype="log",
                level=10,
                log=f'[plugins] Looking for plugins under package {self.plugin_package}'
            )
        )
        await self.walk_package(self.plugin_package)


    async def apply_all_plugins(self):
        """Apply all of the plugins
        """
        await self.cfg.mainq.put(
            LogEvent(
                etype="log",
                level=10,
                log='[plugins] ----- Applying all plugins -----'
            )
        )
        for plugin in self.plugins:
            await self.cfg.mainq.put(LogEvent(etype="log", level=20, log=""))
            await self.cfg.mainq.put(
                LogEvent(
                    etype="log",
                    level=20,
                    log=f'[plugins] >>>>> Applying \'{plugin.description}\' <<<<<'
                )
            )
            await plugin.perform_operation(self.cfg, self.logger)

    async def walk_package(self, package):
        """Recursively walk the supplied package to retrieve all plugins
        """
        imported_package = __import__(package, fromlist=['blah'])

        for _, pluginname, ispkg in pkgutil.iter_modules(
            imported_package.__path__, imported_package.__name__ + '.'
        ):
            if not ispkg:
                plugin_module = __import__(pluginname, fromlist=['blah'])
                clsmembers = inspect.getmembers(plugin_module, inspect.isclass)
                for (_, cls) in clsmembers:
                    # Only add classes that are a sub class of Plugin, but NOT Plugin itself
                    if issubclass(cls, Plugin) & (cls is not Plugin):
                        await self.cfg.mainq.put(
                            LogEvent(
                                etype="log",
                                level=10,
                                log=f'[plugins] Found plugin class: {cls.__module__}.{cls.__name__}'
                            )
                        )
                        self.plugins.append(cls())


        # Now that we have looked at all the modules in the current package, start looking
        # recursively for additional modules in sub packages
        all_current_paths = []
        if isinstance(imported_package.__path__, str):
            all_current_paths.append(imported_package.__path__)
        else:
            all_current_paths.extend(imported_package.__path__)

        for pkg_path in all_current_paths:
            if pkg_path not in self.seen_paths:
                self.seen_paths.append(pkg_path)

                # Get all sub directory of the current package path directory
                child_pkgs = [
                    p for p in os.listdir(pkg_path)
                    if os.path.isdir(os.path.join(pkg_path, p))
                ]

                # For each sub directory, apply the walk_package method recursively
                for child_pkg in child_pkgs:
                    await self.walk_package(package + '.' + child_pkg)