File: _models.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (213 lines) | stat: -rw-r--r-- 7,329 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import json
from abc import ABCMeta, abstractmethod
from decimal import Decimal
from enum import Enum
from typing import Iterator, List, Any, Union, Optional, Dict

from . import _converters
from .exceptions import KustoMultiApiError, KustoStreamingQueryError


class WellKnownDataSet(str, Enum):
    """Categorizes data tables according to the role they play in the data set that a Kusto query returns."""

    PrimaryResult = "PrimaryResult"
    QueryCompletionInformation = "QueryCompletionInformation"
    TableOfContents = "TableOfContents"
    QueryProperties = "QueryProperties"


class KustoResultRow:
    """Iterator over a Kusto result row."""

    conversion_funcs = {"datetime": _converters.to_datetime, "timespan": _converters.to_timedelta, "decimal": Decimal}

    def __init__(self, columns: "List[KustoResultColumn]", row: list):
        self._value_by_name = {}
        self._value_by_index = []

        for i, value in enumerate(row):
            column = columns[i]
            try:
                column_type = column.column_type.lower()
            except AttributeError:
                self._value_by_index.append(value)
                self._value_by_name[columns[i]] = value
                continue

            # If you are here to read this, you probably hit some datetime/timedelta inconsistencies.
            # Azure-Data-Explorer(Kusto) supports 7 decimal digits, while the corresponding python types supports only 6.
            # One example why one might want this precision, is when working with pandas.
            # In that case, use azure.kusto.data.helpers.dataframe_from_result_table which takes into account the original value.
            typed_value = self.get_typed_value(column_type, value)

            self._value_by_index.append(typed_value)
            self._value_by_name[column.column_name] = typed_value

    @staticmethod
    def get_typed_value(column_type: str, value: Any) -> Any:
        return KustoResultRow.conversion_funcs[column_type](value) if value is not None and column_type in KustoResultRow.conversion_funcs else value

    @property
    def columns_count(self) -> int:
        return len(self._value_by_name)

    def __iter__(self) -> Iterator[Any]:
        for i in range(self.columns_count):
            yield self[i]

    def __getitem__(self, key: Union[str, int]) -> Any:
        if isinstance(key, int):
            return self._value_by_index[key]
        return self._value_by_name[key]

    def __len__(self) -> int:
        return self.columns_count

    def to_dict(self) -> Dict[str, Any]:
        return self._value_by_name

    def to_list(self) -> list:
        return self._value_by_index

    def __str__(self) -> str:
        return "['{}']".format("', '".join([str(val) for val in self._value_by_index]))

    def __repr__(self) -> str:
        values = [repr(val) for val in self._value_by_name.values()]
        return "KustoResultRow(['{}'], [{}])".format("', '".join(self._value_by_name), ", ".join(values))

    def __eq__(self, other) -> bool:
        if len(self) != len(other):
            return False
        for value_index, value in enumerate(self):
            if value != other[value_index]:
                return False
        return True


class KustoResultColumn:
    def __init__(self, json_column: Dict[str, Any], ordinal: int):
        self.column_name = json_column["ColumnName"]
        self.column_type = json_column.get("ColumnType") or json_column["DataType"]
        self.ordinal = ordinal

    def __repr__(self) -> str:
        return "KustoResultColumn({},{})".format(json.dumps({"ColumnName": self.column_name, "ColumnType": self.column_type}), self.ordinal)


class BaseKustoResultTable(metaclass=ABCMeta):
    def __init__(self, json_table: Dict[str, Any]):
        self.table_name = json_table.get("TableName")
        self.table_id = json_table.get("TableId")
        self.table_kind = WellKnownDataSet[json_table["TableKind"]] if "TableKind" in json_table else None
        self.columns = [KustoResultColumn(column, index) for index, column in enumerate(json_table["Columns"])]

        self.raw_columns = json_table["Columns"]
        self.raw_rows = json_table["Rows"]
        self.kusto_result_rows = None

    def __bool__(self) -> bool:
        return any(self.columns)

    __nonzero__ = __bool__

    @property
    def columns_count(self) -> int:
        return len(self.columns)

    @abstractmethod
    def __len__(self) -> Optional[int]:
        pass

    @property
    @abstractmethod
    def rows_count(self) -> int:
        pass


class BaseStreamingKustoResultTable(BaseKustoResultTable):
    def __init__(self, json_table: Dict[str, Any]):
        super().__init__(json_table)

        self.finished = False
        self.row_count = 0

    @property
    def rows_count(self) -> int:
        if not self.finished:
            raise KustoStreamingQueryError("Can't retrieve rows count before the iteration is finished")
        return self.row_count

    def __len__(self) -> Optional[int]:
        if not self.finished:
            return None  # We return None here instead of an exception, because otherwise calling list() on the object will throw
        return self.rows_count

    def iter_rows(self) -> "BaseStreamingKustoResultTable":
        return self


class KustoResultTable(BaseKustoResultTable):
    """Iterator over a Kusto result table."""

    def __init__(self, json_table: Dict[str, Any]):
        super().__init__(json_table)
        errors = [row for row in json_table["Rows"] if isinstance(row, dict)]
        if errors:
            raise KustoMultiApiError(errors)

    @property
    def rows(self) -> List[KustoResultRow]:
        if not self.kusto_result_rows:
            self.kusto_result_rows = [KustoResultRow(self.columns, row) for row in self.raw_rows]
        return self.kusto_result_rows

    def to_dict(self) -> Dict[str, Any]:
        """Converts the table to a dict."""
        return {"name": self.table_name, "kind": self.table_kind, "data": [r.to_dict() for r in self]}

    @property
    def rows_count(self) -> int:
        return len(self.raw_rows)

    def __len__(self) -> int:
        return self.rows_count

    def __iter__(self) -> Iterator[KustoResultRow]:
        for row_index, row in enumerate(self.raw_rows):
            if self.kusto_result_rows:
                yield self.kusto_result_rows[row_index]
            else:
                yield KustoResultRow(self.columns, row)

    def __getitem__(self, key: int) -> KustoResultRow:
        return self.rows[key]

    def __str__(self) -> str:
        d = self.to_dict()
        # enum is not serializable, using value instead
        d["kind"] = d["kind"].value
        return json.dumps(d, default=str)


class KustoStreamingResultTable(BaseStreamingKustoResultTable):
    """
    Iterator over a Kusto result table in streaming.
    This class can be iterated only once.
    """

    def __next__(self) -> KustoResultRow:
        try:
            row = next(self.raw_rows)
        except StopIteration:
            self.finished = True
            raise
        self.row_count += 1
        return KustoResultRow(self.columns, row)

    def __iter__(self) -> Iterator[KustoResultRow]:
        return self