File: versioned_rows.py

package info (click to toggle)
sqlalchemy 1.0.15%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 13,056 kB
  • ctags: 26,600
  • sloc: python: 169,901; ansic: 1,346; makefile: 260; xml: 17
file content (105 lines) | stat: -rw-r--r-- 2,821 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""Illustrates a method to intercept changes on objects, turning
an UPDATE statement on a single row into an INSERT statement, so that a new
row is inserted with the new data, keeping the old row intact.

"""
from sqlalchemy.orm import *
from sqlalchemy import *
from sqlalchemy.orm.interfaces import SessionExtension
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import attributes

class Versioned(object):
    def new_version(self, session):
        # if on SQLA 0.6.1 or earlier,
        # make sure 'id' isn't expired.
        # self.id

        # make us transient (removes persistent
        # identity).
        make_transient(self)

        # set 'id' to None.
        # a new PK will be generated on INSERT.
        self.id = None

class VersionExtension(SessionExtension):
    def before_flush(self, session, flush_context, instances):
        for instance in session.dirty:
            if not isinstance(instance, Versioned):
                continue
            if not session.is_modified(instance, passive=True):
                continue

            if not attributes.instance_state(instance).has_identity:
                continue

            # make it transient
            instance.new_version(session)
            # re-add
            session.add(instance)

Base = declarative_base()

engine = create_engine('sqlite://', echo=True)

Session = sessionmaker(engine, extension=[VersionExtension()])

# example 1, simple versioning

class Example(Versioned, Base):
    __tablename__ = 'example'
    id = Column(Integer, primary_key=True)
    data = Column(String)

Base.metadata.create_all(engine)

session = Session()
e1 = Example(data='e1')
session.add(e1)
session.commit()

e1.data = 'e2'
session.commit()

assert session.query(Example.id, Example.data).order_by(Example.id).all() == \
        [(1, 'e1'), (2, 'e2')]

# example 2, versioning with a parent

class Parent(Base):
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    child_id = Column(Integer, ForeignKey('child.id'))
    child = relationship("Child", backref=backref('parent', uselist=False))

class Child(Versioned, Base):
    __tablename__ = 'child'

    id = Column(Integer, primary_key=True)
    data = Column(String)

    def new_version(self, session):
        # expire parent's reference to us
        session.expire(self.parent, ['child'])

        # create new version
        Versioned.new_version(self, session)

        # re-add ourselves to the parent
        self.parent.child = self

Base.metadata.create_all(engine)

session = Session()

p1 = Parent(child=Child(data='c1'))
session.add(p1)
session.commit()

p1.child.data = 'c2'
session.commit()

assert p1.child_id == 2
assert session.query(Child.id, Child.data).order_by(Child.id).all() == \
    [(1, 'c1'), (2, 'c2')]