File: planetary_computer_01_ingestion_management_async.py

# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------

"""
FILE: planetary_computer_01_ingestion_management_async.py

DESCRIPTION:
    This sample demonstrates comprehensive ingestion management operations with the asynchronous client, including:
    - Creating and managing ingestion sources (managed identity-based) - DEMONSTRATION ONLY
    - Creating or replacing sources with create_or_replace_source (idempotent)
    - Retrieving specific sources with get_source
    - Creating and updating ingestion definitions
    - Retrieving specific ingestions with get
    - Running ingestion jobs from public catalogs
    - Listing ingestion runs with list_runs
    - Monitoring ingestion status
    - Managing ingestion operations

USAGE:
    python planetary_computer_01_ingestion_management_async.py

    Set the environment variable PLANETARYCOMPUTER_ENDPOINT with your endpoint URL.
    Set the environment variable PLANETARYCOMPUTER_COLLECTION_ID with your collection ID.

    Optional (for managed identity examples):
    Set the environment variable PLANETARYCOMPUTER_INGESTION_CONTAINER_URI with your container URI.
    Set the environment variable PLANETARYCOMPUTER_INGESTION_CATALOG_URL with your source catalog URL.
    Set the environment variable PLANETARYCOMPUTER_MANAGED_IDENTITY_OBJECT_ID with your managed identity object ID.

    Optional (for SAS token examples):
    Set the environment variable AZURE_INGESTION_SAS_CONTAINER_URI with your SAS container URI.
    Set the environment variable AZURE_INGESTION_SAS_TOKEN with your SAS token.
"""

import os
import asyncio
from azure.planetarycomputer.aio import PlanetaryComputerProClient
from azure.identity.aio import DefaultAzureCredential
from azure.core.exceptions import HttpResponseError
from azure.planetarycomputer.models import (
    ManagedIdentityConnection,
    ManagedIdentityIngestionSource,
    SharedAccessSignatureTokenConnection,
    SharedAccessSignatureTokenIngestionSource,
    IngestionDefinition,
    IngestionType,
)
import uuid

import logging

# Suppress the Azure SDK's HTTP request/response logging (only errors from that policy are shown)
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(
    logging.ERROR
)
logging.basicConfig(level=logging.INFO)


async def create_managed_identity_ingestion_sources(
    client: PlanetaryComputerProClient,
    container_uri: str,
    managed_identity_object_id: str,
):
    """Create managed identity-based ingestion source and return the source_id."""

    # Validate required parameters
    if not container_uri:
        raise ValueError(
            "PLANETARYCOMPUTER_INGESTION_CONTAINER_URI environment variable must be set. "
            "Example: https://yourstorageaccount.blob.core.windows.net/yourcontainer"
        )
    if not managed_identity_object_id:
        raise ValueError(
            "PLANETARYCOMPUTER_MANAGED_IDENTITY_OBJECT_ID environment variable must be set. "
            "This is the object ID of the managed identity with access to the storage account."
        )

    # Clean up existing sources
    async for source in client.ingestion.list_sources():
        await client.ingestion.delete_source(id=source.id)
        logging.info(f"Deleted existing source: {source.id}")

    # Create connection info with managed identity
    connection_info = ManagedIdentityConnection(
        container_uri=container_uri, object_id=managed_identity_object_id
    )

    # Create ingestion source with unique ID
    source_id = str(uuid.uuid4())
    ingestion_source = ManagedIdentityIngestionSource(
        id=source_id, connection_info=connection_info
    )
    created_source = await client.ingestion.create_source(body=ingestion_source)
    logging.info(f"Created managed identity ingestion source: {created_source.id}")

    # List managed identities
    logging.info("Listing available managed identities:")
    async for identity in client.ingestion.list_managed_identities():
        logging.info(f"  - Object ID: {identity.object_id}")
        logging.info(f"    Resource ID: {identity.resource_id}")

    return source_id


async def create_or_replace_source(
    client: PlanetaryComputerProClient,
    sas_container_uri: str,
    sas_token: str,
    source_id: str,
):
    """Demonstrate create_or_replace_source idempotent operation.

    This assumes the source already exists (created by create_sas_token_ingestion_source).
    It demonstrates that create_or_replace_source can be called multiple times with the same source_id
    to update/replace the source (in this case, updating the SAS token).
    """
    # Validate required parameters
    if not sas_container_uri:
        raise ValueError(
            "AZURE_INGESTION_SAS_CONTAINER_URI environment variable must be set for create_or_replace_source"
        )
    if not sas_token:
        raise ValueError(
            "AZURE_INGESTION_SAS_TOKEN environment variable must be set for create_or_replace_source"
        )

    # Create connection info with SAS token
    connection_info = SharedAccessSignatureTokenConnection(
        container_uri=sas_container_uri, shared_access_signature_token=sas_token
    )

    # Create ingestion source
    ingestion_source = SharedAccessSignatureTokenIngestionSource(
        id=source_id, connection_info=connection_info
    )

    # First call - replaces the existing source with original token
    logging.info(
        f"First call to create_or_replace_source with existing source ID: {source_id}"
    )
    first_result = await client.ingestion.replace_source(
        id=source_id, body=ingestion_source
    )
    logging.info(f"First call result: {first_result.id}")

    # Second call - replaces again with a placeholder "rotated" token to show the source
    # can be updated in place (the signature below is illustrative, not a valid SAS token)
    updated_token = "sp=rl&st=2024-01-01T00:00:00Z&se=2024-12-31T23:59:59Z&sv=2023-01-03&sr=c&sig=UpdatedRandomSignature123456"

    updated_connection_info = SharedAccessSignatureTokenConnection(
        container_uri=sas_container_uri, shared_access_signature_token=updated_token
    )
    updated_ingestion_source = SharedAccessSignatureTokenIngestionSource(
        id=source_id, connection_info=updated_connection_info
    )

    logging.info("Second call to create_or_replace_source with updated SAS token")
    second_result = await client.ingestion.replace_source(
        id=source_id, body=updated_ingestion_source
    )
    logging.info(f"Second call result: {second_result.id}")

    return second_result.id


async def get_source_by_id(client: PlanetaryComputerProClient, source_id: str):
    """Retrieve a specific ingestion source by ID.

    This demonstrates using get_source to fetch a specific source directly
    instead of listing all sources.
    """
    logging.info(f"Retrieving ingestion source: {source_id}")

    try:
        source = await client.ingestion.get_source(id=source_id)
        logging.info(f"Successfully retrieved source: {source.id}")
        return source
    except Exception as e:
        logging.error(f"Failed to retrieve source {source_id}: {str(e)}")
        return None


async def create_github_public_ingestion(
    client: PlanetaryComputerProClient, collection_id: str, source_catalog_url: str
):
    """Create, update, and run ingestion from sample public catalog on GitHub."""

    # Delete all existing ingestions
    logging.info("Deleting all existing ingestions...")
    async for ingestion in client.ingestion.list(collection_id=collection_id):
        await client.ingestion.begin_delete(
            collection_id=collection_id, ingestion_id=ingestion.id, polling=True
        )
        logging.info(f"Deleted existing ingestion: {ingestion.id}")

    # Create ingestion definition
    ingestion_definition = IngestionDefinition(
        import_type=IngestionType.STATIC_CATALOG,
        display_name="Sample Ingestion",
        source_catalog_url=source_catalog_url,
        keep_original_assets=True,
        skip_existing_items=True,  # Skip items that already exist
    )

    # Create the ingestion
    logging.info("Creating ingestion for sample catalog...")
    ingestion_response = await client.ingestion.create(
        collection_id=collection_id, body=ingestion_definition
    )
    ingestion_id = ingestion_response.id
    logging.info(f"Created ingestion: {ingestion_id}")

    # Update the ingestion display name
    updated_definition = IngestionDefinition(
        import_type=IngestionType.STATIC_CATALOG,
        display_name="Sample Dataset Ingestion",
    )

    updated_ingestion = await client.ingestion.update(
        collection_id=collection_id, ingestion_id=ingestion_id, body=updated_definition
    )
    logging.info(
        f"Updated ingestion display name to: {updated_ingestion.display_name}"
    )

    return ingestion_id


async def get_ingestion_by_id(
    client: PlanetaryComputerProClient, collection_id: str, ingestion_id: str
):
    """Retrieve a specific ingestion by ID.

    This demonstrates using get to fetch a specific ingestion directly
    instead of listing all ingestions.
    """
    logging.info(
        f"Retrieving ingestion: {ingestion_id} from collection: {collection_id}"
    )

    try:
        ingestion = await client.ingestion.get(
            collection_id=collection_id, ingestion_id=ingestion_id
        )

        logging.info(f"Successfully retrieved ingestion: {ingestion.id}")
        logging.info(f"  Display name: {ingestion.display_name}")
        logging.info(f"  Import type: {ingestion.import_type}")
        if ingestion.source_catalog_url:
            logging.info(f"  Source catalog: {ingestion.source_catalog_url}")

        return ingestion
    except Exception as e:
        logging.error(f"Failed to retrieve ingestion {ingestion_id}: {str(e)}")
        return None


async def list_ingestion_runs(
    client: PlanetaryComputerProClient, collection_id: str, ingestion_id: str
):
    """List all runs for a specific ingestion.

    This demonstrates using list_runs to get all execution runs for an ingestion,
    which is useful for monitoring ingestion history and troubleshooting.
    """
    logging.info(f"Listing runs for ingestion: {ingestion_id}")

    try:
        async for run in client.ingestion.list_runs(
            collection_id=collection_id, ingestion_id=ingestion_id
        ):
            operation = run.operation
            logging.info(f"  Run ID: {run.id}")
            logging.info(f"    Status: {operation.status}")
            logging.info(
                f"    Items - Total: {operation.total_items}, "
                f"Successful: {operation.total_successful_items}, "
                f"Failed: {operation.total_failed_items}, "
                f"Pending: {operation.total_pending_items}"
            )

            if operation.status_history:
                for status_item in operation.status_history:
                    if status_item.error_code:
                        logging.info(
                            f"    Error: {status_item.error_code} - {status_item.error_message}"
                        )
    except Exception as e:
        logging.error(f"Failed to list runs for ingestion {ingestion_id}: {str(e)}")


async def create_sas_token_ingestion_source(
    client: PlanetaryComputerProClient, sas_container_uri: str, sas_token: str
):
    """Create a SAS token ingestion source with example values."""

    # Validate required parameters
    if not sas_container_uri:
        raise ValueError(
            "AZURE_INGESTION_SAS_CONTAINER_URI environment variable must be set. "
            "Example: https://yourstorageaccount.blob.core.windows.net/yourcontainer"
        )
    if not sas_token:
        raise ValueError(
            "AZURE_INGESTION_SAS_TOKEN environment variable must be set. "
            "This is the SAS token for accessing the storage account."
        )

    logging.info("Creating SAS token ingestion source...")

    # Create connection info with the SAS token provided via the environment
    sas_connection_info = SharedAccessSignatureTokenConnection(
        container_uri=sas_container_uri, shared_access_signature_token=sas_token
    )

    # Create SAS token ingestion source
    sas_source_id = str(uuid.uuid4())
    sas_ingestion_source = SharedAccessSignatureTokenIngestionSource(
        id=sas_source_id, connection_info=sas_connection_info
    )

    # Register the SAS token source
    created_sas_source = await client.ingestion.create_source(body=sas_ingestion_source)
    logging.info(f"Created SAS token ingestion source: {created_sas_source.id}")
    return created_sas_source.id


async def create_ingestion_run(
    client: PlanetaryComputerProClient, collection_id: str, ingestion_id: str
):
    """Create an ingestion run."""

    # Create ingestion run
    run_response = await client.ingestion.create_run(
        collection_id=collection_id, ingestion_id=ingestion_id
    )
    logging.info(f"Created ingestion run: {run_response.id}")
    return run_response.id
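

# Illustrative sketch (not part of the original sample): wait for an ingestion run to
# finish by polling list_runs until the run's operation reports a terminal status.
# The terminal status values below ("Succeeded", "Failed", "Canceled") are assumptions
# about what operation.status reports; adjust them to the values your service returns.
async def wait_for_run_completion(
    client: PlanetaryComputerProClient,
    collection_id: str,
    ingestion_id: str,
    run_id: str,
    poll_interval_seconds: float = 30.0,
):
    """Poll an ingestion run until its operation reaches an (assumed) terminal status."""
    terminal_statuses = {"Succeeded", "Failed", "Canceled"}
    while True:
        async for run in client.ingestion.list_runs(
            collection_id=collection_id, ingestion_id=ingestion_id
        ):
            if run.id != run_id:
                continue
            status = str(run.operation.status)
            logging.info(f"Run {run_id} status: {status}")
            if status in terminal_statuses:
                return run
        # Run not finished (or not listed) yet; wait before polling again
        await asyncio.sleep(poll_interval_seconds)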


async def manage_operations(client: PlanetaryComputerProClient):
    """List, get, and cancel ingestion operations."""

    # List operations and get the first one if available
    operation_id = None
    async for operation in client.ingestion.list_operations():
        operation_id = operation.id
        break

    if operation_id:
        # Get a specific operation
        operation = await client.ingestion.get_operation(operation_id)

        # Try to cancel the operation
        try:
            await client.ingestion.cancel_operation(operation.id)
        except HttpResponseError as e:
            logging.info(f"Failed to cancel operation {operation.id}: {e.message}")

    # Cancel all operations
    try:
        await client.ingestion.cancel_all_operations()
    except HttpResponseError as e:
        raise RuntimeError("Failed to cancel all operations") from e


async def main():
    # Get configuration from environment
    endpoint = os.environ.get("PLANETARYCOMPUTER_ENDPOINT")
    collection_id = os.environ.get("PLANETARYCOMPUTER_COLLECTION_ID")

    # Get optional ingestion-specific configuration (for examples)
    container_uri = os.environ.get("PLANETARYCOMPUTER_INGESTION_CONTAINER_URI")
    source_catalog_url = os.environ.get("PLANETARYCOMPUTER_INGESTION_CATALOG_URL")
    managed_identity_object_id = os.environ.get(
        "PLANETARYCOMPUTER_MANAGED_IDENTITY_OBJECT_ID"
    )
    sas_container_uri = os.environ.get("AZURE_INGESTION_SAS_CONTAINER_URI")
    sas_token = os.environ.get("AZURE_INGESTION_SAS_TOKEN")

    # This end-to-end sample exercises every operation below, so all of the environment
    # variables above (including those marked optional in the docstring) must be set.
    assert endpoint is not None
    assert collection_id is not None
    assert container_uri is not None
    assert source_catalog_url is not None
    assert managed_identity_object_id is not None
    assert sas_container_uri is not None
    assert sas_token is not None

    logging.info(f"Connected to: {endpoint}")
    logging.info(f"Collection ID: {collection_id}\n")

    # Create client
    credential = DefaultAzureCredential()
    client = PlanetaryComputerProClient(
        endpoint=endpoint,
        credential=credential,
        logging_enable=False,  # Set to True for detailed HTTP logging
    )

    # Execute ingestion management workflow
    # 1. Create managed identity and SAS token ingestion sources
    await create_managed_identity_ingestion_sources(
        client, container_uri, managed_identity_object_id
    )
    sas_source_id = await create_sas_token_ingestion_source(
        client, sas_container_uri, sas_token
    )

    # 2. Demonstrate advanced source operations (idempotent)
    updated_source_id = await create_or_replace_source(
        client, sas_container_uri, sas_token, sas_source_id
    )
    await get_source_by_id(client, updated_source_id)

    # 3. Run actual ingestion hosted on GitHub
    public_ingestion_id = await create_github_public_ingestion(
        client, collection_id, source_catalog_url
    )

    # 4. Demonstrate advanced ingestion operations
    await get_ingestion_by_id(client, collection_id, public_ingestion_id)

    # 5. Create an ingestion run
    await create_ingestion_run(client, collection_id, public_ingestion_id)

    # 6. List all runs for the ingestion
    await list_ingestion_runs(client, collection_id, public_ingestion_id)
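    # Note: the run created in step 5 will usually still be in progress at this point;
    # to wait for it to finish before inspecting results, you could use the illustrative
    # wait_for_run_completion sketch defined above.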

    # 7. Manage operations
    await manage_operations(client)

    await client.close()
    await credential.close()


if __name__ == "__main__":
    asyncio.run(main())