1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
|
# -*- coding: utf-8 -*-
# slip.util.hookable -- run hooks on changes in objects
#
# Copyright © 2008 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen <nils@redhat.com>
"""This module contains variants of certain base types which call registered
hooks on changes."""
import collections
from six import with_metaclass
__all__ = ["Hookable", "HookableSet"]
class HookableType(type):
def __new__(cls, name, bases, dct):
if '_hookable_change_methods' in dct:
try:
base = dct["_hookable_base_class"]
except KeyError:
base = None
for base_candidate in (x for x in bases if x != Hookable):
if base:
raise TypeError(
"too many base classes: %s" % str(bases))
else:
base = base_candidate
for methodname in dct["_hookable_change_methods"]:
dct[methodname] = HookableType.wrap_method(base, methodname)
return type.__new__(cls, name, bases, dct)
@classmethod
def wrap_method(cls, base, methodname):
func = getattr(base, methodname)
def methodwrapper(self, *p, **k):
retval = func(self, *p, **k)
self._run_hooks()
return retval
methodwrapper.__name__ = methodname
return methodwrapper
class _HookEntry(object):
def __init__(self, hook, args, kwargs, hookable=None):
assert(isinstance(hook, collections.Callable))
assert(isinstance(hookable, Hookable))
for n, x in enumerate(args):
try:
hash(x)
except TypeError:
raise TypeError(
"Positional argument %d is not hashable: %r" %
(n, x))
for k, x in kwargs.items():
try:
hash(x)
except TypeError:
raise TypeError(
"Keyword argument %r is not hashable: %r" %
(k, x))
if not isinstance(args, tuple):
args = tuple(args)
self.__hook = hook
self.__args = args
self.__kwargs = kwargs
self.__hookable = hookable
self.__hash = None
def __cmp__(self, obj):
return (
self.__hook == obj.__hook and
self.__args == obj.__args and
self.__kwargs == obj.__kwargs)
def __hash__(self):
if not self.__hash:
self.__hash = self._compute_hash()
return self.__hash
def _compute_hash(self):
hashvalue = hash(self.__hook)
hashvalue = hash(hashvalue) ^ hash(self.__args)
hashvalue = hash(hashvalue) ^ hash(
tuple(sorted(self.__kwargs.items())))
return hashvalue
def run(self):
if self.__hookable:
self.__hook(self.__hookable, *self.__args, **self.__kwargs)
else:
self.__hook(*self.__args, **self.__kwargs)
class Hookable(with_metaclass(HookableType, object)):
"""An object which calls registered hooks on changes."""
@property
def __hooks__(self, *p, **k):
if not hasattr(self, "__real_hooks__"):
self.__real_hooks__ = set()
return self.__real_hooks__
def _get_hooks_enabled(self):
if not hasattr(self, "__hooks_enabled__"):
self.__hooks_enabled__ = True
return self.__hooks_enabled__
def _set_hooks_enabled(self, enabled):
self.__hooks_enabled__ = enabled
hooks_enabled = property(_get_hooks_enabled, _set_hooks_enabled)
def _get_hooks_frozen(self):
if not hasattr(self, "__hooks_frozen__"):
self.__hooks_frozen__ = False
return self.__hooks_frozen__
def _set_hooks_frozen(self, freeze):
if freeze == self.hooks_frozen:
return
self.__hooks_frozen__ = freeze
if freeze:
self.__hooks_frozen_entries__ = set()
else:
for hookentry in self.__hooks_frozen_entries__:
hookentry.run()
del self.__hooks_frozen_entries__
hooks_frozen = property(_get_hooks_frozen, _set_hooks_frozen)
def freeze_hooks(self):
self.hooks_frozen = True
def thaw_hooks(self):
self.hooks_frozen = False
def add_hook(self, hook, *args, **kwargs):
self.__add_hook(hook, None, *args, **kwargs)
def add_hook_hookable(self, hook, *args, **kwargs):
self.__add_hook(hook, self, *args, **kwargs)
def __add_hook(self, hook, _hookable, *args, **kwargs):
assert isinstance(hook, collections.Callable)
assert isinstance(_hookable, Hookable)
hookentry = _HookEntry(hook, args, kwargs, hookable=_hookable)
self.__hooks__.add(hookentry)
def remove_hook(self, hook, *args, **kwargs):
self.__hooks__.remove(_HookEntry(hook, args, kwargs))
def _run_hooks(self):
if self.hooks_enabled:
if not self.hooks_frozen:
for hookentry in self.__hooks__:
hookentry.run()
else:
self.__hooks_frozen_entries__.update(self.__hooks__)
class HookableSet(set, Hookable):
"""A set object which calls registered hooks on changes."""
_hookable_change_methods = (
"add", "clear", "difference_update", "discard", "intersection_update",
"pop", "remove", "symmetric_difference_update", "update")
def copy(self):
obj = set.copy(self)
obj.__real_hooks__ = set()
return obj
|