File: customize_long_running_operation.md

## Long-running operation (LRO) customization

Operations that are started by an initial call and must then be monitored for status until completion are often represented as long-running operations (LROs).
The `azure-core` library provides the [LROPoller][lro_poller] (and [AsyncLROPoller][async_lro_poller]) protocols
that expose methods to interact with the LRO such as waiting until the operation reaches a terminal state, checking its status, or
providing a callback to do work on the final result when it is ready. If the LRO follows the
[Azure REST API guidelines][rest_api_guidelines_lro],
it's likely that the generated client library code should _just work_.
In cases where the LRO diverges from the guidelines, you may need to customize your code to achieve the desired scenario.

There are 3 options to customize the logic for LROs.

1) [Polling strategy - OperationResourcePolling, LocationPolling, StatusCheckPolling](#polling-strategy---operationresourcepolling-locationpolling-statuscheckpolling)
    - You need to customize the polling strategy
2) [Polling method - LROBasePolling/AsyncLROBasePolling](#polling-method---lrobasepollingasynclrobasepolling)
    - You need to customize the polling loop
3) [Poller API - LROPoller/AsyncLROPoller](#poller-api---lropollerasynclropoller)
    - You need to customize the public interface of the poller

The "poller API" is what the user uses to interact with the LRO. Internally, the poller uses the "polling method" to run the polling
loop which makes calls to the status monitor, controls delay, and determines when the LRO has reached a terminal state.
The "polling method" uses a "polling strategy" to determine how to extract the status information from the responses
returned by the status monitor.
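
To make the layering concrete, here is a minimal sketch of how the three pieces relate. It uses plain-Python stand-ins rather than `azure-core` classes: `ToyStrategy`, `ToyPollingMethod`, `ToyPoller`, and the `status_monitor` callable are all hypothetical names invented for this illustration, and the real `LROPoller` does considerably more than this.

```python
import time
from typing import Callable


class ToyStrategy:
    """Stand-in for a polling strategy: extracts the status from a response."""

    def get_status(self, response: dict) -> str:
        return response["status"]


class ToyPollingMethod:
    """Stand-in for a polling method: owns the loop, the delay, and the terminal check."""

    def __init__(self, status_monitor: Callable[[], dict], interval: float = 0.0) -> None:
        self._status_monitor = status_monitor  # hypothetical callable making one status request
        self._interval = interval
        self._strategy = ToyStrategy()
        self._status = "InProgress"
        self._resource: dict = {}

    def status(self) -> str:
        return self._status

    def finished(self) -> bool:
        return self._status in ("Succeeded", "Failed", "Canceled")

    def resource(self) -> dict:
        return self._resource

    def run(self) -> None:
        while not self.finished():
            # call the status monitor, then let the strategy interpret the response
            self._resource = self._status_monitor()
            self._status = self._strategy.get_status(self._resource)
            if not self.finished():
                time.sleep(self._interval)


class ToyPoller:
    """Stand-in for the poller API: the only layer the end user touches."""

    def __init__(self, polling_method: ToyPollingMethod) -> None:
        self._polling_method = polling_method

    def status(self) -> str:
        return self._polling_method.status()

    def result(self) -> dict:
        self._polling_method.run()
        return self._polling_method.resource()


# Simulated status monitor: two in-progress responses, then a terminal one.
responses = iter([
    {"status": "InProgress"},
    {"status": "InProgress"},
    {"status": "Succeeded", "value": 42},
])
poller = ToyPoller(ToyPollingMethod(lambda: next(responses)))
final = poller.result()
```

Each customization option described in this document corresponds to replacing one of these three layers while leaving the others untouched.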

### Polling strategy - OperationResourcePolling, LocationPolling, StatusCheckPolling

The `azure.core.polling.base_polling` module provides three built-in strategies for polling: [OperationResourcePolling][operation_resource_polling],
[LocationPolling][location_polling], and [StatusCheckPolling][status_check_polling]. The type of polling needed is
determined automatically from the response structure, unless otherwise specified by the client library developer. If
the LRO is determined not to fit either `OperationResourcePolling` or `LocationPolling`, `StatusCheckPolling` serves as
a fallback strategy which does not poll and instead returns a successful response for a 2xx status code.
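
The automatic selection can be pictured as "try each strategy in order and use the first one that says it can poll". The sketch below assumes exactly that behavior and uses hypothetical stand-in classes; the header names checked here (`operation-location`, `location`) are an assumption inferred from the strategy names, and the actual checks live in `base_polling.py`.

```python
from typing import List, Mapping


class ToyOperationResource:
    """Stand-in: can poll when an Operation-Location header is present (assumed)."""

    def can_poll(self, headers: Mapping[str, str]) -> bool:
        return "operation-location" in headers


class ToyLocation:
    """Stand-in: can poll when a Location header is present (assumed)."""

    def can_poll(self, headers: Mapping[str, str]) -> bool:
        return "location" in headers


class ToyStatusCheck:
    """Stand-in for the fallback: always applicable, never actually polls."""

    def can_poll(self, headers: Mapping[str, str]) -> bool:
        return True


DEFAULT_STRATEGIES: List[object] = [ToyOperationResource(), ToyLocation(), ToyStatusCheck()]


def choose_strategy(headers: Mapping[str, str], strategies: List[object]) -> object:
    """Return the first strategy whose can_poll() accepts the initial response headers."""
    lowered = {name.lower(): value for name, value in headers.items()}
    for strategy in strategies:
        if strategy.can_poll(lowered):
            return strategy
    raise ValueError("No strategy is able to poll this response.")


picked_operation = choose_strategy({"Operation-Location": "https://example.invalid/op/1"}, DEFAULT_STRATEGIES)
picked_location = choose_strategy({"Location": "https://example.invalid/res/1"}, DEFAULT_STRATEGIES)
picked_fallback = choose_strategy({}, DEFAULT_STRATEGIES)
```

Because the first match wins, passing a single custom strategy in the list means only that strategy is ever considered.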

If you need to customize the polling strategy, choose a polling algorithm that closely represents what you need to do
or create your own that inherits from [azure.core.polling.base_polling.LongRunningOperation][long_running_operation]
and implements the necessary methods.

For our example, let's say that `OperationResourcePolling` closely resembles what the service does, but we
need to account for a non-standard status.

#### Example: Raise exception for a non-standard status that is returned via the initial POST call - "ValidationFailed".

```python
from azure.core.polling.base_polling import OperationResourcePolling, _as_json
from azure.core.exceptions import HttpResponseError


class CustomOperationResourcePolling(OperationResourcePolling):
    """Implements an operation resource polling, typically from the Operation-Location header.
    Customized to raise an exception in the case of a "ValidationFailed" status returned
    from the service.
    """

    def get_status(self, pipeline_response: "PipelineResponseType") -> str:
        """This method is called on the response for each polling request
        and is used to extract and return the LRO status from that response.
        In the case that the operation has failed (i.e. a non-successful status),
        an exception should be raised. This will bring polling to an end and raise
        the failure to the listener.
        """
        status = super().get_status(pipeline_response)
        if status.lower() == "validationfailed":
            response = pipeline_response.http_response
            body = _as_json(response)
            raise HttpResponseError(response=response, error=body["error"])
        return status
```

You can then wrap a client method that was generated as an LRO, and pass the additional `polling` keyword argument. The `polling`
keyword argument takes an implementation of [azure.core.polling.PollingMethod][polling_method]
(e.g. [azure.core.polling.base_polling.LROBasePolling][lro_base_polling]) and allows for a custom strategy to be passed
in to the keyword argument `lro_algorithms`:

```python
from typing import AnyStr, MutableMapping, Any
from azure.core.polling import LROPoller
from azure.core.polling.base_polling import LROBasePolling
JSON = MutableMapping[str, Any]


class ServiceOperations:

    def begin_analyze(self, data: AnyStr, name: str, **kwargs) -> LROPoller[JSON]:
        return self._generated_client.begin_analyze(
            data,
            name,
            polling=LROBasePolling(
                lro_algorithms=[
                    CustomOperationResourcePolling()  # overrides other LRO strategies
                ]
            ),
            **kwargs
        )
```

This example uses the default polling method, `LROBasePolling`, and overrides only the strategy used for polling.
If you need to control the polling loop, then see the next section.


### Polling method - LROBasePolling/AsyncLROBasePolling

Built-in methods for polling are included in `azure-core` as both sync / async variants - [LROBasePolling][lro_base_polling]
and [AsyncLROBasePolling][async_lro_base_polling]. The polling method runs the polling loop and performs GET requests
to the status monitor to check whether a terminal state has been reached. Between polls it inserts a delay based on
either (1) the `retry-after` header sent by the service, or (2) the given `polling_interval` if no `retry-after` header is present.
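
The delay rule can be sketched as a small helper. This is an illustration of the rule as described above, not the `azure-core` implementation: `next_delay` is a hypothetical function, and it only handles the delay-in-seconds form of `retry-after`, falling back to `polling_interval` for anything it cannot parse.

```python
from typing import Mapping


def next_delay(headers: Mapping[str, str], polling_interval: float) -> float:
    """Pick the delay before the next poll: retry-after if usable, else polling_interval."""
    retry_after = None
    for name, value in headers.items():
        # header names are case-insensitive
        if name.lower() == "retry-after":
            retry_after = value
            break

    if retry_after is not None:
        try:
            return max(float(retry_after), 0.0)
        except ValueError:
            # the HTTP-date form of retry-after is not handled in this sketch
            pass
    return polling_interval


delay_from_header = next_delay({"Retry-After": "5"}, polling_interval=30)
delay_from_default = next_delay({}, polling_interval=30)
```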

You can also use [azure.core.polling.NoPolling][no_polling] (or [AsyncNoPolling][async_no_polling]), which does not
initiate polling and simply returns the deserialized initial response when `poller.result()` is called.

To use `NoPolling`, you can pass `polling=False` to an operation generated as an LRO:

```python
from typing import AnyStr, MutableMapping, Any
from azure.core.polling import LROPoller
JSON = MutableMapping[str, Any]


class ServiceOperations:

    def begin_analyze(self, data: AnyStr, name: str, **kwargs) -> LROPoller[JSON]:
        return self._generated_client.begin_analyze(
            data,
            name,
            polling=False,
            **kwargs
        )
```

To customize parts of the polling method, you can create a subclass of [LROBasePolling][lro_base_polling] and override the necessary methods.
If significant customization is necessary, use [azure.core.polling.PollingMethod][polling_method]
(or [AsyncPollingMethod][async_polling_method]) and implement all the necessary methods.

#### Example: Create an LRO method which will poll for when a file gets uploaded successfully (greatly simplified)

For this example, the customization necessary requires defining our own polling strategy and polling method.
First, we'll define a simple polling strategy that the polling method will use to get the status information from the response.

```python
from typing import Optional, MutableMapping, Any
from azure.core.polling.base_polling import LongRunningOperation, OperationFailed
from azure.core.pipeline import PipelineResponse
JSON = MutableMapping[str, Any]


class CustomPollingStrategy(LongRunningOperation):
    """CustomPollingStrategy which provides default logic
    for interpreting operation responses and status updates.
    """

    def can_poll(self, pipeline_response: PipelineResponse) -> bool:
        """Determine from the initial response that we can poll.
        In this example, we need a file_id present to proceed with polling.

        :param PipelineResponse pipeline_response: initial REST call response.
        """
        response = pipeline_response.http_response.json()
        if response.get("file_id", None) is None:
            return False
        return True

    def get_polling_url(self) -> str:
        """Return the polling URL. This is the URL for the status monitor
        and where the GET requests will be made during polling.

        For this example, we don't need to extract the URL
        from the initial response so it is not implemented.
        """
        raise NotImplementedError("The polling strategy does not need to extract a polling URL.")

    def set_initial_status(self, pipeline_response: PipelineResponse) -> str:
        """Process first response after initiating long running operation and set initial status.

        :param PipelineResponse pipeline_response: initial REST call response.
        """

        response = pipeline_response.http_response
        if response.status_code == 200:
            return "InProgress"
        raise OperationFailed("Operation failed or canceled")

    def get_status(self, response: JSON) -> str:
        """Return the status based on this response.

        Typically, this method extracts a status string from the
        response. In this example, we determine status based on whether our
        result is populated or not.
        """
        if response is None:
            return "InProgress"
        return "Succeeded"

    def get_final_get_url(self, pipeline_response: PipelineResponse) -> Optional[str]:
        """If a final GET is needed when the LRO is complete, returns the URL.

        :rtype: str
        """
        return None
```

Next, we'll define the custom polling method:


```python
import functools
import time
import base64
from typing import Any, Tuple, Callable, MutableMapping
from azure.core.pipeline import PipelineResponse
from azure.core.polling import PollingMethod
from azure.core.polling.base_polling import BadResponse
from azure.core.exceptions import ResourceNotFoundError
JSON = MutableMapping[str, Any]


class CustomPollingMethod(PollingMethod):
    def __init__(self, polling_interval: float = 30, **kwargs: Any) -> None:
        """Creates a custom polling method which polls until a file is uploaded.
        For our example, the operation is considered to have reached a terminal state once a successful GET
        is done on the file (e.g. no ResourceNotFoundError is raised).

        :param polling_interval: The amount of time to wait between polls. This fictitious service does not
            use retry-after so we will default to this value.
        :param kwargs: Any operation-specific keyword arguments that should be passed into the GET call.
        """
        self._polling_interval = polling_interval
        self._kwargs = kwargs

    def initialize(self, client: Any, initial_response: PipelineResponse, deserialization_callback: Callable) -> None:
        """Set the initial status of this LRO, verify that we can poll, and
        initialize anything necessary for polling.

        :param client: An instance of a client. In this example, the generated client.
        :param initial_response: In this example, the PipelineResponse returned from the initial call.
        :param deserialization_callback: A callable to transform the final result before returning to the end user.
        """

        # set our strategy first so it can be used to validate the initial response
        self._operation = CustomPollingStrategy()

        # verify we have the information to poll
        if self._operation.can_poll(initial_response) is False:
            raise BadResponse("No file_id in response.")

        response = initial_response.http_response.json()

        # initialize
        self.client = client
        self.file_id = response["file_id"]
        self._initial_response = initial_response
        self._deserialization_callback = deserialization_callback
        self._resource = None
        self._finished = False

        # create the command which will be polled against as the status monitor
        self._command = functools.partial(self.client.get_upload_file, file_id=self.file_id, **self._kwargs)

        # set initial status
        self._status = self._operation.set_initial_status(initial_response)


    def status(self) -> str:
        """Should return the current status as a string. The initial status is set by
        the polling strategy with set_initial_status() and then subsequently set by
        each call to get_status().

        This is what is returned to the user when status() is called on the LROPoller.

        :rtype: str
        """
        return self._status

    def finished(self) -> bool:
        """Is this polling finished?
        Controls whether the polling loop should continue to poll.

        :returns: Return True if the operation has reached a terminal state
            or False if polling should continue.
        :rtype: bool
        """
        return self.status() == "Succeeded"

    def resource(self) -> JSON:
        """Return the built resource.
        This is what is returned to the user when result() is called on the LROPoller.

        This might include a deserialization callback (passed in initialize())
        to transform or customize the final result, if necessary.
        """
        return self._deserialization_callback(self._resource)

    def run(self) -> None:
        """The polling loop.

        The polling should call the status monitor, evaluate and set the current status,
        insert delay between polls, and continue polling until a terminal state is reached.
        """
        while not self.finished():
            self.update_status()
            if not self.finished():
                # inserts delay if not done
                time.sleep(self._polling_interval)

    def update_status(self):
        """Update the current status of the LRO by calling the status monitor
        and then using the polling strategy's get_status() to set the status."""
        try:
            self._resource = self._command()
        except ResourceNotFoundError:
            pass

        self._status = self._operation.get_status(self._resource)

    def get_continuation_token(self) -> str:
        """Returns an opaque token which can be used by the user to rehydrate/restart the LRO.
        Saves the initial state of the LRO so that polling can be resumed from that context.

        .. code-block:: python

            initial_poller = client.begin_upload(data)
            continuation_token = initial_poller.continuation_token()

            poller: LROPoller = client.begin_upload(None, continuation_token=continuation_token)
            poller.result()

        In standard LROs, the PipelineResponse is serialized here, however, there may be a need to
        customize this further depending on your scenario.
        """
        import pickle

        return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii")

    @classmethod
    def from_continuation_token(cls, continuation_token: str, **kwargs: Any) -> Tuple[Any, PipelineResponse, Callable]:
        """Deserializes the user-provided continuation_token to the initial response and returns
        the context necessary to rebuild the LROPoller from its classmethod.
        """
        try:
            client = kwargs["client"]
        except KeyError:
            raise ValueError("Need kwarg 'client' to be recreated from continuation_token")

        try:
            deserialization_callback = kwargs["deserialization_callback"]
        except KeyError:
            raise ValueError(
                "Need kwarg 'deserialization_callback' to be recreated from continuation_token"
            )

        import pickle

        initial_response = pickle.loads(base64.b64decode(continuation_token))  # nosec
        # Restore the transport in the context
        initial_response.context.transport = client._client._pipeline._transport  # pylint: disable=protected-access
        return client, initial_response, deserialization_callback
```
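
The continuation token in this example is simply the pickled initial response, base64-encoded so it can be handed around as a string. The round trip can be sketched with the standard library alone; `encode_token`, `decode_token`, and the `state` dictionary below are hypothetical stand-ins for the pickled `PipelineResponse`.

```python
import base64
import pickle
from typing import Any


def encode_token(state: Any) -> str:
    """Serialize the saved state into an opaque, ASCII-safe string."""
    return base64.b64encode(pickle.dumps(state)).decode("ascii")


def decode_token(token: str) -> Any:
    """Rebuild the saved state from a token produced by encode_token()."""
    return pickle.loads(base64.b64decode(token))  # only safe for tokens you produced yourself


# Stand-in for the initial response that polling would resume from.
state = {"file_id": "abc123", "status_code": 200}
token = encode_token(state)
restored = decode_token(token)
```

Unpickling data from an untrusted source can execute arbitrary code, so a token should only be accepted if it originated from your own poller.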

And now, to plug into the client code:

```python
from typing import AnyStr, MutableMapping, Any
from azure.core.polling import LROPoller
JSON = MutableMapping[str, Any]


class ServiceOperations:

    def begin_upload(self, data: AnyStr, **kwargs) -> LROPoller[JSON]:
        continuation_token = kwargs.pop("continuation_token", None)
        polling_method = CustomPollingMethod(**kwargs)

        # if continuation_token is provided, we should rehydrate the LRO using the from_continuation_token method
        # which calls our implementation on the CustomPollingMethod method
        if continuation_token is not None:
            return LROPoller.from_continuation_token(
                continuation_token=continuation_token,
                polling_method=polling_method,
                deserialization_callback=lambda x: x,
                client=self
            )

        # the initial call. We pass in `cls` to receive the pipeline_response.
        pipeline_response = self._generated_client.create_upload(data, cls=lambda response, x, y: response, **kwargs)

        return LROPoller(
            client=self,
            initial_response=pipeline_response,
            deserialization_callback=lambda x: x,  # returning the result as-is, but could be a callable to transform the final result
            polling_method=polling_method,
        )
```

Note that we need to account for a `continuation_token` being passed by the user, in which case we should not make the
initial call again, but rather resume polling from the rehydrated state. Since passing `continuation_token` doesn't
require the user to provide the parameters for the initial call, it can be helpful to add overloads to the method to
clarify its usage, especially in cases where required parameters become non-required:

```python
from typing import AnyStr, MutableMapping, Any, overload
from azure.core.polling import LROPoller
JSON = MutableMapping[str, Any]


class ServiceOperations:
    @overload
    def begin_upload(self, data: AnyStr, **kwargs: Any) -> LROPoller[JSON]:
        ...


    @overload
    def begin_upload(self, *, continuation_token: str, **kwargs: Any) -> LROPoller[JSON]:
        ...

    def begin_upload(self, *args, **kwargs) -> LROPoller[JSON]:
        continuation_token = kwargs.pop("continuation_token", None)
        polling_method = CustomPollingMethod(**kwargs)
        if continuation_token is not None:
            return LROPoller.from_continuation_token(
                continuation_token=continuation_token,
                polling_method=polling_method,
                deserialization_callback=lambda x: x,
                client=self
            )

        data = kwargs.pop("data", None)
        if data is None:
            try:
                data = args[0]
            except IndexError:
                raise TypeError("begin_upload() missing 1 required positional argument: 'data'")

        pipeline_response = self._generated_client.create_upload(data, cls=lambda response, x, y: response, **kwargs)

        return LROPoller(
            client=self,
            initial_response=pipeline_response,
            deserialization_callback=lambda x: x,
            polling_method=polling_method,
        )
```


### Poller API - LROPoller/AsyncLROPoller

The last option is to customize the public interface of the `LROPoller` / `AsyncLROPoller`.
Reasons to do this might include exposing important attributes or metadata of the operation in progress,
or adding new features to interact with the operation, such as to pause/resume or cancel it.

#### Example: I want to add a cancel method to my poller

This example builds off the previous example and uses the custom polling method defined above. The custom polling
method gives us access to the client and `file_id` needed to make the `cancel` call. If you support rehydration of
the LRO via `continuation_token`, you must override the `from_continuation_token` method so that the custom poller is used.

```python
from typing import TypeVar
from azure.core.polling import LROPoller, PollingMethod
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)


class CustomLROPoller(LROPoller[PollingReturnType_co]):

    def cancel(self, **kwargs) -> None:
        """Cancel the upload"""
        return self.polling_method().client.cancel_upload_file(self.polling_method().file_id, **kwargs)

    @classmethod
    def from_continuation_token(
        cls, polling_method: PollingMethod[PollingReturnType_co], continuation_token: str, **kwargs
    ) -> "CustomLROPoller[PollingReturnType_co]":
        (
            client,
            initial_response,
            deserialization_callback,
        ) = polling_method.from_continuation_token(continuation_token, **kwargs)
        return cls(client, initial_response, deserialization_callback, polling_method)

```

And now, to plug into the client code:

```python
from typing import AnyStr, MutableMapping, Any
JSON = MutableMapping[str, Any]


class ServiceOperations:

    def begin_upload(self, data: AnyStr, **kwargs) -> CustomLROPoller[JSON]:
        continuation_token = kwargs.pop("continuation_token", None)
        polling_method = CustomPollingMethod(**kwargs)
        if continuation_token is not None:
            return CustomLROPoller.from_continuation_token(
                continuation_token=continuation_token,
                deserialization_callback=lambda x: x,
                polling_method=polling_method,
                client=self
            )
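        # the initial call. We pass in `cls` to receive the pipeline_response, which the polling method expects.
        response = self._generated_client.create_upload(data, cls=lambda response, x, y: response, **kwargs)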
        return CustomLROPoller[JSON](
            client=self,
            initial_response=response,
            deserialization_callback=lambda x: x,
            polling_method=polling_method,
        )
```

Note that we updated the `begin_upload` return type to `CustomLROPoller`. You should only need to explicitly reference
the custom poller if a new public API has been added. The custom poller should additionally be added to the package
`__init__.py` so that the new public API will be properly documented.
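
The delegation behind `cancel` can be exercised without a live service. The sketch below uses hypothetical fakes (`FakeClient`, `FakePollingMethod`, `ToyCancellablePoller`) in place of the generated client, `CustomPollingMethod`, and `CustomLROPoller`; it shows only how the poller reaches the client and `file_id` through its polling method.

```python
from typing import Any, List


class FakeClient:
    """Stand-in for the service client; records which uploads were cancelled."""

    def __init__(self) -> None:
        self.cancelled: List[str] = []

    def cancel_upload_file(self, file_id: str, **kwargs: Any) -> None:
        self.cancelled.append(file_id)


class FakePollingMethod:
    """Stand-in for the polling method, which holds the client and file_id."""

    def __init__(self, client: FakeClient, file_id: str) -> None:
        self.client = client
        self.file_id = file_id


class ToyCancellablePoller:
    """Stand-in for the custom poller: cancel() delegates through the polling method."""

    def __init__(self, polling_method: FakePollingMethod) -> None:
        self._polling_method = polling_method

    def polling_method(self) -> FakePollingMethod:
        return self._polling_method

    def cancel(self, **kwargs: Any) -> None:
        method = self.polling_method()
        return method.client.cancel_upload_file(method.file_id, **kwargs)


client = FakeClient()
toy_poller = ToyCancellablePoller(FakePollingMethod(client, "abc123"))
toy_poller.cancel()
```

From the user's point of view the only visible change is the extra method on the object returned by `begin_upload`.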


[rest_api_guidelines_lro]: https://github.com/microsoft/api-guidelines/blob/vNext/azure/Guidelines.md#long-running-operations--jobs
[operation_resource_polling]: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/azure/core/polling/base_polling.py#L178
[location_polling]: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/azure/core/polling/base_polling.py#L277
[status_check_polling]: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/azure/core/polling/base_polling.py#L325
[lro_poller]: https://azuresdkdocs.z19.web.core.windows.net/python/azure-core/latest/azure.core.polling.html#azure.core.polling.LROPoller
[lro_base_polling]: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/azure/core/polling/base_polling.py#L357
[long_running_operation]: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/azure/core/polling/base_polling.py#L121-L161
[polling_method]: https://azuresdkdocs.z19.web.core.windows.net/python/azure-core/latest/azure.core.polling.html#azure.core.polling.PollingMethod
[async_polling_method]: https://azuresdkdocs.z19.web.core.windows.net/python/azure-core/latest/azure.core.polling.html#azure.core.polling.AsyncPollingMethod
[async_lro_poller]: https://azuresdkdocs.z19.web.core.windows.net/python/azure-core/latest/azure.core.polling.html#azure.core.polling.AsyncLROPoller
[async_lro_base_polling]: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/azure/core/polling/async_base_polling.py#L40
[no_polling]: https://azuresdkdocs.z19.web.core.windows.net/python/azure-core/latest/azure.core.polling.html#azure.core.polling.NoPolling
[async_no_polling]: https://azuresdkdocs.z19.web.core.windows.net/python/azure-core/latest/azure.core.polling.html#azure.core.polling.AsyncNoPolling