File: failure_and_recovery.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (164 lines) | stat: -rw-r--r-- 7,558 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
An (overly) verbose sample demonstrating possible failure modes and potential recovery patterns.

Many of these catches are present for illustrative or duplicate purposes, and could be condensed or elided
in a production scenario depending on the system design.
"""

import os
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.servicebus.exceptions import (
    MessageSizeExceededError,
    ServiceBusConnectionError,
    ServiceBusAuthorizationError,
    ServiceBusAuthenticationError,
    OperationTimeoutError,
    ServiceBusError,
    ServiceBusCommunicationError,
    MessageAlreadySettled,
    MessageLockLostError,
    MessageNotFoundError,
)
from azure.identity import DefaultAzureCredential

# Connection settings are read from the environment when the module is loaded.
# Both variables are required: a missing one raises KeyError before any sample code runs.
FULLY_QUALIFIED_NAMESPACE = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
QUEUE_NAME = os.environ["SERVICEBUS_QUEUE_NAME"]


def send_batch_messages(sender):
    """Build a batch of ten sample messages and send it, retrying transient failures.

    Messages that cannot be constructed, or that no longer fit into the batch, are skipped.
    The send itself is attempted up to three times.

    :param sender: Object used to create and send the batch; only ``create_message_batch()``
        and ``send_messages()`` are called on it (presumably a ``ServiceBusSender`` - confirm
        against the caller).
    :raises MessageSizeExceededError: Immediately, if sending reports the payload as too large.
    :raises ServiceBusError: The last error observed when every send attempt failed. This
        includes ``OperationTimeoutError`` when the attempts timed out, so a batch that was
        never delivered is not silently reported as sent.
    """
    batch_message = sender.create_message_batch()
    for i in range(10):
        try:
            message = ServiceBusMessage("Data {}".format(i))
        except TypeError:
            # Message body is of an inappropriate type, must be string, bytes or None.
            continue
        try:
            batch_message.add_message(message)
        except MessageSizeExceededError:
            # ServiceBusMessageBatch object reaches max_size.
            # New ServiceBusMessageBatch object can be created here to send more data.
            # This must be handled at the application layer, by breaking up or condensing.
            # In this sample the message that did not fit is simply skipped.
            continue

    last_error = None
    for _ in range(3):  # Send retries
        try:
            sender.send_messages(batch_message)
            return
        except OperationTimeoutError as e:
            # Send has timed out, retry.  Remember the timeout so that it is surfaced
            # to the caller if every attempt times out, instead of returning as if sent.
            last_error = e
            continue
        except MessageSizeExceededError:
            # The body provided in the message to be sent is too large.
            # This must be handled at the application layer, by breaking up or condensing.
            raise
        except ServiceBusError as e:
            # Other types of service bus errors that can be handled at the higher level, such as connection/auth errors
            # If it happens persistently, should bubble up, and should be logged/alerted if high volume.
            last_error = e
            continue

    # All attempts failed: bubble up the most recent failure.
    if last_error is not None:
        raise last_error


def receive_messages(receiver):
    """Iterate over ``receiver``, process each message and settle it, tolerating transient failures.

    Each message is printed (stand-in for application processing) and then completed, or
    abandoned if processing raised.  Settlement is attempted up to three times per message.
    Failures raised while iterating the receiver are retried, but only up to three times in a
    row without a message being received in between; after that the failure is treated as
    persistent and re-raised.

    :param receiver: Iterable of messages that also exposes ``complete_message()`` and
        ``abandon_message()`` (presumably a ``ServiceBusReceiver`` - confirm against the caller).
    :raises ServiceBusAuthorizationError: Immediately; permission problems are not retried.
    :raises ServiceBusAuthenticationError: Immediately; credential problems are not retried.
    :raises Exception: The last receive failure once three consecutive attempts have failed.
    """
    max_consecutive_failures = 3
    consecutive_failures = 0
    while True:
        try:
            for msg in receiver:
                # A message was delivered, so the receive path is healthy again.
                consecutive_failures = 0
                try:
                    # Do your application-specific data processing here
                    print(str(msg))
                    should_complete = True
                except Exception:
                    should_complete = False

                for _ in range(3):  # Settlement retry
                    try:
                        if should_complete:
                            receiver.complete_message(msg)
                        else:
                            receiver.abandon_message(msg)
                            # Depending on the desired behavior, one could dead letter on failure instead; failure modes are comparable.
                            # Abandon returns the message to the queue for another consumer to receive, dead letter moves to the dead letter subqueue.
                            #
                            # receiver.dead_letter_message(msg, reason="ProcessingFailed", error_description="Application level failure")
                        break
                    except MessageAlreadySettled:
                        # Message was already settled, either somewhere earlier in this processing or by another node.  Continue.
                        break
                    except MessageLockLostError:
                        # Message lock was lost before settlement.  Handle as necessary in the app layer for idempotency then continue on.
                        break
                    except MessageNotFoundError:
                        # Message has an improper sequence number, was dead lettered, or otherwise does not exist.  Handle at app layer, continue on.
                        break
                    except ServiceBusError:
                        # Any other undefined service errors during settlement.  Can be transient, and can retry, but should be logged, and alerted on high volume.
                        continue
            return
        except (ServiceBusAuthorizationError, ServiceBusAuthenticationError):
            # Permission and credential based errors should be bubbled up; retrying cannot fix them.
            raise
        except Exception:
            # Although miscellaneous service errors and interruptions can occasionally occur during receiving,
            # in most pragmatic cases one can try to continue receiving unless the failure mode seems persistent.
            # Logging the associated failure and alerting on high volume is often prudent.
            #
            # Only ``Exception`` is caught (not a bare ``except``) so that KeyboardInterrupt and SystemExit
            # can still stop the loop, and repeated failures are bounded so a persistent fault bubbles up
            # instead of spinning forever.
            consecutive_failures += 1
            if consecutive_failures >= max_consecutive_failures:
                raise
            continue


def send_and_receive_defensively():
    """Open a Service Bus client, send a batch to the queue and receive from it, with connection retries.

    The whole open/send/receive sequence is attempted up to three times when a
    ``ServiceBusConnectionError`` occurs.  Authorization, authentication and communication
    errors are re-raised immediately.

    :raises ServiceBusConnectionError: The last connection error, if all three attempts failed,
        so that exhausting the retries is not mistaken for success.
    :raises ServiceBusAuthorizationError: The credentials lack permission for the operation.
    :raises ServiceBusAuthenticationError: The credentials were rejected.
    :raises ServiceBusCommunicationError: The service could not be reached.
    """
    credential = DefaultAzureCredential()
    servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential, logging_enable=True)

    last_connection_error = None
    for _ in range(3):  # Connection retries.
        try:
            print("Opening")
            with servicebus_client:
                sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
                try:
                    with sender:
                        print("Sending")
                        send_batch_messages(sender)
                except ValueError:
                    # Handler was shut down previously.  (Cannot happen in this example, shown for completeness.)
                    pass

                receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
                try:
                    with receiver:
                        print("Receiving")
                        receive_messages(receiver)
                except ValueError:
                    # Handler was shut down previously.  (Cannot happen in this example, shown for completeness.)
                    pass

                return
        except ServiceBusConnectionError as e:
            # An error occurred in the connection to the service.
            # This may have been caused by a transient network issue or service problem. It is recommended to retry.
            # Keep the error so it can be raised if the retries run out.
            last_connection_error = e
            continue
        except ServiceBusAuthorizationError:
            # An error occurred when authorizing the connection to the service.
            # This may have been caused by the credentials not having the right permission to perform the operation.
            # It is recommended to check the permission of the credentials.
            raise
        except ServiceBusAuthenticationError:
            # An error occurred when authenticating the connection to the service.
            # This may have been caused by the credentials being incorrect. It is recommended to check the credentials.
            raise
        except ServiceBusCommunicationError:
            # Unable to communicate with the specified servicebus.  Ensure that the FQDN is correct,
            # and that there is no firewall or network issue preventing connectivity.
            raise

    # Every attempt ended in a connection error: surface the failure rather than returning normally.
    if last_connection_error is not None:
        raise last_connection_error


# Run the sample when the file is executed (or imported).  Any exception that escapes
# send_and_receive_defensively() propagates here, in which case the completion message is not printed.
send_and_receive_defensively()
print("Send and Receive is done.")