File: enumcolumn.py

package info (click to toggle)
python-clickhouse-driver 0.2.5-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,516 kB
  • sloc: python: 10,950; pascal: 42; makefile: 29; sh: 3
file content (119 lines) | stat: -rw-r--r-- 3,219 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from enum import Enum

from .. import errors
from .intcolumn import IntColumn


class EnumColumn(IntColumn):
    """Integer column whose items are exchanged as enum member names.

    On write, items are replaced by the integer values of the matching
    ``enum_cls`` members; on read, integers are mapped back to member
    names.
    """

    py_types = (Enum, int, str)

    def __init__(self, enum_cls, **kwargs):
        """Store the enum class and pass the other options to the parent.

        :param enum_cls: ``Enum`` class used for name/value lookups.
        :param kwargs: forwarded unchanged to the parent constructor.
        """
        # Set before the parent initialiser runs.
        self.enum_cls = enum_cls
        super().__init__(**kwargs)

    def before_write_items(self, items, nulls_map=None):
        """Convert ``items`` in place to enum integer values.

        Each item may be an ``Enum`` instance (matched by its name), a
        string (looked up as a member name) or any other value (looked up
        as a member value).

        :param items: mutable sequence, modified in place.
        :param nulls_map: optional sequence of flags; a truthy flag at an
            index makes that item ``self.null_value`` (an attribute
            defined outside this class).
        :raises errors.LogicalError: when an item matches no member.
        """
        placeholder = self.null_value
        members = self.enum_cls

        for pos, raw in enumerate(items):
            if nulls_map and nulls_map[pos]:
                items[pos] = placeholder
                continue

            # Enum instances are resolved through their name, not identity.
            key = raw.name if isinstance(raw, Enum) else raw

            try:
                if isinstance(key, str):
                    found = members[key]
                else:
                    found = members(key)
                items[pos] = found.value
            except (ValueError, KeyError):
                # Describe the known members using the
                # "Name('a' = 1, 'b' = 2)" notation, quotes escaped.
                described = []
                for known in members:
                    quoted = known.name.replace("'", r"\'")
                    described.append("'{}' = {}".format(quoted, known.value))

                type_repr = '{}({})'.format(
                    members.__name__, ', '.join(described)
                )
                message = "Unknown element '{}' for type {}".format(
                    key, type_repr
                )
                raise errors.LogicalError(message)

    def after_read_items(self, items, nulls_map=None):
        """Map integer ``items`` to the names of their enum members.

        :param items: sequence of integer enum values.
        :param nulls_map: optional sequence of flags; positions with a
            truthy flag yield ``None`` and their item is not looked up.
        :return: tuple of member names (or ``None`` for null positions).
        """
        members = self.enum_cls

        if nulls_map is None:
            return tuple([members(raw).name for raw in items])

        # The result length follows ``nulls_map``; items are read by index.
        names = []
        for pos, is_null in enumerate(nulls_map):
            if is_null:
                names.append(None)
            else:
                names.append(members(items[pos]).name)
        return tuple(names)


class Enum8Column(EnumColumn):
    """Enum column variant registered under the ``Enum8`` type name."""

    # One byte per value; 'b' is presumably the struct code for a signed
    # 8-bit integer consumed by the base column -- confirm in IntColumn.
    int_size = 1
    format = 'b'
    ch_type = 'Enum8'


class Enum16Column(EnumColumn):
    """Enum column variant registered under the ``Enum16`` type name."""

    # Two bytes per value; 'h' is presumably the struct code for a signed
    # 16-bit integer consumed by the base column -- confirm in IntColumn.
    int_size = 2
    format = 'h'
    ch_type = 'Enum16'


def create_enum_column(spec, column_options):
    """Build an enum column from a type specification string.

    :param spec: type string such as ``Enum8('a' = 1, 'b' = 2)``; any
        spec that does not start with ``Enum8`` is handled as ``Enum16``.
    :param column_options: dict of keyword arguments forwarded to the
        column constructor.
    :return: an ``Enum8Column`` or ``Enum16Column`` instance.
    """
    is_enum8 = spec.startswith('Enum8')
    column_cls = Enum8Column if is_enum8 else Enum16Column

    # Drop the leading "Enum8(" (6 chars) or "Enum16(" (7 chars) and the
    # closing parenthesis, leaving only the option list.
    prefix_len = 6 if is_enum8 else 7
    members = _parse_options(spec[prefix_len:-1])

    enum_cls = Enum(column_cls.ch_type, members)
    return column_cls(enum_cls, **column_options)


def _parse_options(option_string):
    options = dict()
    after_name = False
    escaped = False
    quote_character = None
    name = ''
    value = ''

    for ch in option_string:
        if escaped:
            name += ch
            escaped = False  # accepting escaped character

        elif after_name:
            if ch in (' ', '='):
                pass
            elif ch == ',':
                options[name] = int(value)
                after_name = False
                name = ''
                value = ''  # reset before collecting new option
            else:
                value += ch

        elif quote_character:
            if ch == '\\':
                escaped = True
            elif ch == quote_character:
                quote_character = None
                after_name = True  # start collecting option value
            else:
                name += ch

        else:
            if ch == "'":
                quote_character = ch

    if after_name:
        options.setdefault(name, int(value))  # append word after last comma

    return options