File: auto_lock_renew.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (141 lines) | stat: -rw-r--r-- 6,490 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Example to show usage of AutoLockRenewer:
    1. Automatically renew locks on messages received from non-sessionful entity
    2. Automatically renew locks on the session of sessionful entity

We do not guarantee that this SDK is thread-safe. We do not recommend reusing the ServiceBusClient,
 ServiceBusSender, ServiceBusReceiver across threads. It is up to the running 
 application to use these classes in a thread-safe manner.
"""

import os
import time

from azure.servicebus import ServiceBusClient, AutoLockRenewer, ServiceBusMessage
from azure.servicebus.exceptions import ServiceBusError
from azure.identity import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
QUEUE_NAME = os.environ["SERVICEBUS_QUEUE_NAME"]
SESSION_QUEUE_NAME = os.environ["SERVICEBUS_SESSION_QUEUE_NAME"]


def renew_lock_on_message_received_from_non_sessionful_entity():
    credential = DefaultAzureCredential()
    servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)

    with servicebus_client:
        with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
            msgs_to_send = [ServiceBusMessage("message: {}".format(i)) for i in range(10)]
            sender.send_messages(msgs_to_send)
            print("Send messages to non-sessionful queue.")

        # Can also be called via "with AutoLockRenewer() as renewer" to automate shutdown.
        renewer = AutoLockRenewer()

        with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch_count=10) as receiver:
            received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)

            for msg in received_msgs:
                # automatically renew the lock on each message for 100 seconds
                renewer.register(receiver, msg, max_lock_renewal_duration=100)
            print("Register messages into AutoLockRenewer done.")

            time.sleep(100)  # message handling for long period (E.g. application logic)

            for msg in received_msgs:
                receiver.complete_message(msg)  # Settling the message deregisters it from the AutoLockRenewer
            print("Complete messages.")

        renewer.close()


def renew_lock_on_session_of_the_sessionful_entity():
    credential = DefaultAzureCredential()
    servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)

    with servicebus_client:

        with servicebus_client.get_queue_sender(queue_name=SESSION_QUEUE_NAME) as sender:
            msgs_to_send = [ServiceBusMessage("session message: {}".format(i), session_id="SESSION") for i in range(10)]
            sender.send_messages(msgs_to_send)
            print("Send messages to sessionful queue.")

        renewer = AutoLockRenewer()

        with servicebus_client.get_queue_receiver(
            queue_name=SESSION_QUEUE_NAME, session_id="SESSION", prefetch_count=10
        ) as receiver:

            # automatically renew the lock on the session for 100 seconds
            renewer.register(receiver, receiver.session, max_lock_renewal_duration=100)
            print("Register session into AutoLockRenewer.")

            received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
            time.sleep(100)  # message handling for long period (E.g. application logic)

            for msg in received_msgs:
                receiver.complete_message(msg)

            print("Complete messages.")

        renewer.close()


def renew_lock_with_lock_renewal_failure_callback():
    credential = DefaultAzureCredential()
    servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)

    with servicebus_client:
        with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
            sender.send_messages(ServiceBusMessage("message"))

        with AutoLockRenewer() as renewer:
            # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the
            # service side message lock duration, to demonstrate failure.  Normally, this should not be adjusted.
            renewer._sleep_time = 40
            with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch_count=10) as receiver:

                def on_lock_renew_failure_callback(renewable, error):
                    # If auto-lock-renewal fails, this function will be called.
                    # If failure is due to an error, the second argument will be populated, otherwise
                    # it will default to `None`.
                    # This callback can be an ideal location to log the failure, or take action to safely
                    # handle any processing on the message or session that was in progress.
                    print("Intentionally failed to renew lock on {} due to {}".format(renewable, error))

                received_msgs = receiver.receive_messages(max_message_count=1, max_wait_time=5)

                for msg in received_msgs:
                    # automatically renew the lock on each message for 120 seconds
                    renewer.register(
                        receiver,
                        msg,
                        max_lock_renewal_duration=90,
                        on_lock_renew_failure=on_lock_renew_failure_callback,
                    )
                print("Register messages into AutoLockRenewer done.")

                # Cause the messages and autorenewal to time out.
                # Other reasons for renew failure could include a network or service outage.
                time.sleep(80)

                try:
                    for msg in received_msgs:
                        receiver.complete_message(msg)
                except ServiceBusError as e:
                    print("Messages cannot be settled if they have timed out. (This is expected)")

                print("Lock renew failure demonstration complete.")


renew_lock_on_message_received_from_non_sessionful_entity()
renew_lock_on_session_of_the_sessionful_entity()
renew_lock_with_lock_renewal_failure_callback()