1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
"""A simple example of how to use KustoClient."""
from datetime import timedelta
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
######################################################
## AUTH ##
######################################################
# Note that the 'help' cluster only allows interactive
# access by AAD users (and *not* AAD applications)
cluster = "https://help.kusto.windows.net"

# Each section below builds `kcsb` for one authentication method. The sections
# are alternatives: every assignment to `kcsb` replaces the previous one, so
# keep only the section that matches your setup.

# In case you want to authenticate with AAD application.
client_id = "<insert here your AAD application id>"
client_secret = "<insert here your AAD application key>"
# read more at https://docs.microsoft.com/en-us/onedrive/find-your-office-365-tenant-id
authority_id = "<insert here your tenant id>"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(cluster, client_id, client_secret, authority_id)

# In case you want to authenticate with AAD application certificate.
filename = "path to a PEM certificate"
# Read the certificate as UTF-8 text explicitly instead of relying on the
# platform's default encoding.
with open(filename, "r", encoding="utf-8") as pem_file:
    PEM = pem_file.read()
thumbprint = "certificate's thumbprint"
kcsb = KustoConnectionStringBuilder.with_aad_application_certificate_authentication(cluster, client_id, PEM, thumbprint, authority_id)

# In case you want to authenticate with AAD application certificate Subject Name & Issuer
filename = "path to a PEM certificate"
with open(filename, "r", encoding="utf-8") as pem_file:
    PEM = pem_file.read()
filename = "path to a public certificate"
with open(filename, "r", encoding="utf-8") as cert_file:
    public_certificate = cert_file.read()
thumbprint = "certificate's thumbprint"
kcsb = KustoConnectionStringBuilder.with_aad_application_certificate_sni_authentication(
    cluster, client_id, PEM, public_certificate, thumbprint, authority_id
)

# No authentication - for rare cases where the cluster is defined to work without any need for auth.
# Usually reserved for internal use.
kcsb = KustoConnectionStringBuilder()

# Managed Identity - automatically injected into your machine by Azure when running on an Azure service.
# It is the preferred option for code running there: it is automatic and requires no saving of secrets.
# In case you want to authenticate with a System Assigned Managed Service Identity (MSI)
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster)

# In case you want to authenticate with a User Assigned Managed Service Identity (MSI)
user_assigned_client_id = "the AAD identity client id"
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster, client_id=user_assigned_client_id)

# In case you want to authenticate with Azure CLI.
# Users are required to be in a logged in state in az-cli, for this authentication method to succeed.
# Run `az login` to login to azure cli.
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster)

# In case you want to authenticate with AAD username and password
username = "<username>"
password = "<password>"
kcsb = KustoConnectionStringBuilder.with_aad_user_password_authentication(cluster, username, password, authority_id)

# In case you want to authenticate with AAD device code.
# Please note that if you choose this option, you'll need to authenticate for every new instance that is initialized.
# It is highly recommended to create one instance and use it for all of your queries.
kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(cluster)
# The client authenticates with whichever method the chosen
# KustoConnectionStringBuilder describes.
client = KustoClient(kcsb)

# Close every client once you are done with it. There are two ways to do so.

# Option 1: a context manager closes the client when the block exits.
with KustoClient(kcsb) as client2:
    pass  # will be closed automatically at the end of the block

# Option 2: call close() yourself; try/finally makes sure the call happens.
client3 = KustoClient(kcsb)
try:
    pass  # work with client3 here
finally:
    client3.close()
######################################################
## QUERY ##
######################################################
# Once authenticated, usage is as follows.
db = "Samples"
query = "StormEvents | take 10"
response = client.execute(db, query)

# Every example below works on the first primary result table, so look it up once.
primary_table = response.primary_results[0]

# iterating over rows is possible
for row in primary_table:
    # printing specific columns by index
    print("value at 0 {}".format(row[0]))
    print("\n")
    # printing specific columns by name
    print("EventType:{}".format(row["EventType"]))

# Tables are serializable, so they can be written straight to a file.
# The file is only written, so plain "w" is enough; the explicit UTF-8 encoding
# keeps non-ASCII cell values from failing under a narrower platform default.
with open("results.json", "w", encoding="utf-8") as f:
    f.write(str(primary_table))

# we also support dataframes:
dataframe = dataframe_from_result_table(primary_table)
print(dataframe)
# Streaming Query - rather than reading everything ahead, iterate through results as they come
multiple_tables = 'StormEvents | where EventType == "Heavy Rain" | take 10; StormEvents | where EventType == "Tornado" | take 10'
# Query the database named by `db` - the one the other StormEvents queries in
# this file use - rather than a literal "DB" placeholder.
results = client.execute_streaming_query(db, multiple_tables)
tables_iter = results.iter_primary_results()

first_table = next(tables_iter)

# Will block until each row arrives
for row in first_table:
    # printing specific columns by index
    print("value at 0 {}".format(row[0]))
    print("\n")
    # printing specific columns by name
    print("EventType:{}".format(row["EventType"]))

# next(tables_iter) - throws, we can't read the next table until we exhausted this one

# Read next table
second_table = next(tables_iter)
print(next(second_table))

# You can always access the table's properties, even after it's exhausted
print(first_table.columns, first_table.table_kind, first_table.table_name)
print(second_table.columns, second_table.table_kind, second_table.table_name)

# Will skip forward, but the previous table will be exhausted
results.set_skip_incomplete_tables(True)
# When we finish all the tables we get None
next(results, None)  # Tables are finished

print(results.tables)  # Access all tables, not just the primary results
##################
### EXCEPTIONS ###
##################
# Query is too big to be executed
query = "StormEvents"
try:
    response = client.execute(db, query)
except KustoServiceError as service_error:
    # Report what the service error carries, including any partial results.
    print("2. Error:", service_error)
    print("2. Is semantic error:", service_error.is_semantic_error())
    print("2. Has partial results:", service_error.has_partial_results())
    print("2. Result size:", len(service_error.get_partial_results()))

# Run the same query again with custom request properties: partial query
# failures are deferred and the request timeout is set to eight minutes.
# NOTE(review): presumably deferring lets the failure surface on the response
# (errors_count / get_exceptions below) instead of being raised - confirm.
properties = ClientRequestProperties()
properties.set_option(properties.results_defer_partial_query_failures_option_name, True)
properties.set_option(properties.request_timeout_option_name, timedelta(minutes=8))
response = client.execute(db, query, properties=properties)
print("3. Response error count: ", response.errors_count)
print("3. Exceptions:", response.get_exceptions())
print("3. Result size:", len(response.primary_results))

# Query has semantic error
query = "StormEvents | where foo = bar"
try:
    response = client.execute(db, query)
except KustoServiceError as service_error:
    print("4. Error:", service_error)
    print("4. Is semantic error:", service_error.is_semantic_error())
    print("4. Has partial results:", service_error.has_partial_results())
# Query text for the second cluster, defined up front so the multi-line
# literal keeps its exact content.
query = """
let max_t = datetime(2016-09-03);
service_traffic
| make-series num=count() on TimeStamp in range(max_t-5d, max_t, 1h) by OsVer
"""

# Close the client used so far before `client` is rebound to another cluster;
# otherwise the old client would never be closed.
client.close()

# The context manager closes the new client once both requests are done.
# `client` stays bound afterwards, but is closed.
with KustoClient("https://kustolab.kusto.windows.net") as client:
    response = client.execute("ML", ".show version")
    response = client.execute_query("ML", query).primary_results[0]
|