File: hookable.py

package info (click to toggle)
python-slip 0.6.5-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster
  • size: 252 kB
  • sloc: python: 951; makefile: 61
file content (206 lines) | stat: -rw-r--r-- 6,238 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# -*- coding: utf-8 -*-

# slip.util.hookable -- run hooks on changes in objects
#
# Copyright © 2008 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen <nils@redhat.com>

"""This module contains variants of certain base types which call registered
hooks on changes."""

import collections
from six import with_metaclass

__all__ = ["Hookable", "HookableSet"]


class HookableType(type):

    """Metaclass which wraps the change methods of hookable classes.

    If the class body defines `_hookable_change_methods`, every method
    named there is looked up on a base class and replaced by a wrapper
    which calls `self._run_hooks()` after the original method returned.
    The base class is taken from `_hookable_base_class` if the class body
    defines it, otherwise it is the single base class that isn't Hookable.
    """

    def __new__(cls, name, bases, dct):
        """Create the class, wrapping its change methods where requested.

        Raises TypeError if no `_hookable_base_class` is given and more
        than one base class besides Hookable is found.
        """

        # Nothing to wrap, create the class unchanged.
        if '_hookable_change_methods' not in dct:
            return type.__new__(cls, name, bases, dct)

        if "_hookable_base_class" in dct:
            wrapped_base = dct["_hookable_base_class"]
        else:
            # Fall back to the one base class which isn't Hookable itself.
            wrapped_base = None
            candidates = [b for b in bases if b != Hookable]
            for candidate in candidates:
                if wrapped_base:
                    raise TypeError(
                        "too many base classes: %s" % str(bases))
                wrapped_base = candidate

        for change_method in dct["_hookable_change_methods"]:
            wrapper = HookableType.wrap_method(wrapped_base, change_method)
            dct[change_method] = wrapper

        return type.__new__(cls, name, bases, dct)

    @classmethod
    def wrap_method(cls, base, methodname):
        """Return a function calling `base.methodname`, then the hooks.

        The wrapper passes all arguments through to the original method,
        calls `self._run_hooks()` and returns the original return value.
        """
        original = getattr(base, methodname)

        def methodwrapper(self, *args, **kwargs):
            result = original(self, *args, **kwargs)
            self._run_hooks()
            return result

        # Let the wrapper show up under the name of the method it replaces.
        methodwrapper.__name__ = methodname
        return methodwrapper


class _HookEntry(object):

    def __init__(self, hook, args, kwargs, hookable=None):

        assert(isinstance(hook, collections.Callable))
        assert(isinstance(hookable, Hookable))

        for n, x in enumerate(args):
            try:
                hash(x)
            except TypeError:
                raise TypeError(
                        "Positional argument %d is not hashable: %r" %
                        (n, x))

        for k, x in kwargs.items():
            try:
                hash(x)
            except TypeError:
                raise TypeError(
                        "Keyword argument %r is not hashable: %r" %
                        (k, x))

        if not isinstance(args, tuple):
            args = tuple(args)

        self.__hook = hook
        self.__args = args
        self.__kwargs = kwargs
        self.__hookable = hookable

        self.__hash = None

    def __cmp__(self, obj):
        return (
            self.__hook == obj.__hook and
            self.__args == obj.__args and
            self.__kwargs == obj.__kwargs)

    def __hash__(self):
        if not self.__hash:
            self.__hash = self._compute_hash()
        return self.__hash

    def _compute_hash(self):
        hashvalue = hash(self.__hook)
        hashvalue = hash(hashvalue) ^ hash(self.__args)
        hashvalue = hash(hashvalue) ^ hash(
                tuple(sorted(self.__kwargs.items())))
        return hashvalue

    def run(self):
        if self.__hookable:
            self.__hook(self.__hookable, *self.__args, **self.__kwargs)
        else:
            self.__hook(*self.__args, **self.__kwargs)


class Hookable(with_metaclass(HookableType, object)):

    """An object which calls registered hooks on changes.

    Subclasses name their changing methods in `_hookable_change_methods`,
    the metaclass HookableType wraps these so that _run_hooks() gets
    called after each of them.  All state is created lazily on first
    access, so subclasses don't have to call an initializer.
    """

    @property
    def __hooks__(self):
        """The set of registered hook entries, created on first access."""
        if not hasattr(self, "__real_hooks__"):
            self.__real_hooks__ = set()
        return self.__real_hooks__

    def _get_hooks_enabled(self):
        """Return whether hooks are run at all, defaults to True."""
        if not hasattr(self, "__hooks_enabled__"):
            self.__hooks_enabled__ = True
        return self.__hooks_enabled__

    def _set_hooks_enabled(self, enabled):
        """Enable or disable running of hooks."""
        self.__hooks_enabled__ = enabled

    hooks_enabled = property(_get_hooks_enabled, _set_hooks_enabled)

    def _get_hooks_frozen(self):
        """Return whether hooks are currently frozen, defaults to False."""
        if not hasattr(self, "__hooks_frozen__"):
            self.__hooks_frozen__ = False
        return self.__hooks_frozen__

    def _set_hooks_frozen(self, freeze):
        """Freeze or thaw hooks.

        While frozen, hooks aren't run on changes but collected.  On
        thawing, each collected hook is run once, regardless of how many
        changes happened in the meantime.
        """
        if freeze == self.hooks_frozen:
            return

        self.__hooks_frozen__ = freeze

        if freeze:
            self.__hooks_frozen_entries__ = set()
        else:
            for hookentry in self.__hooks_frozen_entries__:
                hookentry.run()
            del self.__hooks_frozen_entries__

    hooks_frozen = property(_get_hooks_frozen, _set_hooks_frozen)

    def freeze_hooks(self):
        """Collect hooks instead of running them until thawed."""
        self.hooks_frozen = True

    def thaw_hooks(self):
        """Run the hooks collected while frozen and resume normal mode."""
        self.hooks_frozen = False

    def add_hook(self, hook, *args, **kwargs):
        """Register `hook` to be called with the given arguments."""
        self.__add_hook(hook, None, *args, **kwargs)

    def add_hook_hookable(self, hook, *args, **kwargs):
        """Register `hook` to be called with this object as first
        argument, followed by the given arguments."""
        self.__add_hook(hook, self, *args, **kwargs)

    def __add_hook(self, hook, _hookable, *args, **kwargs):
        # callable() instead of collections.Callable, the latter is gone
        # from Python 3.10 on.
        assert callable(hook)
        # add_hook() passes None here, only check real objects.
        assert _hookable is None or isinstance(_hookable, Hookable)
        hookentry = _HookEntry(hook, args, kwargs, hookable=_hookable)
        self.__hooks__.add(hookentry)

    def remove_hook(self, hook, *args, **kwargs):
        """Unregister `hook` as registered with the given arguments.

        Raises KeyError if no matching entry is found in the set of
        registered hooks.
        """

        self.__hooks__.remove(_HookEntry(hook, args, kwargs))

    def _run_hooks(self):
        """Run the registered hooks, or collect them if frozen.

        Does nothing if hooks are disabled.
        """
        if not self.hooks_enabled:
            return

        if self.hooks_frozen:
            self.__hooks_frozen_entries__.update(self.__hooks__)
        else:
            # Iterate over a snapshot so hooks may add or remove hooks
            # without breaking the iteration.
            for hookentry in tuple(self.__hooks__):
                hookentry.run()


class HookableSet(set, Hookable):

    """A set object which calls registered hooks on changes."""

    # The set methods which change the object in place, they get wrapped
    # to run the hooks afterwards.
    _hookable_change_methods = (
        "add", "clear", "difference_update", "discard", "intersection_update",
        "pop", "remove", "symmetric_difference_update", "update")

    def copy(self):
        """Return a shallow copy of the same class without any hooks.

        The copy has the same elements, but an empty set of registered
        hooks.
        """
        # set.copy() isn't guaranteed to return an instance of the
        # subclass (CPython 3 returns a plain set, which can't carry the
        # hooks attribute), so create the object explicitly.
        klass = type(self)
        obj = klass.__new__(klass)
        # Fill it with the unwrapped set.update() so no hooks are run.
        set.update(obj, self)
        obj.__real_hooks__ = set()
        return obj