File: sample.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (236 lines) | stat: -rw-r--r-- 9,827 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import io

from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import (
    BlobDescriptor,
    FileDescriptor,
    IngestionProperties,
    IngestionStatus,
    KustoStreamingIngestClient,
    ManagedStreamingIngestClient,
    QueuedIngestClient,
    StreamDescriptor,
)

##################################################################
##                              AUTH                            ##
##################################################################

# Queued ingestion goes through the "ingest-" endpoint of the cluster.
# NOTE: the snippets below are alternatives. Each one reassigns `kcsb`, so if this file is run
# top to bottom only the last assignment (device authentication) is the one handed to the
# client at the end of this section. Keep the single option you need and delete the rest.
# All string values here are placeholders that must be replaced with real values.
cluster = "https://ingest-{cluster_name}.kusto.windows.net/"

# In case you want to authenticate with AAD application.
client_id = "<insert here your AAD application id>"
client_secret = "<insert here your AAD application key>"

# read more at https://docs.microsoft.com/en-us/onedrive/find-your-office-365-tenant-id
authority_id = "<insert here your tenant id>"

kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(cluster, client_id, client_secret, authority_id)

# In case you want to authenticate with AAD application certificate.
# The PEM file is read as text and its content (not its path) is passed to the builder.
filename = "path to a PEM certificate"
with open(filename, "r") as pem_file:
    PEM = pem_file.read()

thumbprint = "certificate's thumbprint"
kcsb = KustoConnectionStringBuilder.with_aad_application_certificate_authentication(cluster, client_id, PEM, thumbprint, authority_id)

# In case you want to authenticate with AAD application certificate Subject Name & Issuer
# Same as above, but the public certificate content is passed in addition to the PEM content.
filename = "path to a PEM certificate"
with open(filename, "r") as pem_file:
    PEM = pem_file.read()

filename = "path to a public certificate"
with open(filename, "r") as cert_file:
    public_certificate = cert_file.read()

thumbprint = "certificate's thumbprint"
kcsb = KustoConnectionStringBuilder.with_aad_application_certificate_sni_authentication(cluster, client_id, PEM, public_certificate, thumbprint, authority_id)

# In case you want to authenticate with a System Assigned Managed Service Identity (MSI)
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster)

# In case you want to authenticate with a User Assigned Managed Service Identity (MSI)
user_assigned_client_id = "the AAD identity client id"
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster, client_id=user_assigned_client_id)

# In case you want to authenticate with AAD username and password
username = "<username>"
password = "<password>"
kcsb = KustoConnectionStringBuilder.with_aad_user_password_authentication(cluster, username, password, authority_id)

# In case you want to authenticate with AAD device code.
# Please note that if you choose this option, you'll need to authenticate for every new instance that is initialized.
# It is highly recommended to create one instance and use it for all of your queries.
kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(cluster)

# The authentication method will be taken from the chosen KustoConnectionStringBuilder.
# `client` is the queued ingestion client used by the INGESTION and INGESTION STATUS sections below.
client = QueuedIngestClient(kcsb)

# there are more options for authenticating - see azure-kusto-data samples

##################################################################
##                        INGESTION                             ##
##################################################################

# Ingestion properties describe the target (database and table) and the format of the data.
# There are many more useful properties; make sure to go over the docs and check them out.
ingestion_props = IngestionProperties(
    database="{database_name}",
    table="{table_name}",
    data_format=DataFormat.CSV,
    # in case status updates for successes are also required (remember to import ReportLevel from azure.kusto.ingest)
    # report_level=ReportLevel.FailuresAndSuccesses,
    # in case a mapping is required (remember to import IngestionMappingKind from azure.kusto.data.data_format)
    # ingestion_mapping_reference="{json_mapping_that_already_exists_on_table}",
    # ingestion_mapping_kind= IngestionMappingKind.JSON,
)

# ingest from file
# Two call forms are shown: passing a FileDescriptor (path plus raw size), or passing the path directly.
# NOTE: as written, both calls are issued, so the same file is submitted twice; keep only one of them.
file_descriptor = FileDescriptor("{filename}.csv", 3333)  # 3333 is the raw size of the data in bytes.
client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_props)
result = client.ingest_from_file("{filename}.csv", ingestion_properties=ingestion_props)

# Inspect the result for useful information, such as source_id and blob_url
print(repr(result))

# ingest from blob
# The URL below is a placeholder blob path with a placeholder SAS query string.
blob_descriptor = BlobDescriptor(
    "https://{path_to_blob}.csv.gz?sp=rl&st=2020-05-20T13:38:37Z&se=2020-05-21T13:38:37Z&sv=2019-10-10&sr=c&sig=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    10,
)  # 10 is the raw size of the data in bytes.
client.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_props)

# ingest from dataframe
# pandas is imported here, next to its only use in this section.
import pandas

# Column names and two example rows used to build a small demo DataFrame.
fields = ["id", "name", "value"]
rows = [[1, "abc", 15.3], [2, "cde", 99.9]]

df = pandas.DataFrame(data=rows, columns=fields)

client.ingest_from_dataframe(df, ingestion_properties=ingestion_props)

# ingest a whole folder.
import os

path = "folder/path"
for entry_name in os.listdir(path):
    # os.listdir() yields bare entry names, so join each one with the folder to get a usable path
    entry_path = os.path.join(path, entry_name)
    # skip sub-directories and other non-file entries; only regular files can be ingested
    if os.path.isfile(entry_path):
        client.ingest_from_file(entry_path, ingestion_properties=ingestion_props)

##################################################################
##                        INGESTION STATUS                      ##
##################################################################

# if status updates are required, something like this can be done:
# poll the success and failure status queues, backing off exponentially while both are empty.
import pprint
import time
from azure.kusto.ingest.status import KustoIngestStatusQueues

qs = KustoIngestStatusQueues(client)

# upper bound, in seconds, for the delay between two polls of empty queues
MAX_BACKOFF = 180

backoff = 1
# NOTE: this loop never exits on its own. Add a stop condition that fits your program
# (for example, stop once every ingestion you are waiting for has been reported).
while True:
    ################### NOTICE ####################
    # in order to get success status updates,
    # make sure ingestion properties set
    # report_level=ReportLevel.FailuresAndSuccesses.
    if qs.success.is_empty() and qs.failure.is_empty():
        # announce the delay that is about to be slept, then double it (capped) for the next idle round
        print("No new messages. backing off for {} seconds".format(backoff))
        time.sleep(backoff)
        backoff = min(backoff * 2, MAX_BACKOFF)
        continue

    # something arrived: restart the backoff from the shortest delay
    backoff = 1

    # take up to 10 messages from each queue per round
    success_messages = qs.success.pop(10)
    failure_messages = qs.failure.pop(10)

    pprint.pprint("SUCCESS : {}".format(success_messages))
    pprint.pprint("FAILURE : {}".format(failure_messages))

    # you can of course separate them and dump them into a file for follow up investigations.
    # The files are opened in append mode so that batches from earlier rounds are not overwritten,
    # and each message goes on its own line.
    with open("successes.log", "a") as sf:
        for sm in success_messages:
            sf.write("{}\n".format(sm))

    with open("failures.log", "a") as ff:
        for fm in failure_messages:
            ff.write("{}\n".format(fm))

##################################################################
##                        STREAMING INGEST                      ##
##################################################################

# Authenticate against this cluster endpoint (note: no "ingest-" prefix) as shown in the Auth section.
# The connection string is rebuilt here so that it actually targets this endpoint; application key
# authentication is used as an example, any option from the Auth section can be substituted.
cluster = "https://{cluster_name}.kusto.windows.net"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(cluster, client_id, client_secret, authority_id)

client = KustoStreamingIngestClient(kcsb)

ingestion_properties = IngestionProperties(database="{database_name}", table="{table_name}", data_format=DataFormat.CSV)

# ingest from file
# Two call forms are shown: passing a FileDescriptor (path plus raw size), or passing the path directly.
file_descriptor = FileDescriptor("{filename}.csv", 3333)  # 3333 is the raw size of the data in bytes.
client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_properties)
client.ingest_from_file("{filename}.csv", ingestion_properties=ingestion_properties)

# ingest from dataframe
import pandas

fields = ["id", "name", "value"]
rows = [[1, "abc", 15.3], [2, "cde", 99.9]]

df = pandas.DataFrame(data=rows, columns=fields)

client.ingest_from_dataframe(df, ingestion_properties=ingestion_properties)

# ingest from stream
byte_sequence = b"56,56,56"
bytes_stream = io.BytesIO(byte_sequence)
client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)

# A stream can also be wrapped in a StreamDescriptor. A fresh BytesIO is used here because the
# one above has already been handed to an ingest call and may have been read to its end.
stream_descriptor = StreamDescriptor(io.BytesIO(byte_sequence))
client.ingest_from_stream(stream_descriptor, ingestion_properties=ingestion_properties)

# text streams are passed the same way as byte streams
str_sequence = "57,57,57"
str_stream = io.StringIO(str_sequence)
client.ingest_from_stream(str_stream, ingestion_properties=ingestion_properties)

##################################################################
##                MANAGED STREAMING INGEST                      ##
##################################################################

# Managed streaming ingest client will try to use streaming ingestion for performance, but will fall back to queued ingestion if unable.
dm_cluster = "https://ingest-{cluster_name}.kusto.windows.net"

kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(dm_cluster, client_id, client_secret, authority_id)

# Create it from a dm connection string
client = ManagedStreamingIngestClient.from_dm_kcsb(kcsb)
# or an engine connection string, like a streaming ingestion client with `from_engine_kcsb`
# or provide both: `ManagedStreamingIngestClient(engine_kcsb, dm_kcsb)`

# use client as you would a streaming or queued ingestion client

# The ingestion properties are defined before their first use so this section works on its own.
ingestion_properties = IngestionProperties(database="{database_name}", table="{table_name}", data_format=DataFormat.CSV)

# ingest from stream
byte_sequence = b"56,56,56"
bytes_stream = io.BytesIO(byte_sequence)
client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)

# ingest from file
file_descriptor = FileDescriptor("{filename}.csv", 3333)  # 3333 is the raw size of the data in bytes.
result = client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_properties)

# inspect the result to see what type of ingestion was performed
if result.status == IngestionStatus.QUEUED:
    # fell back to queued ingestion
    pass

# Managed streaming ingest client will fall back to queued if:
# - Multiple transient errors were encountered when trying to do streaming ingestion
# - The ingestion is too large for streaming ingestion (over 4MB)
# - The ingestion is directly for a blob