File: lowcardinalitycolumn.py

package info
python-clickhouse-driver 0.2.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,516 kB
  • sloc: python: 10,950; pascal: 42; makefile: 31; sh: 3
file content (123 lines) | stat: -rw-r--r-- 4,042 bytes

from math import log

from ..reader import read_binary_uint64
from ..writer import write_binary_int64
from .base import Column
from .intcolumn import UInt8Column, UInt16Column, UInt32Column, UInt64Column


def create_low_cardinality_column(spec, column_by_spec_getter, column_options):
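    # Strip the 'LowCardinality(' prefix (15 characters) and the trailing
    # ')', e.g. 'LowCardinality(Nullable(String))' -> 'Nullable(String)'.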
    inner = spec[15:-1]
    nested = column_by_spec_getter(inner)
    return LowCardinalityColumn(nested, **column_options)


class LowCardinalityColumn(Column):
    """
    Stores a column as an index (unique elements) plus keys referencing it.
    Good for de-duplication of large values with low cardinality.
    """
    int_types = {
        0: UInt8Column,
        1: UInt16Column,
        2: UInt32Column,
        3: UInt64Column
    }

    # Need to read additional keys.
    # Additional keys are stored before the indexes as a count N
    # followed by N keys.
    has_additional_keys_bit = 1 << 9
    # Need to update dictionary.
    # It means that the previous granule has a different dictionary.
    need_update_dictionary = 1 << 10

    serialization_type = has_additional_keys_bit | need_update_dictionary
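    # Illustrative: with both flags set and UInt16 keys (key type 1), the
    # header written on the wire is 0x200 | 0x400 | 1 = 0x601.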

    def __init__(self, nested_column, **kwargs):
        self.nested_column = nested_column
        super(LowCardinalityColumn, self).__init__(**kwargs)

    def read_state_prefix(self, buf):
        return read_binary_uint64(buf)

    def write_state_prefix(self, buf):
        # KeysSerializationVersion. See ClickHouse docs.
        write_binary_int64(1, buf)

    def _write_data(self, items, buf):
        index, keys = [], []
        key_by_index_element = {}

        if self.nested_column.nullable:
            # First element represents NULL if column is nullable.
            index.append(self.nested_column.null_value)
            # Prevent null map writing. Reset nested column nullable flag.
            self.nested_column.nullable = False

            for x in items:
                if x is None:
                    # Zero element for null.
                    keys.append(0)

                else:
                    key = key_by_index_element.get(x)
                    # Get key from index or add it to index.
                    if key is None:
                        key = len(key_by_index_element)
                        key_by_index_element[x] = key
                        index.append(x)

                    keys.append(key + 1)
        else:
            for x in items:
                key = key_by_index_element.get(x)

                # Get key from index or add it to index.
                if key is None:
                    key = len(key_by_index_element)
                    key_by_index_element[x] = key
                    index.append(x)

                keys.append(key)

        # Do not write anything for empty column.
        # May happen while writing empty arrays.
        if not len(index):
            return

        int_type = int(log(len(index), 2) / 8)
        int_column = self.int_types[int_type]()
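        # E.g. 300 unique values: log2(300) / 8 ~ 1.03 truncates to 1,
        # so UInt16 keys (2 bytes each) are used.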

        serialization_type = self.serialization_type | int_type
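        # Layout written below: serialization header, index size, the
        # index itself (unique values), the number of rows, then the keys.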

        write_binary_int64(serialization_type, buf)
        write_binary_int64(len(index), buf)

        self.nested_column.write_data(index, buf)
        write_binary_int64(len(items), buf)  # number of keys (rows)
        int_column.write_items(keys, buf)

    def _read_data(self, n_items, buf, nulls_map=None):
        if not n_items:
            return tuple()

        serialization_type = read_binary_uint64(buf)

        # Lowest byte contains info about key type.
        key_type = serialization_type & 0xf
        keys_column = self.int_types[key_type]()

        nullable = self.nested_column.nullable
        # Prevent null map reading. Reset nested column nullable flag.
        self.nested_column.nullable = False

        index_size = read_binary_uint64(buf)
        index = self.nested_column.read_data(index_size, buf)
        if nullable:
            index = (None, ) + index[1:]

        read_binary_uint64(buf)  # number of keys
        keys = keys_column.read_data(n_items, buf)

        return tuple(index[x] for x in keys)
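

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal,
# standard-library-only rendition of the index/keys split and the key-width
# rule used by LowCardinalityColumn._write_data above.  The helper name is
# hypothetical and only meant for experimentation.
def _sketch_low_cardinality_encode(items):
    index, keys, key_by_value = [], [], {}
    for x in items:
        key = key_by_value.get(x)
        if key is None:
            # New unique value: append it to the index, remember its key.
            key = len(key_by_value)
            key_by_value[x] = key
            index.append(x)
        keys.append(key)

    # Same width rule as _write_data: 2 ** int_type bytes per key.
    int_type = int(log(len(index), 2) / 8) if index else 0
    return index, keys, 2 ** int_type


# _sketch_low_cardinality_encode(['foo', 'bar', 'foo', 'foo'])
# -> (['foo', 'bar'], [0, 1, 0, 0], 1)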