1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
import sqlalchemy as sa
import sqlalchemy.orm
from sqlalchemy.sql.util import ClauseAdapter
from .chained_join import chained_join # noqa
def path_to_relationships(path, cls):
    """Resolve a dotted attribute path into the attributes it names.

    Each dot-separated segment of ``path`` is looked up with ``getattr`` on
    the current class, starting from ``cls``.  After every lookup the walk
    continues from ``<attribute>.mapper.class_``.

    :param path: dot-separated attribute names, e.g. ``'a.b'``
    :param cls: class on which the first segment is looked up
    :returns: list of the looked-up attributes, in path order
    """
    found = []
    current = cls
    for segment in path.split('.'):
        attribute = getattr(current, segment)
        found.append(attribute)
        # The next segment lives on the class this attribute points at.
        current = attribute.mapper.class_
    return found
def adapt_expr(expr, *selectables):
    """Run ``expr`` through a ``ClauseAdapter`` for each given selectable.

    The adapters are applied one after another, in argument order, each one
    traversing the result of the previous one.

    :param expr: expression to adapt
    :param selectables: selectables to adapt the expression against
    :returns: the adapted expression; ``expr`` itself when no selectables
        are passed
    """
    adapted = expr
    for target in selectables:
        adapter = ClauseAdapter(target)
        adapted = adapter.traverse(adapted)
    return adapted
def inverse_join(selectable, left_alias, right_alias, relationship):
    """Join ``right_alias`` onto ``selectable`` by way of ``relationship``.

    Without a ``secondary`` table, the ON clause is borrowed from
    ``sa.orm.join(right_alias, left_alias, relationship)``.

    With a ``secondary`` table, two joins are chained.  The first joins a
    fresh alias of the secondary table, using ``secondaryjoin`` adapted to
    the left alias and the secondary alias.  The second joins
    ``right_alias``, using ``primaryjoin`` adapted to the right alias and
    the secondary alias.

    :param selectable: the join built so far
    :param left_alias: entity already present in ``selectable``
    :param right_alias: entity to join in
    :param relationship: relationship attribute connecting the two
    :returns: the extended join
    """
    prop = relationship.property

    if prop.secondary is None:
        # Let the ORM derive the ON clause, then reuse it on our selectable.
        orm_join = sa.orm.join(right_alias, left_alias, relationship)
        return selectable.join(right_alias, orm_join.onclause)

    secondary_alias = sa.alias(prop.secondary)

    to_secondary = adapt_expr(
        prop.secondaryjoin,
        sa.inspect(left_alias).selectable,
        secondary_alias,
    )
    with_secondary = selectable.join(secondary_alias, to_secondary)

    to_right = adapt_expr(
        prop.primaryjoin,
        sa.inspect(right_alias).selectable,
        secondary_alias,
    )
    return with_secondary.join(right_alias, to_right)
def relationship_to_correlation(relationship, alias):
    """Build the condition that links ``alias`` back through ``relationship``.

    If the relationship has a ``secondary`` table, the result is its
    ``primaryjoin`` adapted to ``alias``.  Otherwise it is the ON clause of
    ``sa.orm.join(relationship.parent, alias, relationship)``.

    :param relationship: relationship attribute to correlate over
    :param alias: selectable or entity the condition should refer to
    :returns: the correlation condition
    """
    prop = relationship.property

    if prop.secondary is None:
        orm_join = sa.orm.join(relationship.parent, alias, relationship)
        return orm_join.onclause

    return adapt_expr(prop.primaryjoin, alias)
def chained_inverse_join(relationships, leaf_model):
    """Join from ``leaf_model`` back along ``relationships``.

    The join starts at the selectable of ``leaf_model``.  For every
    consecutive pair in ``relationships``, a new ``sa.orm.aliased`` entity of
    the later relationship's mapped class is joined in through the earlier
    relationship (see ``inverse_join``).  If the final relationship has a
    ``secondary`` table, an alias of that table is joined in last.

    :param relationships: relationship attributes; the caller in this module
        passes them in reversed path order
    :param leaf_model: entity the join starts from
    :returns: ``(selectable, aliases)``.  ``aliases`` starts with
        ``leaf_model``, then has one aliased entity per relationship after
        the first, then the secondary alias when one was joined.
    """
    selectable = sa.inspect(leaf_model).selectable
    aliases = [leaf_model]

    for via_rel, next_rel in zip(relationships, relationships[1:]):
        left = aliases[-1]
        right = sa.orm.aliased(next_rel.mapper.class_)
        aliases.append(right)
        selectable = inverse_join(selectable, left, right, via_rel)

    last_prop = relationships[-1].property
    if last_prop.secondary is not None:
        secondary_alias = sa.alias(last_prop.secondary)
        onclause = adapt_expr(
            last_prop.secondaryjoin,
            secondary_alias,
            sa.inspect(aliases[-1]).selectable,
        )
        selectable = selectable.join(secondary_alias, onclause)
        aliases.append(secondary_alias)

    return selectable, aliases
def select_correlated_expression(
    root_model, expr, path, leaf_model, from_obj=None, order_by=None, correlate=True
):
    """Build a ``SELECT`` of ``expr`` restricted to rows reached via ``path``.

    ``path`` is resolved against ``root_model`` and walked in reverse.
    ``leaf_model`` is joined back along it, and the resulting select is
    filtered by the condition from ``relationship_to_correlation`` for the
    last relationship in that reversed order.

    :param root_model: class on which ``path`` starts
    :param expr: expression passed to ``sa.select``
    :param path: dot-separated relationship path from ``root_model``
    :param leaf_model: entity the join chain starts from
    :param from_obj: optional selectable; when given, the condition is
        adapted to it and it is used as the correlate target
    :param order_by: optional iterable of ordering expressions; each is
        adapted to the selectables of all aliases in the join
    :param correlate: when truthy, ``correlate()`` is applied against
        ``from_obj`` if given, else ``root_model``
    :returns: the select with the correlation condition applied
    """
    relationships = path_to_relationships(path, root_model)[::-1]

    query = sa.select(expr)

    join_expr, aliases = chained_inverse_join(relationships, leaf_model)

    if order_by:
        adapted_ordering = []
        for clause in order_by:
            targets = (sa.inspect(alias).selectable for alias in aliases)
            adapted_ordering.append(adapt_expr(clause, *targets))
        query = query.order_by(*adapted_ordering)

    correlation = relationship_to_correlation(relationships[-1], aliases[-1])
    if from_obj is not None:
        correlation = adapt_expr(correlation, from_obj)

    query = query.select_from(join_expr.selectable)

    if correlate:
        correlate_target = root_model if from_obj is None else from_obj
        query = query.correlate(correlate_target)

    return query.where(correlation)
|