File: listeners.py

package info (click to toggle)
python-sqlalchemy-utils 0.41.2-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 1,252 kB
  • sloc: python: 13,566; makefile: 141
file content (277 lines) | stat: -rw-r--r-- 7,805 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import sqlalchemy as sa

from .exceptions import ImproperlyConfigured


def coercion_listener(mapper, class_):
    """
    Auto assigns coercing listener for all class properties which are of coerce
    capable type.
    """
    for prop in mapper.iterate_properties:
        try:
            listener = prop.columns[0].type.coercion_listener
        except AttributeError:
            continue
        sa.event.listen(
            getattr(class_, prop.key),
            'set',
            listener,
            retval=True
        )


def instant_defaults_listener(target, args, kwargs):
    # insertion order of kwargs matters
    # copy and clear so that we can add back later at the end of the dict
    original = kwargs.copy()
    kwargs.clear()

    for key, column in sa.inspect(target.__class__).columns.items():
        if (
            hasattr(column, 'default') and
            column.default is not None
        ):
            if callable(column.default.arg):
                kwargs[key] = column.default.arg(target)
            else:
                kwargs[key] = column.default.arg

    # supersede w/initial in case target uses setters overriding defaults
    kwargs.update(original)


def force_auto_coercion(mapper=None):
    """
    Function that assigns automatic data type coercion for all classes which
    are of type of given mapper. The coercion is applied to all coercion
    capable properties. By default coercion is applied to all SQLAlchemy
    mappers.

    Before initializing your models you need to call force_auto_coercion.

    ::

        from sqlalchemy_utils import force_auto_coercion


        force_auto_coercion()


    Then define your models the usual way::


        class Document(Base):
            __tablename__ = 'document'
            id = sa.Column(sa.Integer, autoincrement=True)
            name = sa.Column(sa.Unicode(50))
            background_color = sa.Column(ColorType)


    Now scalar values for coercion capable data types will convert to
    appropriate value objects::

        document = Document()
        document.background_color = 'F5F5F5'
        document.background_color  # Color object
        session.commit()

    A useful side effect of this is that additional validation of data will be
    done on the moment it is being assigned to model objects. For example
    without autocorrection set, an invalid
    :class:`sqlalchemy_utils.types.IPAddressType` (eg. ``10.0.0 255.255``)
    would get through without an exception being raised. The database wouldn't
    notice this (as most databases don't have a native type for an IP address,
    so they're usually just stored as a string), and the ``ipaddress``
    package uses a string field as well.

    :param mapper: The mapper which the automatic data type coercion should be
                   applied to
    """
    if mapper is None:
        mapper = sa.orm.Mapper
    sa.event.listen(mapper, 'mapper_configured', coercion_listener)


def force_instant_defaults(mapper=None):
    """
    Function that assigns object column defaults on object initialization
    time. By default calling this function applies instant defaults to all
    your models.

    Setting up instant defaults::


        from sqlalchemy_utils import force_instant_defaults


        force_instant_defaults()

    Example usage::


        class Document(Base):
            __tablename__ = 'document'
            id = sa.Column(sa.Integer, autoincrement=True)
            name = sa.Column(sa.Unicode(50))
            created_at = sa.Column(sa.DateTime, default=datetime.now)


        document = Document()
        document.created_at  # datetime object


    :param mapper: The mapper which the automatic instant defaults forcing
                   should be applied to
    """
    if mapper is None:
        mapper = sa.orm.Mapper
    sa.event.listen(mapper, 'init', instant_defaults_listener)


def auto_delete_orphans(attr):
    """
    Delete orphans for given SQLAlchemy model attribute. This function can be
    used for deleting many-to-many associated orphans easily. For more
    information see
    https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/ManyToManyOrphan.

    Consider the following model definition:

    ::

        from sqlalchemy.ext.associationproxy import association_proxy
        from sqlalchemy import *
        from sqlalchemy.orm import *
        # Necessary in sqlalchemy 1.3:
        # from sqlalchemy.ext.declarative import declarative_base
        from sqlalchemy import event


        Base = declarative_base()

        tagging = Table(
            'tagging',
            Base.metadata,
            Column(
                'tag_id',
                Integer,
                ForeignKey('tag.id', ondelete='CASCADE'),
                primary_key=True
            ),
            Column(
                'entry_id',
                Integer,
                ForeignKey('entry.id', ondelete='CASCADE'),
                primary_key=True
            )
        )

        class Tag(Base):
            __tablename__ = 'tag'
            id = Column(Integer, primary_key=True)
            name = Column(String(100), unique=True, nullable=False)

            def __init__(self, name=None):
                self.name = name

        class Entry(Base):
            __tablename__ = 'entry'

            id = Column(Integer, primary_key=True)

            tags = relationship(
                'Tag',
                secondary=tagging,
                backref='entries'
            )

    Now lets say we want to delete the tags if all their parents get deleted (
    all Entry objects get deleted). This can be achieved as follows:

    ::


        from sqlalchemy_utils import auto_delete_orphans


        auto_delete_orphans(Entry.tags)


    After we've set up this listener we can see it in action.

    ::


        e = create_engine('sqlite://')

        Base.metadata.create_all(e)

        s = Session(e)

        r1 = Entry()
        r2 = Entry()
        r3 = Entry()
        t1, t2, t3, t4 = Tag('t1'), Tag('t2'), Tag('t3'), Tag('t4')

        r1.tags.extend([t1, t2])
        r2.tags.extend([t2, t3])
        r3.tags.extend([t4])
        s.add_all([r1, r2, r3])

        assert s.query(Tag).count() == 4

        r2.tags.remove(t2)

        assert s.query(Tag).count() == 4

        r1.tags.remove(t2)

        assert s.query(Tag).count() == 3

        r1.tags.remove(t1)

        assert s.query(Tag).count() == 2

    .. versionadded: 0.26.4

    :param attr: Association relationship attribute to auto delete orphans from
    """

    parent_class = attr.parent.class_
    target_class = attr.property.mapper.class_

    backref = attr.property.backref
    if not backref:
        raise ImproperlyConfigured(
            'The relationship argument given for auto_delete_orphans needs to '
            'have a backref relationship set.'
        )
    if isinstance(backref, tuple):
        backref = backref[0]

    @sa.event.listens_for(sa.orm.Session, 'after_flush')
    def delete_orphan_listener(session, ctx):
        # Look through Session state to see if we want to emit a DELETE for
        # orphans
        orphans_found = (
            any(
                isinstance(obj, parent_class) and
                sa.orm.attributes.get_history(obj, attr.key).deleted
                for obj in session.dirty
            ) or
            any(
                isinstance(obj, parent_class)
                for obj in session.deleted
            )
        )

        if orphans_found:
            # Emit a DELETE for all orphans
            (
                session.query(target_class)
                .filter(
                    ~getattr(target_class, backref).any()
                )
                .delete(synchronize_session=False)
            )