1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
import sqlalchemy as sa
from sqlalchemy.ext import compiler
from sqlalchemy.schema import DDLElement, PrimaryKeyConstraint
from sqlalchemy.sql.expression import ClauseElement, Executable
from sqlalchemy_utils.functions import get_columns
class CreateView(DDLElement):
def __init__(self, name, selectable, materialized=False, replace=False):
if materialized and replace:
raise ValueError("Cannot use CREATE OR REPLACE with materialized views")
self.name = name
self.selectable = selectable
self.materialized = materialized
self.replace = replace
@compiler.compiles(CreateView)
def compile_create_materialized_view(element, compiler, **kw):
return 'CREATE {}{}VIEW {} AS {}'.format(
'OR REPLACE ' if element.replace else '',
'MATERIALIZED ' if element.materialized else '',
compiler.dialect.identifier_preparer.quote(element.name),
compiler.sql_compiler.process(element.selectable, literal_binds=True),
)
class DropView(DDLElement):
def __init__(self, name, materialized=False, cascade=True):
self.name = name
self.materialized = materialized
self.cascade = cascade
@compiler.compiles(DropView)
def compile_drop_materialized_view(element, compiler, **kw):
return 'DROP {}VIEW IF EXISTS {} {}'.format(
'MATERIALIZED ' if element.materialized else '',
compiler.dialect.identifier_preparer.quote(element.name),
'CASCADE' if element.cascade else ''
)
def create_table_from_selectable(
name,
selectable,
indexes=None,
metadata=None,
aliases=None,
**kwargs
):
if indexes is None:
indexes = []
if metadata is None:
metadata = sa.MetaData()
if aliases is None:
aliases = {}
args = [
sa.Column(
c.name,
c.type,
key=aliases.get(c.name, c.name),
primary_key=c.primary_key
)
for c in get_columns(selectable)
] + indexes
table = sa.Table(name, metadata, *args, **kwargs)
if not any([c.primary_key for c in get_columns(selectable)]):
table.append_constraint(
PrimaryKeyConstraint(*[c.name for c in get_columns(selectable)])
)
return table
def create_materialized_view(
name,
selectable,
metadata,
indexes=None,
aliases=None
):
""" Create a view on a given metadata
:param name: The name of the view to create.
:param selectable: An SQLAlchemy selectable e.g. a select() statement.
:param metadata:
An SQLAlchemy Metadata instance that stores the features of the
database being described.
:param indexes: An optional list of SQLAlchemy Index instances.
:param aliases:
An optional dictionary containing with keys as column names and values
as column aliases.
Same as for ``create_view`` except that a ``CREATE MATERIALIZED VIEW``
statement is emitted instead of a ``CREATE VIEW``.
"""
table = create_table_from_selectable(
name=name,
selectable=selectable,
indexes=indexes,
metadata=None,
aliases=aliases
)
sa.event.listen(
metadata,
'after_create',
CreateView(name, selectable, materialized=True)
)
@sa.event.listens_for(metadata, 'after_create')
def create_indexes(target, connection, **kw):
for idx in table.indexes:
idx.create(connection)
sa.event.listen(
metadata,
'before_drop',
DropView(name, materialized=True)
)
return table
def create_view(
name,
selectable,
metadata,
cascade_on_drop=True,
replace=False,
):
""" Create a view on a given metadata
:param name: The name of the view to create.
:param selectable: An SQLAlchemy selectable e.g. a select() statement.
:param metadata:
An SQLAlchemy Metadata instance that stores the features of the
database being described.
:param cascade_on_drop: If ``True`` the view will be dropped with
``CASCADE``, deleting all dependent objects as well.
:param replace: If ``True`` the view will be created with ``OR REPLACE``,
replacing an existing view with the same name.
The process for creating a view is similar to the standard way that a
table is constructed, except that a selectable is provided instead of
a set of columns. The view is created once a ``CREATE`` statement is
executed against the supplied metadata (e.g. ``metadata.create_all(..)``),
and dropped when a ``DROP`` is executed against the metadata.
To create a view that performs basic filtering on a table. ::
metadata = MetaData()
users = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('fullname', String),
Column('premium_user', Boolean, default=False),
)
premium_members = select(users).where(users.c.premium_user == True)
# sqlalchemy 1.3:
# premium_members = select([users]).where(users.c.premium_user == True)
create_view('premium_users', premium_members, metadata)
metadata.create_all(engine) # View is created at this point
"""
table = create_table_from_selectable(
name=name,
selectable=selectable,
metadata=None
)
sa.event.listen(
metadata,
'after_create',
CreateView(name, selectable, replace=replace),
)
@sa.event.listens_for(metadata, 'after_create')
def create_indexes(target, connection, **kw):
for idx in table.indexes:
idx.create(connection)
sa.event.listen(
metadata,
'before_drop',
DropView(name, cascade=cascade_on_drop)
)
return table
class RefreshMaterializedView(Executable, ClauseElement):
inherit_cache = True
def __init__(self, name, concurrently):
self.name = name
self.concurrently = concurrently
@compiler.compiles(RefreshMaterializedView)
def compile_refresh_materialized_view(element, compiler):
return 'REFRESH MATERIALIZED VIEW {concurrently}{name}'.format(
concurrently='CONCURRENTLY ' if element.concurrently else '',
name=compiler.dialect.identifier_preparer.quote(element.name),
)
def refresh_materialized_view(session, name, concurrently=False):
""" Refreshes an already existing materialized view
:param session: An SQLAlchemy Session instance.
:param name: The name of the materialized view to refresh.
:param concurrently:
Optional flag that causes the ``CONCURRENTLY`` parameter
to be specified when the materialized view is refreshed.
"""
# Since session.execute() bypasses autoflush, we must manually flush in
# order to include newly-created/modified objects in the refresh.
session.flush()
session.execute(RefreshMaterializedView(name, concurrently))
|