File: registries.py

package info (click to toggle)
python-persisting-theory 1.0-3
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 196 kB
  • sloc: python: 423; makefile: 3
file content (275 lines) | stat: -rw-r--r-- 8,280 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
from collections import OrderedDict
import inspect
import operator

import importlib


class DoesNotExist(ValueError):
    pass


class MultipleObjectsReturned(ValueError):
    pass


class QuerySet(object):
    def __init__(self, values):
        self.values = list(values)

    def __iter__(self):
        for value in self.values:
            yield value

    def __len__(self):
        return len(self.values)

    def __repr__(self):
        return "<{0}: {1}>".format(self.__class__.__name__, str(list(self.values)))

    def __getitem__(self, i):
        return self.values[i]

    def __eq__(self, other):
        return self.values == list(other)

    def _clone(self, new_values):
        return self.__class__(new_values)

    def _build_filter(self, **kwargs):
        """build a single filter function used to match arbitrary object"""

        def object_filter(obj):
            for key, value in kwargs.items():
                # we replace dango-like lookup by dots, so attrgetter can do his job
                key = key.replace("__", ".")

                getter = operator.attrgetter(key)
                if not getter(obj) == value:
                    return False
            return True

        return object_filter

    def exists(self):
        return len(self) > 0

    def order_by(self, key):
        reverse = False
        if key.startswith("-"):
            reverse = True
            key = key[1:]

        return self._clone(
            sorted(self.values, key=operator.attrgetter(key), reverse=reverse)
        )

    def all(self):
        return self._clone(self.values)

    def count(self):
        return len(self)

    def first(self):
        try:
            return self.values[0]
        except IndexError:
            return None

    def last(self):
        try:
            return self.values[-1]
        except IndexError:
            return None

    def filter(self, **kwargs):
        _filter = self._build_filter(**kwargs)
        return self._clone(filter(_filter, self.values))

    def exclude(self, **kwargs):
        _filter = self._build_filter(**kwargs)
        return self._clone(filter(lambda v: not _filter(v), self.values))

    def get(self, **kwargs):

        matches = self.filter(**kwargs)
        if len(matches) == 0:
            raise DoesNotExist()
        if len(matches) > 1:
            raise MultipleObjectsReturned()
        return matches[0]


class Manager(object):
    """Used to retrieve / order / filter preferences pretty much as django's ORM managers"""

    def __init__(self, registry, queryset_class):
        self.registry = registry
        self.queryset_class = queryset_class

    def get_queryset(self):
        return self.queryset_class(self.registry.values())

    def all(self):
        return self.get_queryset().all()

    def __getattr__(self, attr):
        try:
            return super().__getattr__(attr)
        except AttributeError:
            # Try to proxy on queryset if possible
            return getattr(self.get_queryset(), attr)


class Registry(OrderedDict):
    manager_class = Manager
    queryset_class = QuerySet

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.objects = self.manager_class(self, self.queryset_class)

    def register_decorator_factory(self, **kwargs):
        """
        Return an actual decorator for registering objects into registry
        """
        name = kwargs.get("name")

        def decorator(decorated):
            self.register_func(data=decorated, name=name)
            return decorated

        return decorator

    def register(self, data=None, name=None, **kwargs):
        """
        Use this method as a decorator on class/function you want to register:

        @registry.register(name="test")
        class Test:
            pass

        :param:data: Something to register in the registry
        :param:name: The unique name that will identify registered data.
        If None, by default, registry will try to deduce name from class name (if object is a class or an object).
        You can change this behaviour by overriding :py::method:`prepare_name`

        """
        if data is None:
            return self.register_decorator_factory(data=data, name=name, **kwargs)
        else:
            self.register_func(data=data, name=name, **kwargs)
            return data

    def get_object_name(self, data):
        """
        Return a name from an element (object, class, function...)
        """
        if callable(data):
            return data.__name__

        elif inspect.isclass(data):
            return data.__class__.__name__

        else:
            raise ValueError(
                "Cannot deduce name from given object ({0}). Please user registry.register() with a 'name' argument.".format(
                    data
                )
            )

    def validate(self, data):
        """
        Called before registering a new value into the registry
        Override this method if you want to restrict what type of data cna be registered
        """
        return True

    def prepare_name(self, data, name=None):
        if name is None:
            return self.get_object_name(data)
        return name

    def register_func(self, data, name=None, **kwargs):
        """
        Register abritrary data into the registry
        """
        if self.validate(data):
            o = self.prepare_data(data)
            n = self.prepare_name(data, name)
            self[n] = o
            self.post_register(data=0, name=n)
        else:
            raise ValueError(
                "{0} (type: {0.__class__}) is not a valid value for {1} registry".format(
                    data, self.__class__
                )
            )

    def post_register(self, data, name):
        """
        Will be triggered each time a new element is successfully registered.
        Feel free to override this method
        """
        pass

    def prepare_data(self, data):
        """
        Override this methode if you want to manipulate data before registering it
        You MUST return a value to register
        """
        return data

    def autodiscover(self, apps, force_reload=False):
        """
        Iterate throught every installed apps, trying to import `look_into` package
        :param apps: an iterable of string, refering to python modules the registry will try to import via autodiscover
        """
        for app in apps:
            app_package = __import__(app)
            try:

                package = "{0}.{1}".format(
                    app, self.look_into
                )  # try to import self.package inside current app
                # print(package)
                module = __import__(package)
                if force_reload:
                    importlib.reload(module)
            except ImportError as exc:
                # From django's syncdb
                # This is slightly hackish. We want to ignore ImportErrors
                # if the module itself is missing -- but we don't
                # want to ignore the exception if the module exists
                # but raises an ImportError for some reason. The only way we
                # can do this is to check the text of the exception. Note that
                # we're a bit broad in how we check the text, because different
                # Python implementations may not use the same text.
                # CPython uses the text "No module named"
                # PyPy uses "No module named myproject.myapp"
                msg = exc.args[0]
                if not msg.startswith("No module named") or self.look_into not in msg:
                    raise


class MetaRegistry(Registry):
    """
    Keep a reference to all registries
    """

    look_into = "registries"

    def autodiscover(self, apps, cascade=True, **kwargs):
        """
        :param cascade: If true, will trigger autodiscover on discovered registries
        """
        super().autodiscover(apps, **kwargs)
        if cascade:
            self.autodiscover_registries(apps)

    def autodiscover_registries(self, apps):
        for key, registry in self.items():
            registry.autodiscover(apps)


meta_registry = MetaRegistry()