1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
from itertools import chain
from struct import Struct
from .base import Column
from .intcolumn import UInt64Column
from ..util.helpers import pairwise
class ArrayColumn(Column):
    """
    Column of (possibly nested) arrays.

    Nested arrays are written in flattened form, preceded by information
    about their sizes (cumulative offsets, really).

    One element of an array of arrays can be represented as a tree:

    (0 depth)          [[3, 4], [5, 6]]
                      |               |
    (1 depth)      [3, 4]           [5, 6]
                   |    |           |    |
    (leaf)        3     4          5     6

    Offsets (sizes) are written in breadth-first search order. In the
    example above the following sequence of offsets will be written:
    4 -> 2 -> 4

    1) size of whole array: 4
    2) size of array 1 in depth=1: 2
    3) size of array 2 plus size of all arrays before it in depth=1:
       2 + 2 = 4

    After the sizes info comes the flattened data: 3 -> 4 -> 5 -> 6
    """

    py_types = (list, tuple)

    def __init__(self, nested_column, **kwargs):
        """
        :param nested_column: column object describing the array items;
                              may itself be an ``ArrayColumn``.
        :param kwargs: forwarded untouched to the base class initializer.
        """
        # Not referenced anywhere else inside this class.
        self.size_column = UInt64Column()
        self.nested_column = nested_column
        # When True, ``_write_sizes`` emits the length of the outermost
        # sequence before any nested offsets.
        self._write_depth_0_size = True
        super(ArrayColumn, self).__init__(**kwargs)
        # Assigned after the base initializer has run, as in the original
        # statement order.
        self.null_value = []

    def write_data(self, data, buf):
        """
        Serialize ``data`` into ``buf``.

        Note: this mutates the column. ``nested_column`` is replaced by a
        new wrapping ``ArrayColumn``, the nullable flag is moved onto that
        wrapper, and the depth-0 size is switched off.
        """
        # Column of Array(T) is stored in "compact" format and passed to
        # server wrapped into another Array without size of wrapper array.
        wrapper = ArrayColumn(self.nested_column)
        wrapper.nullable = self.nullable
        self.nested_column = wrapper
        self.nullable = False
        self._write_depth_0_size = False
        self._write(data, buf)

    def read_data(self, rows, buf):
        """
        Read ``rows`` array values from ``buf``.

        Like ``write_data``, this re-wraps ``nested_column`` in a new
        ``ArrayColumn`` and moves the nullable flag onto it. The result is
        the first (and only) element of what ``_read`` returns.
        """
        wrapper = ArrayColumn(self.nested_column)
        wrapper.nullable = self.nullable
        self.nested_column = wrapper
        self.nullable = False
        return self._read(rows, buf)[0]

    def _write_sizes(self, value, buf):
        """
        Write the cumulative offsets of every array nesting level.

        Walks down the chain of nested ``ArrayColumn`` objects, flattening
        ``value`` by one level per step and recording a running offset for
        each sub-array. If the innermost ``ArrayColumn`` is nullable, a
        nulls map of the items at that level is written first (through
        ``_write_nulls_map``, defined outside this class). The offsets
        follow, packed as little-endian unsigned 64-bit integers.
        """
        offsets = []
        if self._write_depth_0_size:
            offsets.append(len(value))

        level = self
        items = value
        inner = level.nested_column
        while isinstance(inner, ArrayColumn):
            running_total = 0
            flattened = []
            for sub_array in items:
                running_total += len(sub_array)
                offsets.append(running_total)
                flattened.extend(sub_array)

            items = flattened
            level = inner
            inner = level.nested_column

        # ``level`` is now the deepest ArrayColumn (its nested column is
        # not an array).
        null_flags = [item is None for item in items] if level.nullable else []
        if null_flags:
            self._write_nulls_map(null_flags, buf)

        packer = Struct('<{}Q'.format(len(offsets)))
        buf.write(packer.pack(*offsets))

    def _write_data(self, value, buf):
        """
        Write the item payload, flattening one nesting level per call.

        Nothing is delegated to the nested column when there are no items
        left after flattening.
        """
        if self.nullable and not value:
            value = []

        inner = self.nested_column
        if isinstance(inner, ArrayColumn):
            value = list(chain.from_iterable(value))

        if not value:
            return

        inner._write_data(value, buf)

    def _write_nulls_data(self, value, buf):
        """
        Write the nulls map for the leaf items.

        Recurses through nested ``ArrayColumn`` levels, flattening one
        level each time. At the leaf, the nulls map is written only when
        the nested column is nullable.
        """
        if self.nullable and not value:
            value = []

        inner = self.nested_column
        if isinstance(inner, ArrayColumn):
            inner._write_nulls_data(list(chain.from_iterable(value)), buf)
        elif inner.nullable:
            inner._write_nulls_map(value, buf)

    def _write(self, value, buf):
        """
        Write a prepared value as three consecutive sections: sizes, nulls
        data, then item data.

        ``prepare_items`` is not defined in this class (presumably
        inherited from ``Column``).
        """
        prepared = self.prepare_items(value)
        self._write_sizes(prepared, buf)
        self._write_nulls_data(prepared, buf)
        self._write_data(prepared, buf)

    def read_state_prefix(self, buf):
        """Delegate state-prefix reading to the nested column."""
        return self.nested_column.read_state_prefix(buf)

    def write_state_prefix(self, buf):
        """Delegate state-prefix writing to the nested column."""
        self.nested_column.write_state_prefix(buf)

    def _read(self, size, buf):
        """
        Read ``size`` outer entries and rebuild the nested structure.

        First the offsets of every array level are read (little-endian
        unsigned 64-bit integers), then the optional nulls map and the
        flat leaf items. Finally the flat items are sliced level by level,
        from the deepest level outwards.

        :return: tuple holding the rebuilt rows; the outermost boundaries
                 are ``[0, size]``, so it is expected to contain a single
                 element (``read_data`` takes index 0).
        """
        boundaries_per_level = [[0, size]]
        level_len = size

        leaf = self.nested_column
        while isinstance(leaf, ArrayColumn):
            unpacker = Struct('<{}Q'.format(level_len))
            offsets = unpacker.unpack(buf.read(unpacker.size))
            # Offsets are cumulative; a leading 0 turns them into slice
            # boundaries.
            boundaries_per_level.append([0] + list(offsets))
            # The last cumulative offset is the item count of the next
            # level down.
            level_len = offsets[-1] if offsets else 0
            leaf = leaf.nested_column

        n_items = level_len if size > 0 else 0

        nulls_map = (
            self._read_nulls_map(n_items, buf) if leaf.nullable else None
        )

        if n_items:
            nested = list(leaf._read_data(n_items, buf, nulls_map=nulls_map))
        else:
            nested = []

        # Build nested structure, deepest level first.
        for boundaries in reversed(boundaries_per_level):
            nested = [
                nested[start:stop] for start, stop in pairwise(boundaries)
            ]

        return tuple(nested)
def create_array_column(spec, column_by_spec_getter, column_options):
    """
    Build an ``ArrayColumn`` from a type specification string.

    :param spec: type spec; the first 6 characters and the last one are
                 stripped to get the item spec (presumably the
                 ``Array(`` prefix and closing ``)`` -- TODO confirm).
    :param column_by_spec_getter: callable turning the item spec into a
                                  column object.
    :param column_options: keyword options forwarded to ``ArrayColumn``.
    :return: the new ``ArrayColumn`` wrapping the item column.
    """
    item_spec = spec[6:-1]
    item_column = column_by_spec_getter(item_spec)
    return ArrayColumn(item_column, **column_options)
|