1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
from itertools import chain
import numpy as np
import pandas as pd
from pandas.api.types import union_categoricals
from ..progress import Progress
from ..result import QueryResult
class NumpyQueryResult(QueryResult):
    """
    Accumulates the blocks of a query result and assembles them into
    numpy-friendly output.

    Relies on ``data``, ``columnar``, ``columns_with_types``,
    ``with_column_types`` and ``packet_generator`` attributes, which are
    not assigned in this class (presumably set up by ``QueryResult``).
    """

    def store(self, packet):
        """
        Record the block carried by ``packet``, if there is one.

        :param packet: object that may expose a ``block`` attribute.
        """
        block = getattr(packet, 'block', None)
        if block is None:
            return

        if not block.num_rows:
            # A block without rows is the header: take column metadata from
            # it, but only if none has been recorded yet.
            if not self.columns_with_types:
                self.columns_with_types = block.columns_with_types
            return

        if self.columnar:
            # One entry per block; entries are merged later in get_result().
            self.data.append(block.get_columns())
        else:
            self.data.extend(block.get_rows())

    @staticmethod
    def _join_chunks(chunks):
        """
        Merge the per-block chunks of a single column into one object.

        :param chunks: non-empty sequence holding the column's chunks.
        :return: concatenated ndarray, unioned Categorical, or a tuple of
                 all items, depending on the type of the first chunk.
        """
        head = chunks[0]
        if isinstance(head, np.ndarray):
            return np.concatenate(chunks)
        if isinstance(head, pd.Categorical):
            return union_categoricals(chunks)
        return tuple(chain.from_iterable(chunks))

    def get_result(self):
        """
        Drain the packet generator and build the final result.

        :return: stored query result; a ``(data, columns_with_types)`` pair
                 when ``with_column_types`` is set.
        """
        for packet in self.packet_generator:
            self.store(packet)

        if self.columnar:
            # zip(*...) regroups the stored per-block column lists so that
            # every item holds all chunks belonging to one column.
            result = [
                self._join_chunks(chunks) for chunks in zip(*self.data)
            ]
        else:
            result = self.data

        if self.with_column_types:
            return result, self.columns_with_types
        return result
class NumpyProgressQueryResult(NumpyQueryResult):
    """
    Query result that also tracks progress information.

    Iterating over an instance yields ``(rows, total_rows)`` each time a
    progress packet arrives; packets without progress are stored as data.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the progress accumulator, then defer to the parent
        initialiser with the arguments unchanged.
        """
        self.progress_totals = Progress()
        super().__init__(*args, **kwargs)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Consume packets until one carries progress information.

        :return: ``(rows, total_rows)`` read from the accumulated totals.
        :raises StopIteration: propagated from the packet generator once
                               it is exhausted.
        """
        while True:
            packet = next(self.packet_generator)
            progress = getattr(packet, 'progress', None)
            if not progress:
                # Not a progress update: hand it to the data storage path
                # and keep looking.
                self.store(packet)
                continue

            totals = self.progress_totals
            totals.increment(progress)
            return totals.rows, totals.total_rows

    def get_result(self):
        """
        :return: stored query result, after reading every remaining packet.
        """
        # Exhaust the progress iteration first; the yielded values are
        # not needed here.
        for _progress in self:
            continue

        return super().get_result()
class NumpyIterQueryResult(object):
    """
    Streams returned data chunk by chunk: each iteration step consumes one
    packet and yields the rows of its block.
    """

    def __init__(self, packet_generator, with_column_types=False):
        """
        :param packet_generator: iterator producing packets.
        :param with_column_types: when true, the first chunk that has a
                                  block is prefixed with the block's
                                  ``columns_with_types``.
        """
        self.packet_generator = packet_generator
        self.with_column_types = with_column_types
        # Stays true until the column types have been emitted once.
        self.first_block = True

        super().__init__()

    def __iter__(self):
        return self

    def __next__(self):
        """
        :return: rows of the next packet's block; an empty list when the
                 packet carries no block.
        :raises StopIteration: propagated from the packet generator once
                               it is exhausted.
        """
        block = getattr(next(self.packet_generator), 'block', None)
        if block is None:
            return []

        needs_header = self.first_block and self.with_column_types
        if not needs_header:
            return block.get_rows()

        self.first_block = False
        return [block.columns_with_types, *block.get_rows()]
|