File: planetary_computer_01_ingestion_management.py

package info (click to toggle)
python-azure 20251118%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 783,356 kB
  • sloc: python: 6,474,533; ansic: 804; javascript: 287; sh: 205; makefile: 198; xml: 109
file content (387 lines) | stat: -rw-r--r-- 16,146 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------

"""
FILE: planetary_computer_01_ingestion_management.py

DESCRIPTION:
    This sample demonstrates comprehensive ingestion management operations including:
    - Creating and managing ingestion sources (managed identity-based) - DEMONSTRATION ONLY
    - Creating or replacing sources with create_or_replace_source (idempotent)
    - Retrieving specific sources with get_source
    - Creating and updating ingestion definitions
    - Retrieving specific ingestions with get
    - Running ingestion jobs from public catalogs
    - Listing ingestion runs with list_runs
    - Monitoring ingestion status
    - Managing ingestion operations

USAGE:
    python planetary_computer_01_ingestion_management.py

    Set the environment variable PLANETARYCOMPUTER_ENDPOINT with your endpoint URL.
    Set the environment variable PLANETARYCOMPUTER_COLLECTION_ID with your collection ID.
    
    Required by main() (used by the managed identity examples):
    Set the environment variable PLANETARYCOMPUTER_INGESTION_CONTAINER_URI with your container URI.
    Set the environment variable PLANETARYCOMPUTER_INGESTION_CATALOG_URL with your source catalog URL.
    Set the environment variable PLANETARYCOMPUTER_MANAGED_IDENTITY_OBJECT_ID with your managed identity object ID.
    
    Required by main() (used by the SAS token examples):
    Set the environment variable AZURE_INGESTION_SAS_CONTAINER_URI with your SAS container URI.
    Set the environment variable AZURE_INGESTION_SAS_TOKEN with your SAS token.
"""

import os
from azure.planetarycomputer import PlanetaryComputerProClient
from azure.identity import DefaultAzureCredential
from azure.core.exceptions import HttpResponseError
from azure.planetarycomputer.models import (
    ManagedIdentityConnection,
    ManagedIdentityIngestionSource,
    SharedAccessSignatureTokenConnection,
    SharedAccessSignatureTokenIngestionSource,
    IngestionDefinition,
    IngestionType,
)
import uuid

import logging

# Quiet the azure-core HTTP logging policy logger: only ERROR and above from it is emitted,
# so per-request output stays out of the sample's log. The root logger is configured at INFO
# so that this sample's own progress messages are shown.
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.ERROR)
logging.basicConfig(level=logging.INFO)


def create_managed_identity_ingestion_sources(
    client: PlanetaryComputerProClient, container_uri: str, managed_identity_object_id: str
):
    """Replace all ingestion sources with one new managed-identity source.

    Every source returned by ``client.ingestion.list_sources()`` is deleted, a new
    ``ManagedIdentityIngestionSource`` with a random UUID is registered for
    ``container_uri``, and the managed identities reported by the service are logged.

    Args:
        client: Client whose ``ingestion`` operations are used.
        container_uri: URI of the storage container the new source points at.
        managed_identity_object_id: Object ID of the managed identity used for the connection.

    Returns:
        The UUID string generated locally for the new source.

    Raises:
        ValueError: If ``container_uri`` or ``managed_identity_object_id`` is empty or ``None``.
    """
    # Checked in order; the first missing setting aborts before any service call is made.
    required_settings = (
        (
            container_uri,
            "PLANETARYCOMPUTER_INGESTION_CONTAINER_URI environment variable must be set. "
            "Example: https://yourstorageaccount.blob.core.windows.net/yourcontainer",
        ),
        (
            managed_identity_object_id,
            "PLANETARYCOMPUTER_MANAGED_IDENTITY_OBJECT_ID environment variable must be set. "
            "This is the object ID of the managed identity with access to the storage account.",
        ),
    )
    for setting, problem in required_settings:
        if not setting:
            raise ValueError(problem)

    ingestion_ops = client.ingestion

    # Start from a clean slate: remove every source that is currently registered.
    # The listing is materialised first so deletions do not interleave with paging.
    for stale_source in list(ingestion_ops.list_sources()):
        ingestion_ops.delete_source(id=stale_source.id)
        logging.info(f"Deleted existing source: {stale_source.id}")

    # Register the new source under a freshly generated identifier.
    source_id = str(uuid.uuid4())
    new_source = ManagedIdentityIngestionSource(
        id=source_id,
        connection_info=ManagedIdentityConnection(container_uri=container_uri, object_id=managed_identity_object_id),
    )
    registered = ingestion_ops.create_source(body=new_source)
    logging.info(f"Created managed identity ingestion source: {registered.id}")

    # Purely informational: show which managed identities the service reports.
    logging.info("Listing available managed identities:")
    for identity in list(ingestion_ops.list_managed_identities()):
        logging.info(f"  - Object ID: {identity.object_id}")
        logging.info(f"    Resource ID: {identity.resource_id}")

    return source_id


def create_or_replace_source(
    client: PlanetaryComputerProClient, sas_container_uri: str, sas_token: str, source_id: str
):
    """Replace a SAS-token ingestion source twice to show the call is repeatable.

    ``client.ingestion.replace_source`` is invoked two times for the same
    ``source_id``: first with ``sas_token``, then with a hard-coded sample token.
    Note that after the second call the source is left configured with that
    sample token, not with ``sas_token``.

    Args:
        client: Client whose ``ingestion`` operations are used.
        sas_container_uri: URI of the storage container the source points at.
        sas_token: SAS token used for the first replacement.
        source_id: ID of the source to replace (the caller is expected to have
            created it beforehand).

    Returns:
        The ``id`` of the source returned by the second replacement.

    Raises:
        ValueError: If ``sas_container_uri`` or ``sas_token`` is empty or ``None``.
    """
    if not sas_container_uri:
        raise ValueError(
            "AZURE_INGESTION_SAS_CONTAINER_URI environment variable must be set for create_or_replace_source"
        )
    if not sas_token:
        raise ValueError("AZURE_INGESTION_SAS_TOKEN environment variable must be set for create_or_replace_source")

    def build_source(token: str):
        """Return a SAS-token source for ``source_id`` that carries ``token``."""
        return SharedAccessSignatureTokenIngestionSource(
            id=source_id,
            connection_info=SharedAccessSignatureTokenConnection(
                container_uri=sas_container_uri, shared_access_signature_token=token
            ),
        )

    # Round one: same ID, caller-supplied token.
    original_source = build_source(sas_token)
    logging.info(f"First call to create_or_replace_source with existing source ID: {source_id}")
    first_result = client.ingestion.replace_source(id=source_id, body=original_source)
    logging.info(f"First call result: {first_result.id}")

    # Round two: same ID again, this time with a placeholder token to show the update path.
    replacement_token = "sp=rl&st=2024-01-01T00:00:00Z&se=2024-12-31T23:59:59Z&sv=2023-01-03&sr=c&sig=UpdatedRandomSignature123456"
    replacement_source = build_source(replacement_token)
    logging.info("Second call to create_or_replace_source with updated SAS token")
    second_result = client.ingestion.replace_source(id=source_id, body=replacement_source)
    logging.info(f"Second call result: {second_result.id}")

    return second_result.id


def get_source_by_id(client: PlanetaryComputerProClient, source_id: str):
    """Fetch one ingestion source directly by its ID.

    Uses ``client.ingestion.get_source`` rather than listing every source.
    Lookup is best-effort: any exception is logged and swallowed.

    Args:
        client: Client whose ``ingestion`` operations are used.
        source_id: ID of the source to fetch.

    Returns:
        The source object returned by the service, or ``None`` if the lookup failed.
    """
    logging.info(f"Retrieving ingestion source: {source_id}")

    try:
        fetched = client.ingestion.get_source(id=source_id)
        logging.info(f"Successfully retrieved source: {fetched.id}")
    except Exception as error:
        # Deliberately broad: the sample keeps going even if this lookup fails.
        logging.error(f"Failed to retrieve source {source_id}: {error!s}")
        return None
    return fetched


def create_github_public_ingestion(client: PlanetaryComputerProClient, collection_id: str, source_catalog_url: str):
    """Create a static-catalog ingestion for a collection, then rename it.

    All ingestions currently defined on ``collection_id`` are deleted first (each
    deletion is awaited), a new ``IngestionDefinition`` pointing at
    ``source_catalog_url`` is created, and its display name is then updated.
    This function does not start an ingestion run.

    Args:
        client: Client whose ``ingestion`` operations are used.
        collection_id: ID of the collection that owns the ingestion.
        source_catalog_url: URL of the source catalog to ingest from.

    Returns:
        The ``id`` of the newly created ingestion.
    """
    ingestion_ops = client.ingestion

    # Delete all existing ingestions
    logging.info("Deleting all existing ingestions...")
    existing_ingestions = list(ingestion_ops.list(collection_id=collection_id))
    for existing in existing_ingestions:
        # begin_delete only *starts* a long-running operation. Block on the poller so the
        # ingestion is really gone before it is reported as deleted and a new one is created.
        delete_poller = ingestion_ops.begin_delete(
            collection_id=collection_id, ingestion_id=existing.id, polling=True
        )
        delete_poller.result()
        logging.info(f"Deleted existing ingestion: {existing.id}")

    # Create ingestion definition
    ingestion_definition = IngestionDefinition(
        import_type=IngestionType.STATIC_CATALOG,
        display_name="Sample Ingestion",
        source_catalog_url=source_catalog_url,
        keep_original_assets=True,
        skip_existing_items=True,  # Skip items that already exist
    )

    # Create the ingestion
    logging.info("Creating ingestion for sample catalog...")
    ingestion_response = ingestion_ops.create(collection_id=collection_id, body=ingestion_definition)
    ingestion_id = ingestion_response.id
    logging.info(f"Created ingestion: {ingestion_id}")

    # Update the ingestion display name; the service response is not needed here.
    updated_definition = IngestionDefinition(
        import_type=IngestionType.STATIC_CATALOG,
        display_name="Sample Dataset Ingestion",
    )
    ingestion_ops.update(collection_id=collection_id, ingestion_id=ingestion_id, body=updated_definition)
    logging.info(f"Updated ingestion display name to: {updated_definition.display_name}")

    return ingestion_id


def get_ingestion_by_id(client: PlanetaryComputerProClient, collection_id: str, ingestion_id: str):
    """Fetch one ingestion directly by its ID and log its key fields.

    Uses ``client.ingestion.get`` rather than listing every ingestion. Lookup is
    best-effort: any exception is logged and swallowed.

    Args:
        client: Client whose ``ingestion`` operations are used.
        collection_id: ID of the collection that owns the ingestion.
        ingestion_id: ID of the ingestion to fetch.

    Returns:
        The ingestion object returned by the service, or ``None`` if the lookup failed.
    """
    logging.info(f"Retrieving ingestion: {ingestion_id} from collection: {collection_id}")

    try:
        found = client.ingestion.get(collection_id=collection_id, ingestion_id=ingestion_id)

        logging.info(f"Successfully retrieved ingestion: {found.id}")
        logging.info(f"  Display name: {found.display_name}")
        logging.info(f"  Import type: {found.import_type}")

        # The catalog URL is only reported when the ingestion has one.
        catalog_url = found.source_catalog_url
        if catalog_url:
            logging.info(f"  Source catalog: {catalog_url}")
    except Exception as error:
        # Deliberately broad: the sample keeps going even if this lookup fails.
        logging.error(f"Failed to retrieve ingestion {ingestion_id}: {error!s}")
        return None
    return found


def list_ingestion_runs(client: PlanetaryComputerProClient, collection_id: str, ingestion_id: str):
    """Collect and log every run recorded for one ingestion.

    For each run the status and item counters of its ``operation`` are logged,
    followed by any status-history entries that carry an error code. Listing is
    best-effort: any exception is logged and swallowed.

    Args:
        client: Client whose ``ingestion`` operations are used.
        collection_id: ID of the collection that owns the ingestion.
        ingestion_id: ID of the ingestion whose runs are listed.

    Returns:
        A list of the run objects, or an empty list if listing failed.
    """
    logging.info(f"Listing runs for ingestion: {ingestion_id}")

    try:
        collected = list(client.ingestion.list_runs(collection_id=collection_id, ingestion_id=ingestion_id))
        logging.info(f"Found {len(collected)} run(s) for ingestion {ingestion_id}")

        for current in collected:
            details = current.operation
            logging.info(f"  Run ID: {current.id}")
            logging.info(f"    Status: {details.status}")
            logging.info(
                f"    Items - Total: {details.total_items}, "
                f"Successful: {details.total_successful_items}, "
                f"Failed: {details.total_failed_items}, "
                f"Pending: {details.total_pending_items}"
            )

            # A missing or empty history simply yields nothing to report.
            for entry in details.status_history or ():
                if not entry.error_code:
                    continue
                logging.info(f"    Error: {entry.error_code} - {entry.error_message}")
    except Exception as error:
        # Deliberately broad: the sample keeps going even if listing fails.
        logging.error(f"Failed to list runs for ingestion {ingestion_id}: {error!s}")
        return []
    return collected


def create_sas_token_ingestion_source(client: PlanetaryComputerProClient, sas_container_uri: str, sas_token: str):
    """Register a new SAS-token ingestion source and return its ID.

    A ``SharedAccessSignatureTokenIngestionSource`` with a random UUID is built
    from the supplied container URI and token and submitted through
    ``client.ingestion.create_source``.

    Args:
        client: Client whose ``ingestion`` operations are used.
        sas_container_uri: URI of the storage container the source points at.
        sas_token: SAS token granting access to that container.

    Returns:
        The ``id`` of the source as returned by the service.

    Raises:
        ValueError: If ``sas_container_uri`` or ``sas_token`` is empty or ``None``.
    """
    # Checked in order; the first missing setting aborts before any service call is made.
    required_settings = (
        (
            sas_container_uri,
            "AZURE_INGESTION_SAS_CONTAINER_URI environment variable must be set. "
            "Example: https://yourstorageaccount.blob.core.windows.net/yourcontainer",
        ),
        (
            sas_token,
            "AZURE_INGESTION_SAS_TOKEN environment variable must be set. "
            "This is the SAS token for accessing the storage account.",
        ),
    )
    for setting, problem in required_settings:
        if not setting:
            raise ValueError(problem)

    logging.info("Creating SAS token ingestion source...")

    # Assemble the source: a generated ID plus the SAS connection details.
    new_source = SharedAccessSignatureTokenIngestionSource(
        id=str(uuid.uuid4()),
        connection_info=SharedAccessSignatureTokenConnection(
            container_uri=sas_container_uri, shared_access_signature_token=sas_token
        ),
    )

    registered = client.ingestion.create_source(body=new_source)
    logging.info(f"Created SAS token ingestion source: {registered.id}")
    return registered.id


def create_ingestion_run(client: PlanetaryComputerProClient, collection_id: str, ingestion_id: str):
    """Start a new run of an existing ingestion and return the run's ID.

    Args:
        client: Client whose ``ingestion`` operations are used.
        collection_id: ID of the collection that owns the ingestion.
        ingestion_id: ID of the ingestion to run.

    Returns:
        The ``id`` of the run returned by ``client.ingestion.create_run``.
    """
    started_run = client.ingestion.create_run(collection_id=collection_id, ingestion_id=ingestion_id)
    run_id = started_run.id
    logging.info(f"Created ingestion run: {run_id}")
    return run_id


def manage_operations(client: PlanetaryComputerProClient):
    """Inspect ingestion operations, then try to cancel them.

    The first operation in the listing (if any) is re-fetched by ID and a cancel
    is attempted on it; a failure there is only logged. Afterwards a cancel of
    all operations is requested.

    Args:
        client: Client whose ``ingestion`` operations are used.

    Raises:
        RuntimeError: If cancelling all operations fails with ``HttpResponseError``
            (the original error is chained as the cause).
    """
    ingestion_ops = client.ingestion

    known_operations = list(ingestion_ops.list_operations())
    if known_operations:
        # Round-trip through get_operation to demonstrate fetching a single operation.
        first_operation = ingestion_ops.get_operation(known_operations[0].id)

        # Cancelling one operation is best-effort; it may be rejected by the service.
        try:
            ingestion_ops.cancel_operation(first_operation.id)
        except HttpResponseError as error:
            logging.info(f"Failed to cancel operation {first_operation.id}: {error.message}")

    # Cancelling everything is not best-effort: surface a failure to the caller.
    try:
        ingestion_ops.cancel_all_operations()
    except HttpResponseError as error:
        raise RuntimeError("Failed to cancel all operations") from error


def main():
    """Run the complete ingestion management sample end to end.

    Reads all settings from environment variables, builds a
    ``PlanetaryComputerProClient`` with ``DefaultAzureCredential``, and then runs
    every step of the sample in order: create sources, replace and fetch a source,
    create and fetch an ingestion, start a run, list runs, and manage operations.

    Raises:
        ValueError: If any required environment variable is unset or empty. The
            message names every missing variable.
    """
    # Every step below runs unconditionally, so each of these settings is needed.
    required_variables = (
        "PLANETARYCOMPUTER_ENDPOINT",
        "PLANETARYCOMPUTER_COLLECTION_ID",
        "PLANETARYCOMPUTER_INGESTION_CONTAINER_URI",
        "PLANETARYCOMPUTER_INGESTION_CATALOG_URL",
        "PLANETARYCOMPUTER_MANAGED_IDENTITY_OBJECT_ID",
        "AZURE_INGESTION_SAS_CONTAINER_URI",
        "AZURE_INGESTION_SAS_TOKEN",
    )
    settings = {name: os.environ.get(name) for name in required_variables}

    # Validate with an explicit exception rather than `assert`: assertions are removed
    # under `python -O` and would otherwise fail without saying which variable is missing.
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise ValueError("Required environment variable(s) must be set: " + ", ".join(missing))

    endpoint = settings["PLANETARYCOMPUTER_ENDPOINT"]
    collection_id = settings["PLANETARYCOMPUTER_COLLECTION_ID"]
    container_uri = settings["PLANETARYCOMPUTER_INGESTION_CONTAINER_URI"]
    source_catalog_url = settings["PLANETARYCOMPUTER_INGESTION_CATALOG_URL"]
    managed_identity_object_id = settings["PLANETARYCOMPUTER_MANAGED_IDENTITY_OBJECT_ID"]
    sas_container_uri = settings["AZURE_INGESTION_SAS_CONTAINER_URI"]
    sas_token = settings["AZURE_INGESTION_SAS_TOKEN"]

    logging.info(f"Connected to: {endpoint}")
    logging.info(f"Collection ID: {collection_id}\n")

    # Create client
    credential = DefaultAzureCredential()
    client = PlanetaryComputerProClient(
        endpoint=endpoint,
        credential=credential,
        logging_enable=False,  # Set to True for detailed HTTP logging
    )

    # Execute ingestion management workflow
    # 1. Create managed identity and SAS token ingestion sources
    create_managed_identity_ingestion_sources(client, container_uri, managed_identity_object_id)
    sas_source_id = create_sas_token_ingestion_source(client, sas_container_uri, sas_token)

    # 2. Demonstrate advanced source operations (idempotent)
    updated_source_id = create_or_replace_source(client, sas_container_uri, sas_token, sas_source_id)
    get_source_by_id(client, updated_source_id)

    # 3. Create and update the ingestion definition for the source catalog
    public_ingestion_id = create_github_public_ingestion(client, collection_id, source_catalog_url)

    # 4. Demonstrate advanced ingestion operations
    get_ingestion_by_id(client, collection_id, public_ingestion_id)

    # 5. Create an ingestion run
    create_ingestion_run(client, collection_id, public_ingestion_id)

    # 6. List all runs for the ingestion
    list_ingestion_runs(client, collection_id, public_ingestion_id)

    # 7. Manage operations
    manage_operations(client)


# Run the sample workflow only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()