File: utils.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (292 lines) | stat: -rw-r--r-- 15,523 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import enum
import os
import uuid
from time import sleep
from tqdm import tqdm
from azure.kusto.data import KustoConnectionStringBuilder, ClientRequestProperties, KustoClient, DataFormat
from azure.kusto.data.exceptions import KustoClientError, KustoServiceError
from azure.kusto.ingest import IngestionProperties, BaseIngestClient, QueuedIngestClient, FileDescriptor, BlobDescriptor


class AuthenticationModeOptions(enum.Enum):
    """
    AuthenticationModeOptions - represents the different options to autenticate to the system
    """

    UserPrompt = ("UserPrompt",)
    ManagedIdentity = ("ManagedIdentity",)
    AppKey = ("AppKey",)
    AppCertificate = "AppCertificate"


class Utils:
    class Authentication:
        """
        Authentication module of Utils - in charge of authenticating the user with the system
        """

        @classmethod
        def generate_connection_string(cls, cluster_url: str, authentication_mode: AuthenticationModeOptions) -> KustoConnectionStringBuilder:
            """
            Generates Kusto Connection String based on given Authentication Mode.
            :param cluster_url: Cluster to connect to.
            :param authentication_mode: User Authentication Mode, Options: (UserPrompt|ManagedIdentity|AppKey|AppCertificate)
            :return: A connection string to be used when creating a Client
            """
            # Learn More: For additional information on how to authorize users and apps in Kusto,
            # see: https://docs.microsoft.com/azure/data-explorer/manage-database-permissions

            if authentication_mode == AuthenticationModeOptions.UserPrompt.name:
                # Prompt user for credentials
                return KustoConnectionStringBuilder.with_interactive_login(cluster_url)

            elif authentication_mode == AuthenticationModeOptions.ManagedIdentity.name:
                # Authenticate using a System-Assigned managed identity provided to an azure service, or using a User-Assigned managed identity.
                # For more information, see https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview
                return cls.create_managed_identity_connection_string(cluster_url)

            elif authentication_mode == AuthenticationModeOptions.AppKey.name:
                # Learn More: For information about how to procure an AAD Application,
                # see: https://docs.microsoft.com/azure/data-explorer/provision-azure-ad-app
                # TODO (config - optional): App ID & tenant, and App Key to authenticate with
                return KustoConnectionStringBuilder.with_aad_application_key_authentication(
                    cluster_url, os.environ.get("APP_ID"), os.environ.get("APP_KEY"), os.environ.get("APP_TENANT")
                )

            elif authentication_mode == AuthenticationModeOptions.AppCertificate.name:
                return cls.create_application_certificate_connection_string(cluster_url)

            else:
                Utils.error_handler(f"Authentication mode '{authentication_mode}' is not supported")

        @classmethod
        def create_managed_identity_connection_string(cls, cluster_url: str) -> KustoConnectionStringBuilder:
            """
            Generates Kusto Connection String based on 'ManagedIdentity' Authentication Mode.
            :param cluster_url: Url of cluster to connect to
            :return: ManagedIdentity Kusto Connection String
            """
            # Connect using the system- or user-assigned managed identity (Azure service only)
            # TODO (config - optional): Managed identity client ID if you are using a user-assigned managed identity
            client_id = os.environ.get("MANAGED_IDENTITY_CLIENT_ID")
            return (
                KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster_url, client_id=client_id)
                if client_id
                else KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster_url)
            )

        @classmethod
        def create_application_certificate_connection_string(cls, cluster_url: str) -> KustoConnectionStringBuilder:
            """
            Generates Kusto Connection String based on 'AppCertificate' Authentication Mode.
            :param cluster_url: Url of cluster to connect to
            :return: AppCertificate Kusto Connection String
            """

            # TODO (config - optional): App ID & tenant, path to public certificate and path to private certificate pem file to authenticate with
            app_id = os.environ.get("APP_ID")
            app_tenant = os.environ.get("APP_TENANT")
            private_key_pem_file_path = os.environ.get("PRIVATE_KEY_PEM_FILE_PATH")
            cert_thumbprint = os.environ.get("CERT_THUMBPRINT")
            public_cert_file_path = os.environ.get("PUBLIC_CERT_FILE_PATH")  # Only used for "Subject Name and Issuer" auth
            public_certificate = None
            pem_certificate = None

            try:
                with open(private_key_pem_file_path, "r") as pem_file:
                    pem_certificate = pem_file.read()
            except Exception as ex:
                Utils.error_handler(f"Failed to load PEM file from {private_key_pem_file_path}", ex)

            if public_cert_file_path:
                try:
                    with open(public_cert_file_path, "r") as cert_file:
                        public_certificate = cert_file.read()
                except Exception as ex:
                    Utils.error_handler(f"Failed to load public certificate file from {public_cert_file_path}", ex)

                return KustoConnectionStringBuilder.with_aad_application_certificate_sni_authentication(
                    cluster_url, app_id, pem_certificate, public_certificate, cert_thumbprint, app_tenant
                )
            else:
                return KustoConnectionStringBuilder.with_aad_application_certificate_authentication(
                    cluster_url, app_id, pem_certificate, cert_thumbprint, app_tenant
                )

    class Queries:
        """
        Queries module of Utils - in charge of querying the data - either with management queries, or data queries
        """

        MGMT_PREFIX = "."

        @classmethod
        def create_client_request_properties(cls, scope: str, timeout: str = None) -> ClientRequestProperties:
            """
            Creates a fitting ClientRequestProperties object, to be used when executing control commands or queries.
            :param scope: Working scope
            :param timeout: Requests default timeout
            :return: ClientRequestProperties object
            """
            client_request_properties = ClientRequestProperties()
            client_request_properties.client_request_id = f"{scope};{str(uuid.uuid4())}"
            client_request_properties.application = "sample_app.py"

            # Tip: Though uncommon, you can alter the request default command timeout using the below command, e.g. to set the timeout to 10 minutes, use "10m"
            if timeout:
                client_request_properties.set_option(ClientRequestProperties.request_timeout_option_name, timeout)

            return client_request_properties

        @classmethod
        def execute_command(cls, kusto_client: KustoClient, database_name: str, command: str) -> bool:
            """
            Executes a Command using a premade client
            :param kusto_client: Premade client to run Commands. can be either an adminClient or queryClient
            :param database_name: DB name
            :param command: The Command to execute
            :return: True on success, false otherwise
            """
            try:
                if command.startswith(cls.MGMT_PREFIX):
                    client_request_properties = cls.create_client_request_properties("Python_SampleApp_ControlCommand")
                else:
                    client_request_properties = cls.create_client_request_properties("Python_SampleApp_Query")

                result = kusto_client.execute(database_name, command, client_request_properties)
                print(f"Response from executed command '{command}':")
                for row in result.primary_results[0]:
                    print(row.to_list())

                return True

            except KustoClientError as ex:
                Utils.error_handler(f"Client error while trying to execute command '{command}' on database '{database_name}'", ex)
            except KustoServiceError as ex:
                Utils.error_handler(f"Server error while trying to execute command '{command}' on database '{database_name}'", ex)
            except Exception as ex:
                Utils.error_handler(f"Unknown error while trying to execute command '{command}' on database '{database_name}'", ex)

            return False

    class Ingestion:
        """
        Ingestion module of Utils - in charge of ingesting the given data - based on the configuration file.
        """

        @classmethod
        def create_ingestion_properties(
            cls, database_name: str, table_name: str, data_format: DataFormat, mapping_name: str, ignore_first_record: bool
        ) -> IngestionProperties:
            """
            Creates a fitting KustoIngestionProperties object, to be used when executing ingestion commands.
            :param database_name: DB name
            :param table_name: Table name
            :param data_format: Given data format
            :param mapping_name: Desired mapping name
            :param ignore_first_record: Flag noting whether to ignore the first record in the table. only relevant to
            tabular textual formats (CSV and the likes). for more information
            please read: https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties
            :return: IngestionProperties object
            """
            return IngestionProperties(
                database=f"{database_name}",
                table=f"{table_name}",
                ingestion_mapping_reference=mapping_name,
                # Learn More: For more information about supported data formats, see: https://docs.microsoft.com/azure/data-explorer/ingestion-supported-formats
                data_format=data_format,
                # TODO (config - optional): Setting the ingestion batching policy takes up to 5 minutes to take effect.
                #  We therefore set Flush-Immediately for the sake of the sample, but it generally shouldn't be used in practice.
                #  Comment out the line below after running the sample the first few times.
                flush_immediately=True,
                ignore_first_record=ignore_first_record,
            )

        @classmethod
        def ingest_from_file(
            cls,
            ingest_client: BaseIngestClient,
            database_name: str,
            table_name: str,
            file_path: str,
            data_format: DataFormat,
            ignore_first_record,
            mapping_name: str = None,
        ) -> None:
            """
            Ingest Data from a given file path.
            :param ingest_client: Client to ingest data
            :param database_name: DB name
            :param table_name: Table name
            :param file_path: File path
            :param data_format: Given data format
            :param ignore_first_record: Flag noting whether to ignore the first record in the table. only relevant to
            tabular textual formats (CSV and the likes). for more information
            please read: https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties
            :param mapping_name: Desired mapping name
            """
            ingestion_properties = cls.create_ingestion_properties(database_name, table_name, data_format, mapping_name, ignore_first_record)

            # Tip 1: For optimal ingestion batching and performance,specify the uncompressed data size in the file descriptor instead of the default below of 0.
            # Otherwise, the service will determine the file size, requiring an additional s2s call, and may not be accurate for compressed files.
            # Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source ID and log it somewhere
            file_descriptor = FileDescriptor(file_path, size=0, source_id=uuid.uuid4())
            ingest_client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_properties)

        @classmethod
        def ingest_from_blob(
            cls,
            ingest_client: QueuedIngestClient,
            database_name: str,
            table_name: str,
            blob_url: str,
            data_format: DataFormat,
            ignore_first_record: bool,
            mapping_name: str = None,
        ) -> None:
            """
            Ingest Data from a Blob.
            :param ingest_client: Client to ingest data
            :param database_name: DB name
            :param table_name: Table name
            :param blob_url: Blob Uri
            :param data_format: Given data format
            :param ignore_first_record: Flag noting whether to ignore the first record in the table. only relevant to
            tabular textual formats (CSV and the likes). for more information
            please read: https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties
            :param mapping_name: Desired mapping name
            """
            ingestion_properties = cls.create_ingestion_properties(database_name, table_name, data_format, mapping_name, ignore_first_record)

            # Tip 1: For optimal ingestion batching and performance,specify the uncompressed data size in the file descriptor instead of the default below of 0.
            # Otherwise, the service will determine the file size, requiring an additional s2s call, and may not be accurate for compressed files.
            # Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source ID and log it somewhere
            blob_descriptor = BlobDescriptor(blob_url, size=0, source_id=str(uuid.uuid4()))
            ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)

        @classmethod
        def wait_for_ingestion_to_complete(cls, wait_for_ingest_seconds: int) -> None:
            """
            Halts the program for WaitForIngestSeconds, allowing the queued ingestion process to complete.
            :param wait_for_ingest_seconds: Sleep time to allow for queued ingestion to complete.
            """
            print(
                f"Sleeping {wait_for_ingest_seconds} seconds for queued ingestion to complete. Note: This may take longer depending on the file size "
                f"and ingestion batching policy."
            )

            for x in tqdm(range(wait_for_ingest_seconds, 0, -1)):
                sleep(1)

    @staticmethod
    def error_handler(error: str, e: Exception = None) -> None:
        """
        Error handling function. Will mention the appropriate error message (and the exception itself if exists), and will quit the program.
        :param error: Appropriate error message received from calling function
        :param e: Thrown exception
        """
        print(f"Script failed with error: {error}")
        if e:
            print(f"Exception: {e}")

        exit(-1)