File: temporal_range.py

package info (click to toggle)
sqlalchemy 2.0.40%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 26,404 kB
  • sloc: python: 410,002; makefile: 230; sh: 7
file content (140 lines) | stat: -rw-r--r-- 3,754 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Illustrates a custom per-query criteria that will be applied
to selected entities.


"""

import datetime
from functools import partial

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import orm
from sqlalchemy import select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker


class HasTemporal:
    """Mixin marking a mapped class as carrying a ``timestamp`` column.

    Classes that include this mixin are the ones targeted by
    :func:`temporal_range`.
    """

    # Required (NOT NULL) DateTime column.  When no value is supplied, the
    # default callable produces the current time as a timezone-aware UTC
    # datetime.
    timestamp = Column(
        DateTime,
        nullable=False,
        default=lambda: datetime.datetime.now(datetime.timezone.utc),
    )


def temporal_range(range_lower, range_upper):
    """Return a loader option limiting ``HasTemporal`` rows to a time range.

    The option is built with ``orm.with_loader_criteria()`` against the
    ``HasTemporal`` mixin; its criterion is
    ``timestamp BETWEEN range_lower AND range_upper``.
    ``include_aliases=True`` is passed through to
    ``with_loader_criteria()``.

    :param range_lower: lower bound handed to ``between()``.
    :param range_upper: upper bound handed to ``between()``.
    :return: the option object, suitable for ``.options()`` on a query or
     ``select()`` as done in the demonstration below.
    """
    # The criterion stays a lambda closing over the two bounds so that it is
    # evaluated against whichever entity class it is applied to.
    timestamp_criteria = orm.with_loader_criteria(
        HasTemporal,
        lambda entity: entity.timestamp.between(range_lower, range_upper),
        include_aliases=True,
    )
    return timestamp_criteria


if __name__ == "__main__":
    # Demonstration: build a small Parent/Child schema in an in-memory SQLite
    # database and show temporal_range() restricting both the selected
    # parents and the children loaded for them.

    # Use the sqlalchemy.orm location of declarative_base(); calling it via
    # sqlalchemy.ext.declarative is deprecated in SQLAlchemy 2.0.
    Base = orm.declarative_base()

    class Parent(HasTemporal, Base):
        __tablename__ = "parent"
        id = Column(Integer, primary_key=True)
        children = relationship("Child")

    class Child(HasTemporal, Base):
        __tablename__ = "child"
        id = Column(Integer, primary_key=True)
        parent_id = Column(Integer, ForeignKey("parent.id"), nullable=False)

    engine = create_engine("sqlite://", echo=True)
    Base.metadata.create_all(engine)

    Session = sessionmaker(bind=engine)

    # The context manager closes the session when the block exits, including
    # when one of the assertions below fails.
    with Session() as sess:
        c1, c2, c3, c4, c5 = [
            Child(timestamp=datetime.datetime(2009, 10, 15, 12, 00, 00)),
            Child(timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00)),
            Child(timestamp=datetime.datetime(2009, 10, 20, 12, 00, 00)),
            Child(timestamp=datetime.datetime(2009, 10, 12, 12, 00, 00)),
            Child(timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00)),
        ]

        p1 = Parent(
            timestamp=datetime.datetime(2009, 10, 15, 12, 00, 00),
            children=[c1, c2, c3],
        )
        p2 = Parent(
            timestamp=datetime.datetime(2009, 10, 17, 12, 00, 00),
            children=[c4, c5],
        )

        sess.add_all([p1, p2])
        sess.commit()

        # use populate_existing() to ensure the range option takes
        # effect for elements already in the identity map

        parents = (
            sess.query(Parent)
            .populate_existing()
            .options(
                temporal_range(
                    datetime.datetime(2009, 10, 16, 12, 00, 00),
                    datetime.datetime(2009, 10, 18, 12, 00, 00),
                )
            )
            .all()
        )

        # only p2 and its child c5 carry timestamps inside the range
        assert parents[0] == p2
        assert parents[0].children == [c5]

        sess.expire_all()

        # try it with eager load
        parents = (
            sess.query(Parent)
            .options(
                temporal_range(
                    datetime.datetime(2009, 10, 16, 12, 00, 00),
                    datetime.datetime(2009, 10, 18, 12, 00, 00),
                )
            )
            .options(selectinload(Parent.children))
            .all()
        )

        assert parents[0] == p2
        assert parents[0].children == [c5]

        sess.expire_all()

        # illustrate a 2.0 style query
        print("------------------")
        parents = (
            sess.execute(
                select(Parent)
                .execution_options(populate_existing=True)
                .options(
                    temporal_range(
                        datetime.datetime(2009, 10, 15, 11, 00, 00),
                        datetime.datetime(2009, 10, 18, 12, 00, 00),
                    )
                )
                .join(Parent.children)
                .filter(Child.id == 2)
            )
            .scalars()
            .all()
        )

        # the wider range admits p1; of its children, c3 (Oct 20) is
        # outside the range and is left out
        assert parents[0] == p1
        print("-------------------")
        assert parents[0].children == [c1, c2]