1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
import json
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Union, Callable, Dict, Optional
if TYPE_CHECKING:
import pandas as pd
from azure.kusto.data._models import KustoResultTable, KustoStreamingResultTable
# Alias for dataframe_from_result_table converter type
Converter = Dict[str, Union[str, Callable[[str, "pd.DataFrame"], "pd.Series"]]]
def load_bundled_json(file_name: str) -> Dict:
filename = Path(__file__).absolute().parent.joinpath(file_name)
with filename.open("r", encoding="utf-8") as data:
return json.load(data)
@lru_cache(maxsize=1, typed=False)
def default_dict() -> Converter:
import pandas as pd
return {
"string": lambda col, df: df[col].astype(pd.StringDtype()) if hasattr(pd, "StringDType") else df[col],
"guid": lambda col, df: df[col],
"uuid": lambda col, df: df[col],
"uniqueid": lambda col, df: df[col],
"dynamic": lambda col, df: df[col],
"bool": lambda col, df: df[col].astype(bool),
"boolean": lambda col, df: df[col].astype(bool),
"int": lambda col, df: df[col].astype(pd.Int32Dtype()),
"int32": lambda col, df: df[col].astype(pd.Int32Dtype()),
"int64": lambda col, df: df[col].astype(pd.Int64Dtype()),
"long": lambda col, df: df[col].astype(pd.Int64Dtype()),
"real": lambda col, df: parse_float(df, col),
"double": lambda col, df: parse_float(df, col),
"decimal": lambda col, df: parse_float(df, col),
"datetime": lambda col, df: parse_datetime(df, col),
"date": lambda col, df: parse_datetime(df, col),
"timespan": lambda col, df: df[col].apply(parse_timedelta),
"time": lambda col, df: df[col].apply(parse_timedelta),
}
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
def dataframe_from_result_table(
table: "Union[KustoResultTable, KustoStreamingResultTable]",
nullable_bools: bool = False,
converters_by_type: Optional[Converter] = None,
converters_by_column_name: Optional[Converter] = None,
) -> "pd.DataFrame":
f"""Converts Kusto tables into pandas DataFrame.
:param azure.kusto.data._models.KustoResultTable table: Table received from the response.
:param nullable_bools: When True, converts bools that are 'null' from kusto or 'None' from python to pandas.NA. This will be the default in the future.
:param converters_by_type: If given, converts specified types to corresponding types, else uses {default_dict()}. The dictionary maps from kusto
datatype (https://learn.microsoft.com/azure/data-explorer/kusto/query/scalar-data-types/) to a lambda that receives a column name and a dataframe and
returns the converted column or to a string type name.
:param converters_by_column_name: If given, converts specified columns to corresponding types, else uses converters_by_type. The dictionary maps from column
name to a lambda that receives a column name and a dataframe and returns the converted column.
:return: pandas DataFrame.
"""
import pandas as pd
if not table:
raise ValueError()
from azure.kusto.data._models import KustoResultTable, KustoStreamingResultTable
if not isinstance(table, KustoResultTable) and not isinstance(table, KustoStreamingResultTable):
raise TypeError("Expected KustoResultTable or KustoStreamingResultTable got {}".format(type(table).__name__))
columns = [col.column_name for col in table.columns]
frame = pd.DataFrame(table.raw_rows, columns=columns)
default = default_dict()
for col in table.columns:
column_name = col.column_name
column_type = col.column_type
if converters_by_column_name and column_name in converters_by_column_name:
converter = converters_by_column_name.get(column_name)
elif converters_by_type and column_type in converters_by_type:
converter = converters_by_type.get(column_type)
elif nullable_bools and column_type == "bool":
converter = lambda col, df: df[col].astype(pd.BooleanDtype())
else:
converter = default.get(column_type)
if converter is None:
raise Exception("Unexpected type " + column_type)
if isinstance(converter, str):
frame[column_name] = frame[column_name].astype(converter)
else:
frame[column_name] = converter(column_name, frame)
return frame
def get_string_tail_lower_case(val, length):
if length <= 0:
return ""
if length >= len(val):
return val.lower()
return val[len(val) - length :].lower()
# TODO When moving to pandas 2 only - change to the appropriate type
def parse_float(frame, col):
import numpy as np
import pandas as pd
frame[col] = frame[col].replace("NaN", np.nan).replace("Infinity", np.inf).replace("-Infinity", -np.inf)
frame[col] = pd.to_numeric(frame[col], errors="coerce").astype(pd.Float64Dtype())
return frame[col]
def parse_datetime(frame, col):
# Pandas before version 2 doesn't support the "format" arg
import pandas as pd
args = {}
if pd.__version__.startswith("2."):
args = {"format": "ISO8601", "utc": True}
else:
# if frame contains ".", replace "Z" with ".000Z"
# == False is not a mistake - that's the pandas way to do it
contains_dot = frame[col].str.contains(".")
frame.loc[contains_dot == False, col] = frame.loc[contains_dot == False, col].str.replace("Z", ".000Z")
frame[col] = pd.to_datetime(frame[col], errors="coerce", **args)
return frame[col]
def parse_timedelta(raw_value: Union[int, float, str]) -> "pd.Timedelta":
"""
Transform a raw python value to a pandas timedelta.
"""
import pandas as pd
if isinstance(raw_value, (int, float)):
# https://docs.microsoft.com/en-us/dotnet/api/system.datetime.ticks
# Kusto saves up to ticks, 1 tick == 100 nanoseconds
return pd.to_timedelta(raw_value * 100, unit="ns")
if isinstance(raw_value, str):
# The timespan format Kusto returns is 'd.hh:mm:ss.ssssss' or 'hh:mm:ss.ssssss' or 'hh:mm:ss'
# Pandas expects 'd days hh:mm:ss.ssssss' or 'hh:mm:ss.ssssss' or 'hh:mm:ss'
parts = raw_value.split(":")
if "." not in parts[0]:
return pd.to_timedelta(raw_value)
else:
formatted_value = raw_value.replace(".", " days ", 1)
return pd.to_timedelta(formatted_value)
|