File: auto_lock_renew_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (138 lines) | stat: -rw-r--r-- 6,672 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Example to show usage of AutoLockRenewer asynchronously:
    1. Automatically renew locks on messages received from non-sessionful entity
    2. Automatically renew locks on the session of sessionful entity

We do not guarantee that this SDK is thread-safe. We do not recommend reusing the ServiceBusClient,
 ServiceBusSender, ServiceBusReceiver across threads. It is up to the running 
 application to use these classes in a thread-safe manner.
"""

import os
import asyncio

from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
from azure.servicebus.exceptions import ServiceBusError
from azure.identity.aio import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
QUEUE_NAME = os.environ["SERVICEBUS_QUEUE_NAME"]
SESSION_QUEUE_NAME = os.environ["SERVICEBUS_SESSION_QUEUE_NAME"]


async def renew_lock_on_message_received_from_non_sessionful_entity():
    credential = DefaultAzureCredential()
    servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)

    async with servicebus_client:
        async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
            msgs_to_send = [ServiceBusMessage("session message: {}".format(i)) for i in range(10)]
            await sender.send_messages(msgs_to_send)
            print("Send messages to non-sessionful queue.")

        # Can also be called via "with AutoLockRenewer() as renewer" to automate shutdown.
        renewer = AutoLockRenewer()

        async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch_count=10) as receiver:
            received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)

            for msg in received_msgs:
                # automatically renew the lock on each message for 100 seconds
                renewer.register(receiver, msg, max_lock_renewal_duration=100)
            print("Register messages into AutoLockRenewer done.")

            await asyncio.sleep(100)  # message handling for long period (E.g. application logic)

            for msg in received_msgs:
                await receiver.complete_message(msg)
            print("Complete messages.")

        await renewer.close()


async def renew_lock_on_session_of_the_sessionful_entity():
    credential = DefaultAzureCredential()
    servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)

    async with servicebus_client:

        async with servicebus_client.get_queue_sender(queue_name=SESSION_QUEUE_NAME) as sender:
            msgs_to_send = [ServiceBusMessage("session message: {}".format(i), session_id="SESSION") for i in range(10)]
            await sender.send_messages(msgs_to_send)
            print("Send messages to sessionful queue.")

        renewer = AutoLockRenewer()

        async with servicebus_client.get_queue_receiver(
            queue_name=SESSION_QUEUE_NAME, session_id="SESSION", prefetch_count=10
        ) as receiver:
            # automatically renew the lock on the session for 100 seconds
            renewer.register(receiver, receiver.session, max_lock_renewal_duration=100)
            print("Register session into AutoLockRenewer.")

            received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
            await asyncio.sleep(100)  # message handling for long period (E.g. application logic)

            for msg in received_msgs:
                await receiver.complete_message(msg)
            print("Complete messages.")


async def renew_lock_with_lock_renewal_failure_callback():
    credential = DefaultAzureCredential()
    servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)

    async with servicebus_client:
        async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
            await sender.send_messages(ServiceBusMessage("message"))

        async with AutoLockRenewer() as renewer:
            # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the
            # service side message lock duration, to demonstrate failure.  Normally, this should not be adjusted.
            renewer._sleep_time = 40
            async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch_count=10) as receiver:

                async def on_lock_renew_failure_callback(renewable, error):
                    # If auto-lock-renewal fails, this function will be called.
                    # If failure is due to an error, the second argument will be populated, otherwise
                    # it will default to `None`.
                    # This callback can be an ideal location to log the failure, or take action to safely
                    # handle any processing on the message or session that was in progress.
                    print("Intentionally failed to renew lock on {} due to {}".format(renewable, error))

                received_msgs = await receiver.receive_messages(max_message_count=1, max_wait_time=5)

                for msg in received_msgs:
                    # automatically renew the lock on each message for 120 seconds
                    renewer.register(
                        receiver,
                        msg,
                        max_lock_renewal_duration=90,
                        on_lock_renew_failure=on_lock_renew_failure_callback,
                    )
                print("Register messages into AutoLockRenewer done.")

                # Cause the messages and autorenewal to time out.
                # Other reasons for renew failure could include a network or service outage.
                await asyncio.sleep(80)

                try:
                    for msg in received_msgs:
                        await receiver.complete_message(msg)
                except ServiceBusError as e:
                    print("Messages cannot be settled if they have timed out. (This is expected)")

                print("Lock renew failure demonstration complete.")


asyncio.run(renew_lock_on_message_received_from_non_sessionful_entity())
asyncio.run(renew_lock_on_session_of_the_sessionful_entity())
asyncio.run(renew_lock_with_lock_renewal_failure_callback())