File: query.py

package info (click to toggle)
django-hvad 1.8.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 696 kB
  • sloc: python: 3,554; makefile: 110
file content (131 lines) | stat: -rw-r--r-- 4,699 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import django
from django.db.models import Q, FieldDoesNotExist
from django.db.models.expressions import Expression, Col
from django.db.models.sql.where import AND
from collections import namedtuple

__all__ = ()

#===============================================================================
# Generators abstracting walking through internal django structures

# One component of a '__'-separated lookup path, as yielded by query_terms():
#   depth      - index of the component within the path
#   term       - the component itself ('pk' is replaced by the pk field name
#                when it gets resolved as a field)
#   model      - model the component was looked up on
#   field      - resolved field, None for lookup/transform components
#   translated - whether the field was found on the translations model,
#                None for lookup/transform components
#   target     - concrete model the field leads to, None if not a relation
#   many       - True for auto-created fields having neither db_column nor
#                attname (m2m or reverse fk)
QueryTerm = namedtuple(
    'QueryTerm',
    ['depth', 'term', 'model', 'field', 'translated', 'target', 'many'],
)

def query_terms(model, path):
    """ Walks given path starting from given model, yielding one QueryTerm
        per '__'-separated component.
        - model can be either a regular model or a translatable model
        - components are resolved as fields, following relations, for as
          long as possible
        - whatever comes after a non-relation field, or starting from the
          first name that is not a field, is yielded with field=None and
          left for Django to validate
    """
    bits = path.split('__')
    leftover = None     # index of the first component to flush in STEP 3

    for depth, bit in enumerate(bits):
        # STEP 1 -- Resolve the field
        if bit == 'pk': # handle 'pk' alias
            bit = model._meta.pk.name

        translated = False
        try:
            try:                        # is field on the shared model?
                field = model._meta.get_field.real(bit)
            except FieldDoesNotExist:   # nope, get field from translations model
                field = model._meta.translations_model._meta.get_field(bit)
                translated = True
            except AttributeError:      # current model is a standard model
                field = model._meta.get_field(bit)
        except FieldDoesNotExist:
            leftover = depth            # not a field: flush from this one on
            break

        # Auto-created fields having neither db_column nor attname are
        # treated as m2m or reverse fk
        many = bool(
            field.auto_created and
            not getattr(field, 'db_column', None) and
            not getattr(field, 'attname', None)
        )

        # STEP 2 -- Find out the target of the relation, if it is one
        if many:                        # m2m or reverse fk, follow it
            target = field.related_model._meta.concrete_model
        elif django.VERSION >= (1, 9):
            # foreign keys have a remote_field, regular fields do not
            relation = field.remote_field
            target = relation.model._meta.concrete_model if relation else None
        else:
            # same as above, using the attribute names of older Django
            relation = field.rel
            target = relation.to._meta.concrete_model if relation else None

        yield QueryTerm(depth, bit, model, field, translated, target, many)

        # Onto next iteration
        if target is None:
            leftover = depth + 1        # regular field: the rest is lookups
            break
        model = target
    else:
        return  # all bits were recognized as fields, job done

    # STEP 3 -- Flush lookup/transform bits - do not handle invalid stuff, Django will do it
    for depth, bit in enumerate(bits[leftover:], leftover):
        yield QueryTerm(depth, bit, model, None, None, None, False)


def q_children(q):
    ''' Walks a Q object and all Q objects nested within it, yielding every
        child that is not itself a Q object.
        - q: the Q object to visit
        - Yields a 3-tuple (child, containing_list, index_in_list), so the
          caller can replace the child through containing_list[index_in_list]
        - Nested Q objects are visited after the leaves of their parent,
          last-found first
    '''
    pending = [q]
    while pending:
        node = pending.pop()
        for position, entry in enumerate(node.children):
            if not isinstance(entry, Q):
                yield entry, node.children, position
                continue
            # nested Q object: visit it once the current node is done
            pending.append(entry)

def expression_nodes(expression):
    ''' Walks an expression tree, yielding every node that is not None.
        - expression: the expression object to visit
        - Only Expression instances are descended into, through their
          get_source_expressions() method; other nodes are yielded as is
    '''
    stack = [expression]
    while stack:
        node = stack.pop()
        if node is None:
            continue    # None entries are skipped, they are never yielded
        yield node
        if isinstance(node, Expression):
            stack.extend(node.get_source_expressions())

#===============================================================================
# Query manipulations

def add_alias_constraints(queryset, alias, **kwargs):
    ''' Adds constraints on columns of given alias to the queryset, in place.
        - queryset: the queryset whose query.where gets the new constraints
        - alias: a 2-tuple (model, alias), alias being passed to Col along
          with fields looked up on model
        - kwargs: constraints, each key being of the form 'field__lookup',
          with exactly one '__' separator
        All constraints are ANDed together into a new where clause, which is
        in turn ANDed to the existing where clause of the query.
    '''
    model, table_alias = alias
    query = queryset.query
    constraints = query.where_class()
    for key, value in kwargs.items():
        # unpacking fails unless key splits into exactly two parts
        field_name, lookup_name = key.split('__')
        column = Col(table_alias, model._meta.get_field(field_name))
        constraints.add(query.build_lookup([lookup_name], column, value), AND)
    query.where.add(constraints, AND)