1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
from copy import copy
from django.db.models import Manager, sql
from django.db.models.expressions import Ref
from django.db.models.query import Q, QuerySet, ValuesIterable
from django.db.models.sql.datastructures import BaseTable
from .jitmixin import jit_mixin
from .join import QJoin, INNER
from .meta import CTEColumnRef, CTEColumns
from .query import CTEQuery
from ._deprecated import deprecated
# Names exported by a star-import of this module. The deprecated `With`,
# `CTEQuerySet` and `CTEManager` classes defined in this module are not listed.
__all__ = ["CTE", "with_cte"]
def with_cte(*ctes, select):
    """Add Common Table Expression(s) (CTEs) to a model or queryset

    A queryset passed as ``select`` is not modified: the CTEs are attached
    to a copy of it, so one base queryset can be reused to build several
    independent CTE queries.

    :param *ctes: One or more CTE objects.
    :param select: A model class, queryset, or CTE to use as the base
        query to which CTEs are attached.
    :returns: A new queryset with the given CTE(s) added to it.
    """
    if isinstance(select, CTE):
        select = select.queryset()
    elif isinstance(select, QuerySet):
        # Work on a copy. Attaching the CTEs to the caller's queryset in
        # place would leak them into every later use of that queryset.
        select = select.all()
    else:
        select = select._default_manager.all()
    # Mix CTEQuery into the query object before touching `_with_ctes`.
    jit_mixin(select.query, CTEQuery)
    select.query._with_ctes += ctes
    return select
class CTE:
    """Common Table Expression

    :param queryset: A queryset to use as the body of the CTE.
    :param name: Optional name parameter for the CTE (default: "cte").
        This must be a unique name that does not conflict with other
        entities (tables, views, functions, other CTE(s), etc.) referenced
        in the given query as well as any query to which this CTE will
        eventually be added.
    :param materialized: Optional parameter (default: False) which enforces
        use of the MATERIALIZED statement on supporting databases.
    """

    def __init__(self, queryset, name="cte", materialized=False):
        # `queryset` may be None: `recursive()` creates the CTE first and
        # assigns its query afterwards.
        self.query = None if queryset is None else queryset.query
        self.name = name
        self.col = CTEColumns(self)
        self.materialized = materialized

    def __getstate__(self):
        # `col` is not part of the state; `__setstate__` rebuilds it so it
        # is always bound to the object being restored.
        return (self.query, self.name, self.materialized)

    def __setstate__(self, state):
        self.query, self.name, self.materialized = state
        self.col = CTEColumns(self)

    def __repr__(self):
        # Use the runtime class name so subclasses identify themselves
        # correctly instead of a hard-coded legacy name.
        return "<{} {}>".format(type(self).__name__, self.name)

    @classmethod
    def recursive(cls, make_cte_queryset, name="cte", materialized=False):
        """Recursive Common Table Expression

        :param make_cte_queryset: Function taking a single argument (a
            not-yet-fully-constructed cte object) and returning a `QuerySet`
            object. The returned `QuerySet` normally consists of an initial
            statement unioned with a recursive statement.
        :param name: See `name` parameter of `__init__`.
        :param materialized: See `materialized` parameter of `__init__`.
        :returns: The fully constructed recursive cte object.
        """
        cte = cls(None, name, materialized)
        cte.query = make_cte_queryset(cte).query
        return cte

    def join(self, model_or_queryset, *filter_q, **filter_kw):
        """Join this CTE to the given model or queryset

        This CTE will be referenced by the returned queryset, but the
        corresponding `WITH ...` statement will not be prepended to the
        queryset's SQL output; use `with_cte(cte, select=cte.join(...))`
        to achieve that outcome.

        :param model_or_queryset: Model class or queryset to which the
            CTE should be joined.
        :param *filter_q: Join condition Q expressions (optional).
        :param **filter_kw: Join conditions. All LHS fields (kwarg keys)
            are assumed to reference `model_or_queryset` fields. Use
            `cte.col.name` on the RHS to recursively reference CTE query
            columns. For example: `cte.join(Book, id=cte.col.id)`
            The reserved key `_join_type` (default: `INNER`) selects the
            join type and is removed before the conditions are built.
        :returns: A queryset with the given model or queryset joined to
            this CTE.
        """
        if isinstance(model_or_queryset, QuerySet):
            queryset = model_or_queryset.all()
        else:
            queryset = model_or_queryset._default_manager.all()
        join_type = filter_kw.pop("_join_type", INNER)
        query = queryset.query

        # based on Query.add_q: add necessary joins to query, but no filter
        q_object = Q(*filter_q, **filter_kw)
        # Remember which joins were INNER before `_add_q` runs; they are
        # handed to `demote_joins` afterwards.
        existing_inner = {
            alias
            for alias, table in query.alias_map.items()
            if table.join_type == INNER
        }
        on_clause, _ = query._add_q(q_object, query.used_aliases)
        query.demote_joins(existing_inner)

        parent = query.get_initial_alias()
        query.join(QJoin(parent, self.name, self.name, on_clause, join_type))
        return queryset

    def queryset(self):
        """Get a queryset selecting from this CTE

        This CTE will be referenced by the returned queryset, but the
        corresponding `WITH ...` statement will not be prepended to the
        queryset's SQL output; use `with_cte(cte, select=cte)` to do
        that.

        :returns: A queryset.
        """
        cte_query = self.query
        qs = cte_query.model._default_manager.get_queryset()

        # Build a fresh query whose base table is the CTE itself (by name).
        query = jit_mixin(sql.Query(cte_query.model), CTEQuery)
        query.join(BaseTable(self.name, None))
        query.default_cols = cte_query.default_cols
        query.deferred_loading = cte_query.deferred_loading

        if cte_query.values_select:
            query.set_values(cte_query.values_select)
            qs._iterable_class = ValuesIterable

        # `selected` is read defensively: the query object may not have it.
        for alias in getattr(cte_query, "selected", None) or ():
            if alias not in cte_query.annotations:
                col = Ref(alias, cte_query.resolve_ref(alias))
                query.add_annotation(col, alias)

        if cte_query.annotations:
            # Expose each annotation of the CTE body as a column of the CTE.
            for alias, value in cte_query.annotations.items():
                col = CTEColumnRef(alias, self.name, value.output_field)
                query.add_annotation(col, alias)

        query.annotation_select_mask = cte_query.annotation_select_mask

        qs.query = query
        return qs

    def _resolve_ref(self, name):
        """Resolve `name` against this CTE's query.

        A name listed in the query's `selected` (when that attribute exists)
        that is not an annotation is returned wrapped in a `Ref`; any other
        name is returned as resolved by the query.
        """
        selected = getattr(self.query, "selected", None)
        is_plain_selected = (
            selected
            and name in selected
            and name not in self.query.annotations
        )
        if is_plain_selected:
            return Ref(name, self.query.resolve_ref(name))
        return self.query.resolve_ref(name)

    def resolve_expression(self, *args, **kw):
        """Return a copy of this CTE whose query has been resolved.

        All arguments are forwarded to the query's `resolve_expression`.

        :raises ValueError: if this CTE has no query yet.
        """
        if self.query is None:
            raise ValueError("Cannot resolve recursive CTE without a query.")
        clone = copy(self)
        clone.query = clone.query.resolve_expression(*args, **kw)
        return clone
@deprecated("Use `django_cte.CTE` instead.")
class With(CTE):
    """Deprecated subclass of `CTE`; use `django_cte.CTE` instead."""

    @staticmethod
    @deprecated("Use `django_cte.CTE.recursive` instead.")
    def recursive(*positional, **keyword):
        """Deprecated; forwards every argument to `CTE.recursive`.

        The call is made on `CTE` itself, not on `With`.

        :returns: Whatever `CTE.recursive` returns for the given arguments.
        """
        return CTE.recursive(*positional, **keyword)
@deprecated("CTEQuerySet is deprecated. "
            "CTEs can now be applied to any queryset using `with_cte()`")
class CTEQuerySet(QuerySet):
    """QuerySet with support for Common Table Expressions"""

    def __init__(self, model=None, query=None, using=None, hints=None):
        """Initialise the base queryset, then mix `CTEQuery` into its query.

        All four arguments are passed positionally to `QuerySet.__init__`.
        """
        super(CTEQuerySet, self).__init__(model, query, using, hints)
        # Applied to whatever query object the base initialiser left on
        # `self.query`.
        jit_mixin(self.query, CTEQuery)

    @deprecated("Use `django_cte.with_cte(cte, select=...)` instead.")
    def with_cte(self, cte):
        """Return a clone of this queryset with `cte` appended to its CTEs.

        :param cte: The CTE object to attach.
        :returns: The cloned queryset; this queryset is left as it was.
        """
        clone = self._clone()
        clone.query._with_ctes += (cte,)
        return clone

    def as_manager(cls):
        """Build a `CTEManager`-based manager for this queryset class."""
        # `CTEManager` is resolved when this runs, not at class creation:
        # it is defined after this class because it is built from it.
        manager_class = CTEManager.from_queryset(cls)
        manager = manager_class()
        manager._built_with_as_manager = True
        return manager
    # The attribute is set on the plain function before it is wrapped.
    as_manager.queryset_only = True
    as_manager = classmethod(as_manager)
@deprecated("CTEManager is deprecated. "
            "CTEs can now be applied to any queryset using `with_cte()`")
class CTEManager(Manager.from_queryset(CTEQuerySet)):
    """Manager for models that perform CTE queries"""

    @classmethod
    def from_queryset(cls, queryset_class, class_name=None):
        """Create a manager class from a `CTEQuerySet` subclass.

        :param queryset_class: Queryset class to build the manager from;
            must be `CTEQuerySet` or a subclass of it.
        :param class_name: Optional name, forwarded to the parent
            implementation of `from_queryset`.
        :returns: The result of the parent `from_queryset`.
        :raises TypeError: if `queryset_class` is not a `CTEQuerySet`
            subclass.
        """
        if not issubclass(queryset_class, CTEQuerySet):
            raise TypeError(
                "models with CTE support need to use a CTEQuerySet")
        return super(CTEManager, cls).from_queryset(
            queryset_class, class_name=class_name)
|