1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Example to show usage of AutoLockRenewer asynchronously:
1. Automatically renew locks on messages received from non-sessionful entity
2. Automatically renew locks on the session of sessionful entity
We do not guarantee that this SDK is thread-safe. We do not recommend reusing the ServiceBusClient,
ServiceBusSender, ServiceBusReceiver across threads. It is up to the running
application to use these classes in a thread-safe manner.
"""
import os
import asyncio
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
from azure.servicebus.exceptions import ServiceBusError
from azure.identity.aio import DefaultAzureCredential
FULLY_QUALIFIED_NAMESPACE = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
QUEUE_NAME = os.environ["SERVICEBUS_QUEUE_NAME"]
SESSION_QUEUE_NAME = os.environ["SERVICEBUS_SESSION_QUEUE_NAME"]
async def renew_lock_on_message_received_from_non_sessionful_entity():
credential = DefaultAzureCredential()
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)
async with servicebus_client:
async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
msgs_to_send = [ServiceBusMessage("session message: {}".format(i)) for i in range(10)]
await sender.send_messages(msgs_to_send)
print("Send messages to non-sessionful queue.")
# Can also be called via "with AutoLockRenewer() as renewer" to automate shutdown.
renewer = AutoLockRenewer()
async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch_count=10) as receiver:
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
for msg in received_msgs:
# automatically renew the lock on each message for 100 seconds
renewer.register(receiver, msg, max_lock_renewal_duration=100)
print("Register messages into AutoLockRenewer done.")
await asyncio.sleep(100) # message handling for long period (E.g. application logic)
for msg in received_msgs:
await receiver.complete_message(msg)
print("Complete messages.")
await renewer.close()
async def renew_lock_on_session_of_the_sessionful_entity():
credential = DefaultAzureCredential()
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)
async with servicebus_client:
async with servicebus_client.get_queue_sender(queue_name=SESSION_QUEUE_NAME) as sender:
msgs_to_send = [ServiceBusMessage("session message: {}".format(i), session_id="SESSION") for i in range(10)]
await sender.send_messages(msgs_to_send)
print("Send messages to sessionful queue.")
renewer = AutoLockRenewer()
async with servicebus_client.get_queue_receiver(
queue_name=SESSION_QUEUE_NAME, session_id="SESSION", prefetch_count=10
) as receiver:
# automatically renew the lock on the session for 100 seconds
renewer.register(receiver, receiver.session, max_lock_renewal_duration=100)
print("Register session into AutoLockRenewer.")
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
await asyncio.sleep(100) # message handling for long period (E.g. application logic)
for msg in received_msgs:
await receiver.complete_message(msg)
print("Complete messages.")
async def renew_lock_with_lock_renewal_failure_callback():
credential = DefaultAzureCredential()
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)
async with servicebus_client:
async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
await sender.send_messages(ServiceBusMessage("message"))
async with AutoLockRenewer() as renewer:
# For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the
# service side message lock duration, to demonstrate failure. Normally, this should not be adjusted.
renewer._sleep_time = 40
async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch_count=10) as receiver:
async def on_lock_renew_failure_callback(renewable, error):
# If auto-lock-renewal fails, this function will be called.
# If failure is due to an error, the second argument will be populated, otherwise
# it will default to `None`.
# This callback can be an ideal location to log the failure, or take action to safely
# handle any processing on the message or session that was in progress.
print("Intentionally failed to renew lock on {} due to {}".format(renewable, error))
received_msgs = await receiver.receive_messages(max_message_count=1, max_wait_time=5)
for msg in received_msgs:
# automatically renew the lock on each message for 120 seconds
renewer.register(
receiver,
msg,
max_lock_renewal_duration=90,
on_lock_renew_failure=on_lock_renew_failure_callback,
)
print("Register messages into AutoLockRenewer done.")
# Cause the messages and autorenewal to time out.
# Other reasons for renew failure could include a network or service outage.
await asyncio.sleep(80)
try:
for msg in received_msgs:
await receiver.complete_message(msg)
except ServiceBusError as e:
print("Messages cannot be settled if they have timed out. (This is expected)")
print("Lock renew failure demonstration complete.")
asyncio.run(renew_lock_on_message_received_from_non_sessionful_entity())
asyncio.run(renew_lock_on_session_of_the_sessionful_entity())
asyncio.run(renew_lock_with_lock_renewal_failure_callback())
|