1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Abstract rule class definition and rule engine implementation
"""
from abc import ABCMeta, abstractmethod
import inspect
from itertools import groupby
from logging import getLogger
from .utils import is_iterable
from toposort import toposort
from . import debug
log = getLogger(__name__).log
class Consequence(metaclass=ABCMeta):
"""
Definition of a consequence to apply.
"""
@abstractmethod
def then(self, matches, when_response, context): # pragma: no cover
"""
Action implementation.
:param matches:
:type matches: rebulk.match.Matches
:param context:
:type context:
:param when_response: return object from when call.
:type when_response: object
:return: True if the action was runned, False if it wasn't.
:rtype: bool
"""
class Condition(metaclass=ABCMeta):
"""
Definition of a condition to check.
"""
@abstractmethod
def when(self, matches, context): # pragma: no cover
"""
Condition implementation.
:param matches:
:type matches: rebulk.match.Matches
:param context:
:type context:
:return: truthy if rule should be triggered and execute then action, falsy if it should not.
:rtype: object
"""
class CustomRule(Condition, Consequence, metaclass=ABCMeta):
"""
Definition of a rule to apply
"""
# pylint: disable=unused-argument, abstract-method
priority = 0
name = None
dependency = None
properties = {}
def __init__(self, log_level=None):
self.defined_at = debug.defined_at()
if log_level is None and not hasattr(self, 'log_level'):
self.log_level = debug.LOG_LEVEL
def enabled(self, context):
"""
Disable rule.
:param context:
:type context:
:return: True if rule is enabled, False if disabled
:rtype: bool
"""
return True
def __lt__(self, other):
return self.priority > other.priority
def __repr__(self):
defined = ""
if self.defined_at:
defined = f"@{self.defined_at}"
return f"<{self.name if self.name else self.__class__.__name__}{defined}>"
def __eq__(self, other):
return self.__class__ == other.__class__
def __hash__(self):
return hash(self.__class__)
class Rule(CustomRule):
"""
Definition of a rule to apply
"""
# pylint:disable=abstract-method
consequence = None
def then(self, matches, when_response, context):
assert self.consequence
if is_iterable(self.consequence):
if not is_iterable(when_response):
when_response = [when_response]
iterator = iter(when_response)
for cons in self.consequence: #pylint: disable=not-an-iterable
if inspect.isclass(cons):
cons = cons()
cons.then(matches, next(iterator), context)
else:
cons = self.consequence
if inspect.isclass(cons):
cons = cons() # pylint:disable=not-callable
cons.then(matches, when_response, context)
class RemoveMatch(Consequence): # pylint: disable=abstract-method
"""
Remove matches returned by then
"""
def then(self, matches, when_response, context):
if is_iterable(when_response):
ret = []
when_response = list(when_response)
for match in when_response:
if match in matches:
matches.remove(match)
ret.append(match)
return ret
if when_response in matches:
matches.remove(when_response)
return when_response
class AppendMatch(Consequence): # pylint: disable=abstract-method
"""
Append matches returned by then
"""
def __init__(self, match_name=None):
self.match_name = match_name
def then(self, matches, when_response, context):
if is_iterable(when_response):
ret = []
when_response = list(when_response)
for match in when_response:
if match not in matches:
if self.match_name:
match.name = self.match_name
matches.append(match)
ret.append(match)
return ret
if self.match_name:
when_response.name = self.match_name
if when_response not in matches:
matches.append(when_response)
return when_response
class RenameMatch(Consequence): # pylint: disable=abstract-method
"""
Rename matches returned by then
"""
def __init__(self, match_name):
self.match_name = match_name
self.remove = RemoveMatch()
self.append = AppendMatch()
def then(self, matches, when_response, context):
removed = self.remove.then(matches, when_response, context)
if is_iterable(removed):
removed = list(removed)
for match in removed:
match.name = self.match_name
elif removed:
removed.name = self.match_name
if removed:
self.append.then(matches, removed, context)
class AppendTags(Consequence): # pylint: disable=abstract-method
"""
Add tags to returned matches
"""
def __init__(self, tags):
self.tags = tags
self.remove = RemoveMatch()
self.append = AppendMatch()
def then(self, matches, when_response, context):
removed = self.remove.then(matches, when_response, context)
if is_iterable(removed):
removed = list(removed)
for match in removed:
match.tags.extend(self.tags)
elif removed:
removed.tags.extend(self.tags) # pylint: disable=no-member
if removed:
self.append.then(matches, removed, context)
class RemoveTags(Consequence): # pylint: disable=abstract-method
"""
Remove tags from returned matches
"""
def __init__(self, tags):
self.tags = tags
self.remove = RemoveMatch()
self.append = AppendMatch()
def then(self, matches, when_response, context):
removed = self.remove.then(matches, when_response, context)
if is_iterable(removed):
removed = list(removed)
for match in removed:
for tag in self.tags:
if tag in match.tags:
match.tags.remove(tag)
elif removed:
for tag in self.tags:
if tag in removed.tags: # pylint: disable=no-member
removed.tags.remove(tag) # pylint: disable=no-member
if removed:
self.append.then(matches, removed, context)
class Rules(list):
"""
list of rules ready to execute.
"""
def __init__(self, *rules):
super().__init__()
self.load(*rules)
def load(self, *rules):
"""
Load rules from a Rule module, class or instance
:param rules:
:type rules:
:return:
:rtype:
"""
for rule in rules:
if inspect.ismodule(rule):
self.load_module(rule)
elif inspect.isclass(rule):
self.load_class(rule)
else:
self.append(rule)
def load_module(self, module):
"""
Load a rules module
:param module:
:type module:
:return:
:rtype:
"""
# pylint: disable=unused-variable
for name, obj in inspect.getmembers(module,
lambda member: hasattr(member, '__module__')
and member.__module__ == module.__name__
and inspect.isclass):
self.load_class(obj)
def load_class(self, class_):
"""
Load a Rule class.
:param class_:
:type class_:
:return:
:rtype:
"""
self.append(class_())
def execute_all_rules(self, matches, context):
"""
Execute all rules from this rules list. All when condition with same priority will be performed before
calling then actions.
:param matches:
:type matches:
:param context:
:type context:
:return:
:rtype:
"""
ret = []
for priority, priority_rules in groupby(sorted(self), lambda rule: rule.priority):
sorted_rules = toposort_rules(list(priority_rules)) # Group by dependency graph toposort
for rules_group in sorted_rules:
rules_group = list(sorted(rules_group, key=self.index)) # Sort rules group based on initial ordering.
group_log_level = None
for rule in rules_group:
if group_log_level is None or group_log_level < rule.log_level:
group_log_level = rule.log_level
log(group_log_level, "%s independent rule(s) at priority %s.", len(rules_group), priority)
for rule in rules_group:
when_response = execute_rule(rule, matches, context)
if when_response is not None:
ret.append((rule, when_response))
return ret
def execute_rule(rule, matches, context):
"""
Execute the given rule.
:param rule:
:type rule:
:param matches:
:type matches:
:param context:
:type context:
:return:
:rtype:
"""
if rule.enabled(context):
log(rule.log_level, "Checking rule condition: %s", rule)
when_response = rule.when(matches, context)
if when_response:
log(rule.log_level, "Rule was triggered: %s", when_response)
log(rule.log_level, "Running rule consequence: %s %s", rule, when_response)
rule.then(matches, when_response, context)
return when_response
else:
log(rule.log_level, "Rule is disabled: %s", rule)
def toposort_rules(rules):
"""
Sort given rules using toposort with dependency parameter.
:param rules:
:type rules:
:return:
:rtype:
"""
graph = {}
class_dict = {}
for rule in rules:
if rule.__class__ in class_dict:
raise ValueError(f"Duplicate class rules are not allowed: {rule.__class__}")
class_dict[rule.__class__] = rule
for rule in rules:
if not is_iterable(rule.dependency) and rule.dependency:
rule_dependencies = [rule.dependency]
else:
rule_dependencies = rule.dependency
dependencies = set()
if rule_dependencies:
for dependency in rule_dependencies:
if inspect.isclass(dependency):
dependency = class_dict.get(dependency)
if dependency:
dependencies.add(dependency)
graph[rule] = dependencies
return toposort(graph)
|