# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import io
from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import (
BlobDescriptor,
FileDescriptor,
IngestionProperties,
IngestionStatus,
KustoStreamingIngestClient,
ManagedStreamingIngestClient,
QueuedIngestClient,
StreamDescriptor,
)
##################################################################
## AUTH ##
##################################################################
# Endpoint used by the queued ingest client created at the end of this section.
cluster = "https://ingest-{cluster_name}.kusto.windows.net/"

# Every option below assigns `kcsb`, and each assignment replaces the previous one.
# Keep only the option that matches your environment.

# Option: AAD application id and key.
client_id = "<insert here your AAD application id>"
client_secret = "<insert here your AAD application key>"
# How to find the tenant id: https://docs.microsoft.com/en-us/onedrive/find-your-office-365-tenant-id
authority_id = "<insert here your tenant id>"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    cluster,
    client_id,
    client_secret,
    authority_id,
)

# Option: AAD application certificate.
filename = "path to a PEM certificate"
with open(filename) as pem_source:
    PEM = pem_source.read()
thumbprint = "certificate's thumbprint"
kcsb = KustoConnectionStringBuilder.with_aad_application_certificate_authentication(
    cluster,
    client_id,
    PEM,
    thumbprint,
    authority_id,
)

# Option: AAD application certificate with Subject Name & Issuer.
filename = "path to a PEM certificate"
with open(filename) as pem_source:
    PEM = pem_source.read()
filename = "path to a public certificate"
with open(filename) as public_cert_source:
    public_certificate = public_cert_source.read()
thumbprint = "certificate's thumbprint"
kcsb = KustoConnectionStringBuilder.with_aad_application_certificate_sni_authentication(
    cluster,
    client_id,
    PEM,
    public_certificate,
    thumbprint,
    authority_id,
)

# Option: System Assigned Managed Service Identity (MSI).
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster)

# Option: User Assigned Managed Service Identity (MSI).
user_assigned_client_id = "the AAD identity client id"
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(
    cluster,
    client_id=user_assigned_client_id,
)

# Option: AAD username and password.
username = "<username>"
password = "<password>"
kcsb = KustoConnectionStringBuilder.with_aad_user_password_authentication(
    cluster,
    username,
    password,
    authority_id,
)

# Option: AAD device code.
# Please note that with this option you will need to authenticate for every new
# instance that is initialized, so it is highly recommended to create one
# instance and use it for all of your queries.
kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(cluster)

# The client takes its authentication method from whichever kcsb was built last.
client = QueuedIngestClient(kcsb)
# More authentication options exist - see the azure-kusto-data samples.
##################################################################
## INGESTION ##
##################################################################
# IngestionProperties offers many more useful options than the ones shown here;
# make sure to go over the docs and check them out.
ingestion_props = IngestionProperties(
    database="{database_name}",
    table="{table_name}",
    data_format=DataFormat.CSV,
    # To also get status updates for successful ingestions
    # (remember to import ReportLevel from azure.kusto.ingest):
    # report_level=ReportLevel.FailuresAndSuccesses,
    # To apply a mapping
    # (remember to import IngestionMappingKind from azure.kusto.data.data_format):
    # ingestion_mapping_reference="{json_mapping_that_already_exists_on_table}",
    # ingestion_mapping_kind= IngestionMappingKind.JSON,
)

# Ingest from file, first through a descriptor; 3333 is the raw size of the data in bytes.
file_descriptor = FileDescriptor("{filename}.csv", 3333)
client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_props)

# ... and then directly from a path, keeping the returned result this time.
result = client.ingest_from_file("{filename}.csv", ingestion_properties=ingestion_props)

# The result carries useful information, such as source_id and blob_url.
print(f"{result!r}")
# Ingest from a blob that is addressed by a URL carrying its SAS query string.
blob_url = "https://{path_to_blob}.csv.gz?sp=rl&st=2020-05-20T13:38:37Z&se=2020-05-21T13:38:37Z&sv=2019-10-10&sr=c&sig=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# The second argument (10) is the raw size of the data in bytes.
blob_descriptor = BlobDescriptor(blob_url, 10)
client.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_props)
# Ingest from a pandas DataFrame built from two in-memory rows.
import pandas

fields = ["id", "name", "value"]
rows = [
    [1, "abc", 15.3],
    [2, "cde", 99.9],
]
df = pandas.DataFrame(rows, columns=fields)
client.ingest_from_dataframe(df, ingestion_properties=ingestion_props)
# ingest a whole folder.
import os

path = "folder/path"
# os.listdir() returns bare entry names, so each name is joined with the folder
# path before it is handed to the client; otherwise the files would be looked
# up relative to the current working directory. Sub-directories are skipped.
for entry_name in os.listdir(path):
    entry_path = os.path.join(path, entry_name)
    if os.path.isfile(entry_path):
        client.ingest_from_file(entry_path, ingestion_properties=ingestion_props)
##################################################################
## INGESTION STATUS ##
##################################################################
# if status updates are required, something like this can be done
import pprint
import time

from azure.kusto.ingest.status import KustoIngestStatusQueues

qs = KustoIngestStatusQueues(client)

MAX_BACKOFF = 180  # upper bound, in seconds, for the delay between polls
backoff = 1

# NOTE: this polling loop has no exit condition; interrupt it manually or add
# your own stop condition once all expected status messages have arrived.
while True:
    ################### NOTICE ####################
    # in order to get success status updates,
    # make sure ingestion properties set
    # report_level=ReportLevel.FailuresAndSuccesses.
    if qs.success.is_empty() and qs.failure.is_empty():
        # Announce the delay that is about to be slept, then double it
        # (capped at MAX_BACKOFF) for the next empty poll.
        print("No new messages. backing off for {} seconds".format(backoff))
        time.sleep(backoff)
        backoff = min(backoff * 2, MAX_BACKOFF)
        continue

    # Messages are available: reset the delay and fetch up to 10 from each queue.
    backoff = 1
    success_messages = qs.success.pop(10)
    failure_messages = qs.failure.pop(10)

    pprint.pprint("SUCCESS : {}".format(success_messages))
    pprint.pprint("FAILURE : {}".format(failure_messages))

    # you can of course separate them and dump them into a file for follow up investigations
    # The logs are opened in append mode so earlier batches are kept across
    # loop iterations, and each message is written on its own line.
    with open("successes.log", "a") as sf:
        for sm in success_messages:
            sf.write(str(sm) + "\n")

    with open("failures.log", "a") as ff:
        for fm in failure_messages:
            ff.write(str(fm) + "\n")
##################################################################
## STREAMING INGEST ##
##################################################################
# Streaming ingestion uses the cluster endpoint below, so a connection string has
# to be built for it. Application key authentication is used here; any other
# option shown in the Auth section works the same way.
cluster = "https://{cluster_name}.kusto.windows.net"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(cluster, client_id, client_secret, authority_id)
client = KustoStreamingIngestClient(kcsb)

ingestion_properties = IngestionProperties(database="{database_name}", table="{table_name}", data_format=DataFormat.CSV)

# ingest from file
file_descriptor = FileDescriptor("{filename}.csv", 3333)  # 3333 is the raw size of the data in bytes.
client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_properties)
client.ingest_from_file("{filename}.csv", ingestion_properties=ingestion_properties)

# ingest from dataframe
import pandas

fields = ["id", "name", "value"]
rows = [[1, "abc", 15.3], [2, "cde", 99.9]]
df = pandas.DataFrame(data=rows, columns=fields)
client.ingest_from_dataframe(df, ingestion_properties=ingestion_properties)

# ingest from stream
byte_sequence = b"56,56,56"
bytes_stream = io.BytesIO(byte_sequence)
client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)

# Wrap a fresh stream in the descriptor: the call above has presumably read
# bytes_stream to its end, so re-using it would leave no data to ingest.
stream_descriptor = StreamDescriptor(io.BytesIO(byte_sequence))
client.ingest_from_stream(stream_descriptor, ingestion_properties=ingestion_properties)

# text streams are passed to the client in the same way
str_sequence = "57,57,57"
str_stream = io.StringIO(str_sequence)
client.ingest_from_stream(str_stream, ingestion_properties=ingestion_properties)
##################################################################
## MANAGED STREAMING INGEST ##
##################################################################
# Managed streaming ingest client will try to use streaming ingestion for performance, but will fall back to queued ingestion if unable.
dm_cluster = "https://ingest-{cluster_name}.kusto.windows.net"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(dm_cluster, client_id, client_secret, authority_id)

# Create it from a dm connection string
client = ManagedStreamingIngestClient.from_dm_kcsb(kcsb)
# or an engine connection string, like a streaming ingestion client with `from_engine_kcsb`
# or provide both: `ManagedStreamingIngestClient(engine_kcsb, dm_kcsb)`

# Define the ingestion properties before their first use, so the calls below do
# not rely on a value left over from the streaming section.
ingestion_properties = IngestionProperties(database="{database_name}", table="{table_name}", data_format=DataFormat.CSV)

# use client as you would a streaming or queued ingestion client
byte_sequence = b"56,56,56"
bytes_stream = io.BytesIO(byte_sequence)
client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)

# ingest from file
file_descriptor = FileDescriptor("{filename}.csv", 3333)  # 3333 is the raw size of the data in bytes.
result = client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_properties)

# inspect the result to see what type of ingestion was performed
if result.status == IngestionStatus.QUEUED:
    # fell back to queued ingestion
    pass

# Managed streaming ingest client will fall back to queued if:
# - Multiple transient errors were encountered when trying to do streaming ingestion
# - The ingestion is too large for streaming ingestion (over 4MB)
# - The ingestion is directly for a blob