File: expression_parser.py

package info (click to toggle)
python-sqlalchemy-utils 0.30.12-2~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 1,056 kB
  • sloc: python: 10,350; makefile: 160
file content (145 lines) | stat: -rw-r--r-- 3,923 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict

import six
import sqlalchemy as sa
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql.annotation import AnnotatedColumn
from sqlalchemy.sql.elements import (
    Case,
    ClauseList,
    False_,
    Grouping,
    Label,
    Null,
    True_,
    Tuple
)
from sqlalchemy.sql.expression import (
    BinaryExpression,
    BindParameter,
    BooleanClauseList,
    Cast,
    UnaryExpression
)


class ExpressionParser(object):
    parsers = OrderedDict((
        (BinaryExpression, 'binary_expression'),
        (BooleanClauseList, 'boolean_expression'),
        (UnaryExpression, 'unary_expression'),
        (sa.Column, 'column'),
        (AnnotatedColumn, 'column'),
        (BindParameter, 'bind_parameter'),
        (False_, 'false'),
        (True_, 'true'),
        (Grouping, 'grouping'),
        (ClauseList, 'clause_list'),
        (Label, 'label'),
        (Cast, 'cast'),
        (Case, 'case'),
        (Tuple, 'tuple'),
        (Null, 'null'),
        (InstrumentedAttribute, 'instrumented_attribute')
    ))

    def expression(self, expr):
        if expr is None:
            return
        for class_, parser in self.parsers.items():
            if isinstance(expr, class_):
                return getattr(self, parser)(expr)
        raise Exception(
            'Unknown expression type %s' % expr.__class__.__name__
        )

    def instrumented_attribute(self, expr):
        return expr

    def null(self, expr):
        return expr

    def tuple(self, expr):
        return expr.__class__(
            *map(self.expression, expr.clauses),
            type_=expr.type
        )

    def clause_list(self, expr):
        return expr.__class__(
            *map(self.expression, expr.clauses),
            group=expr.group,
            group_contents=expr.group_contents,
            operator=expr.operator
        )

    def label(self, expr):
        return expr.__class__(
            name=expr.name,
            element=self.expression(expr._element),
            type_=expr.type
        )

    def cast(self, expr):
        return expr.__class__(
            expression=self.expression(expr.clause),
            type_=expr.type
        )

    def case(self, expr):
        return expr.__class__(
            whens=[
                tuple(self.expression(x) for x in when) for when in expr.whens
            ],
            value=self.expression(expr.value),
            else_=self.expression(expr.else_)
        )

    def grouping(self, expr):
        return expr.__class__(self.expression(expr.element))

    def true(self, expr):
        return expr

    def false(self, expr):
        return expr

    def process_table(self, table):
        return table

    def column(self, column):
        table = self.process_table(column.table)
        return table.c[column.name]

    def unary_expression(self, expr):
        return expr.operator(self.expression(expr.element))

    def bind_parameter(self, expr):
        # somehow bind parameters passed as unicode are converted to
        # ascii strings along the way, force convert them back to avoid
        # sqlalchemy unicode warnings
        if isinstance(expr.type, sa.Unicode):
            expr.value = six.text_type(expr.value)
        return expr

    def binary_expression(self, expr):
        return expr.__class__(
            left=self.expression(expr.left),
            right=self.expression(expr.right),
            operator=expr.operator,
            type_=expr.type,
            negate=expr.negate,
            modifiers=expr.modifiers.copy()
        )

    def boolean_expression(self, expr):
        return expr.operator(*[
            self.expression(child_expr)
            for child_expr in expr.get_children()
        ])

    def __call__(self, expr):
        return self.expression(expr)