File: sample_code_servicebus_async.py

package info (click to toggle)
python-azure 20250603+git-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (370 lines) | stat: -rw-r--r-- 16,228 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
Examples showing basic async use cases of the Python azure-servicebus SDK, including:
    - Create ServiceBusClient
    - Create ServiceBusSender/ServiceBusReceiver
    - Send single message and batch messages
    - Peek, receive and settle messages
    - Receive and settle dead-lettered messages
    - Receive and settle deferred messages
    - Schedule and cancel scheduled messages
    - Session related operations
"""
import os
import datetime
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage


# Module-level flag. Nothing in the visible part of this file reads it.
# NOTE(review): presumably kept for parity with sibling sample files - confirm before removing.
_RUN_ITERATOR = False


async def process_message(message):
    """Stand-in message handler used by the lock-renewal examples.

    Prints the string form of ``message`` to standard output.

    :param message: The object to print; it is converted with ``str()``.
    """
    rendered = str(message)
    print(rendered)


def example_create_servicebus_client_async():
    """Show two ways of constructing an async ``ServiceBusClient``.

    The first snippet builds a client from the connection string in the
    ``SERVICEBUS_CONNECTION_STR`` environment variable. The second builds one
    from ``SERVICEBUS_FULLY_QUALIFIED_NAMESPACE`` plus a
    ``DefaultAzureCredential`` and rebinds ``servicebus_client`` to it.

    The ``[START ...]`` / ``[END ...]`` comments delimit named snippet regions
    (presumably extracted verbatim by documentation tooling - keep them
    intact), which is why each region repeats its own imports.

    :return: The client created by the second, credential-based snippet.
    :raises KeyError: If either environment variable is not set.
    """
    # [START create_sb_client_from_conn_str_async]
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    # [END create_sb_client_from_conn_str_async]

    # [START create_sb_client_async]
    import os
    from azure.identity.aio import DefaultAzureCredential
    from azure.servicebus.aio import ServiceBusClient

    fully_qualified_namespace = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
    servicebus_client = ServiceBusClient(
        fully_qualified_namespace=fully_qualified_namespace, credential=DefaultAzureCredential()
    )
    # [END create_sb_client_async]
    return servicebus_client


async def example_create_servicebus_sender_async():
    """Show the ways of creating a sender, then return a queue sender.

    Three snippet regions are exercised in order: a ``ServiceBusSender`` built
    directly from a connection string, a queue sender obtained from a
    ``ServiceBusClient``, and a topic sender obtained from a
    ``ServiceBusClient``. Each region re-reads its settings from the
    environment (``SERVICEBUS_CONNECTION_STR``, ``SERVICEBUS_QUEUE_NAME``,
    ``SERVICEBUS_TOPIC_NAME``) and rebinds the shared local names.

    :return: A queue sender for ``SERVICEBUS_QUEUE_NAME``, created from the
        client that the last snippet bound to ``servicebus_client``.
    :raises KeyError: If a required environment variable is not set.
    """
    # Exercises the client-creation snippets; this binding is replaced below
    # before it is used.
    servicebus_client = example_create_servicebus_client_async()
    # [START create_servicebus_sender_from_conn_str_async]
    import os
    from azure.servicebus.aio import ServiceBusSender

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
    queue_sender = ServiceBusSender._from_connection_string(conn_str=servicebus_connection_str, queue_name=queue_name)
    # [END create_servicebus_sender_from_conn_str_async]

    # [START create_servicebus_sender_from_sb_client_async]
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
    # [END create_servicebus_sender_from_sb_client_async]

    # [START create_topic_sender_from_sb_client_async]
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
    # [END create_topic_sender_from_sb_client_async]

    # Outside the snippet regions: build the sender that callers actually use.
    # NOTE(review): the client's `async with` block has already exited at this
    # point - presumably the client can still hand out senders; confirm.
    queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
    return queue_sender


async def example_create_servicebus_receiver_async():
    """Show the ways of creating a receiver, then return a queue receiver.

    Snippet regions are exercised in order: a ``ServiceBusReceiver`` built
    directly from a connection string, a queue dead-letter receiver, a plain
    queue receiver, a subscription dead-letter receiver and a plain
    subscription receiver (the last four obtained from a
    ``ServiceBusClient``). Each region re-reads its settings from the
    environment (``SERVICEBUS_CONNECTION_STR``, ``SERVICEBUS_QUEUE_NAME``,
    ``SERVICEBUS_TOPIC_NAME``, ``SERVICEBUS_SUBSCRIPTION_NAME``) and rebinds
    the shared local names.

    :return: A queue receiver for ``SERVICEBUS_QUEUE_NAME``, created from the
        client that the last snippet bound to ``servicebus_client``.
    :raises KeyError: If a required environment variable is not set.
    """
    # Exercises the client-creation snippets; this binding is replaced below
    # before it is used.
    servicebus_client = example_create_servicebus_client_async()

    # [START create_servicebus_receiver_from_conn_str_async]
    import os
    from azure.servicebus.aio import ServiceBusReceiver

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
    queue_receiver = ServiceBusReceiver._from_connection_string(
        conn_str=servicebus_connection_str, queue_name=queue_name
    )
    # [END create_servicebus_receiver_from_conn_str_async]

    # [START create_queue_deadletter_receiver_from_sb_client_async]
    import os
    from azure.servicebus import ServiceBusSubQueue
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        queue_receiver = servicebus_client.get_queue_receiver(
            queue_name=queue_name, sub_queue=ServiceBusSubQueue.DEAD_LETTER
        )
    # [END create_queue_deadletter_receiver_from_sb_client_async]

    # [START create_servicebus_receiver_from_sb_client_async]
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
    # [END create_servicebus_receiver_from_sb_client_async]

    # [START create_subscription_deadletter_receiver_from_sb_client_async]
    import os
    from azure.servicebus import ServiceBusSubQueue
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
    subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_receiver(
            topic_name=topic_name, subscription_name=subscription_name, sub_queue=ServiceBusSubQueue.DEAD_LETTER
        )
    # [END create_subscription_deadletter_receiver_from_sb_client_async]

    # [START create_subscription_receiver_from_sb_client_async]
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
    subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
        )
    # [END create_subscription_receiver_from_sb_client_async]

    # Outside the snippet regions: build the receiver that callers actually use.
    # NOTE(review): the client's `async with` block has already exited at this
    # point - presumably the client can still hand out receivers; confirm.
    queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
    return queue_receiver


async def example_send_and_receive_async():
    """Walk through sending, peeking, receiving and settling queue messages.

    In order, the example:

    - sends one message and then a list of five messages;
    - creates a message batch and adds a single message to it;
    - peeks messages and prints them;
    - receives messages with ``max_wait_time=5``, prints and completes them;
    - iterates the receiver, completing the first message and then stopping;
    - receives again and abandons, completes, defers, dead-letters and
      renews the lock of the returned messages, one operation per receive;
    - registers a message with an ``AutoLockRenewer``, processes and
      completes it, then closes the renewer.

    A fresh sender/receiver is obtained from the ``example_create_*`` helpers
    before each group of operations. The ``[START ...]`` / ``[END ...]``
    comments delimit named snippet regions and must be kept intact.
    """
    from azure.servicebus import ServiceBusMessage

    servicebus_sender = await example_create_servicebus_sender_async()
    # [START send_async]
    async with servicebus_sender:
        message_send = ServiceBusMessage("Hello World")
        await servicebus_sender.send_messages(message_send)
        # [END send_async]
        # Build five distinct messages: `[msg] * 5` would place the *same*
        # message object in the list five times.
        await servicebus_sender.send_messages([ServiceBusMessage("Hello World") for _ in range(5)])

    servicebus_sender = await example_create_servicebus_sender_async()
    # [START create_batch_async]
    async with servicebus_sender:
        batch_message = await servicebus_sender.create_message_batch()
        batch_message.add_message(ServiceBusMessage("Single message inside batch"))
    # [END create_batch_async]

    servicebus_receiver = await example_create_servicebus_receiver_async()
    # [START peek_messages_async]
    async with servicebus_receiver:
        messages = await servicebus_receiver.peek_messages()
        for message in messages:
            print(str(message))
    # [END peek_messages_async]

    servicebus_receiver = await example_create_servicebus_receiver_async()
    # [START receive_async]
    async with servicebus_receiver:
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            print(str(message))
            await servicebus_receiver.complete_message(message)
    # [END receive_async]

    servicebus_receiver = await example_create_servicebus_receiver_async()
    # [START receive_forever_async]
    async with servicebus_receiver:
        async for message in servicebus_receiver:
            print(str(message))
            await servicebus_receiver.complete_message(message)
            # [END receive_forever_async]
            # The sample stops after the first message instead of iterating forever.
            break

        # [START abandon_message_async]
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            await servicebus_receiver.abandon_message(message)
        # [END abandon_message_async]

        # [START complete_message_async]
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            await servicebus_receiver.complete_message(message)
        # [END complete_message_async]

        # [START defer_message_async]
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            await servicebus_receiver.defer_message(message)
        # [END defer_message_async]

        # [START dead_letter_message_async]
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            await servicebus_receiver.dead_letter_message(message)
        # [END dead_letter_message_async]

        # [START renew_message_lock_async]
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            await servicebus_receiver.renew_message_lock(message)
        # [END renew_message_lock_async]

    servicebus_receiver = await example_create_servicebus_receiver_async()
    # [START auto_lock_renew_message_async]
    from azure.servicebus.aio import AutoLockRenewer

    lock_renewal = AutoLockRenewer()
    async with servicebus_receiver:
        async for message in servicebus_receiver:
            lock_renewal.register(servicebus_receiver, message, max_lock_renewal_duration=60)
            await process_message(message)
            await servicebus_receiver.complete_message(message)
            # [END auto_lock_renew_message_async]
            # The sample stops after the first message instead of iterating forever.
            break
    # Close the renewer once the receiver is done with it.
    await lock_renewal.close()


async def example_receive_deferred_async():
    """Show how to defer messages and later receive them by sequence number.

    One "Hello World" message is sent first. The snippet then receives
    messages with ``max_wait_time=5``, records each message's
    ``sequence_number``, prints and defers it, fetches the deferred messages
    back with ``receive_deferred_messages`` and completes them.

    The sender and receiver come from the ``example_create_*`` helpers in this
    file. The ``[START ...]`` / ``[END ...]`` comments delimit a named snippet
    region and must be kept intact.
    """
    servicebus_sender = await example_create_servicebus_sender_async()
    servicebus_receiver = await example_create_servicebus_receiver_async()
    # Send a message first so the snippet below has one to defer.
    async with servicebus_sender:
        await servicebus_sender.send_messages(ServiceBusMessage("Hello World"))
    # [START receive_defer_async]
    async with servicebus_receiver:
        deferred_sequenced_numbers = []
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            deferred_sequenced_numbers.append(message.sequence_number)
            print(str(message))
            await servicebus_receiver.defer_message(message)

        received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
            sequence_numbers=deferred_sequenced_numbers
        )

        for message in received_deferred_msg:
            await servicebus_receiver.complete_message(message)
    # [END receive_defer_async]


async def example_receive_deadletter_async():
    """Show how to dead-letter messages and then settle them from the dead-letter sub-queue.

    Using a client built from ``SERVICEBUS_CONNECTION_STR`` and the queue named
    by ``SERVICEBUS_QUEUE_NAME``, the example sends one "Hello World" message,
    receives messages with ``max_wait_time=5`` and dead-letters each with a
    reason and error description, then opens a receiver with
    ``sub_queue=ServiceBusSubQueue.DEAD_LETTER`` and completes what it receives.

    The ``[START ...]`` / ``[END ...]`` comments delimit a named snippet region
    and must be kept intact.

    :raises KeyError: If a required environment variable is not set.
    """
    from azure.servicebus import ServiceBusSubQueue

    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]

    async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client:
        # Send a message first so the snippet below has one to dead-letter.
        async with servicebus_client.get_queue_sender(queue_name) as servicebus_sender:
            await servicebus_sender.send_messages(ServiceBusMessage("Hello World"))
        # [START receive_deadletter_async]
        async with servicebus_client.get_queue_receiver(queue_name) as servicebus_receiver:
            messages = await servicebus_receiver.receive_messages(max_wait_time=5)
            for message in messages:
                await servicebus_receiver.dead_letter_message(
                    message, reason="reason for dead lettering", error_description="description for dead lettering"
                )

        async with servicebus_client.get_queue_receiver(
            queue_name, sub_queue=ServiceBusSubQueue.DEAD_LETTER
        ) as servicebus_deadletter_receiver:
            messages = await servicebus_deadletter_receiver.receive_messages(max_wait_time=5)
            for message in messages:
                await servicebus_deadletter_receiver.complete_message(message)
        # [END receive_deadletter_async]


async def example_session_ops_async():
    """Show session-related operations on a session-enabled queue.

    Using a client built from ``SERVICEBUS_CONNECTION_STR`` and the queue named
    by ``SERVICEBUS_SESSION_QUEUE_NAME``, the example sends one message tagged
    with ``session_id`` and then, opening a new session receiver for each
    snippet:

    - reads ``receiver.session``;
    - gets the session state;
    - sets the session state to ``"START"``;
    - renews the session lock;
    - registers the session with an ``AutoLockRenewer`` for 120 seconds,
      processes and completes the first message, then closes the renewer.

    ``session_id`` is the placeholder string ``"<your session id>"``. The
    ``[START ...]`` / ``[END ...]`` comments delimit named snippet regions and
    must be kept intact.

    :raises KeyError: If a required environment variable is not set.
    """
    servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
    queue_name = os.environ["SERVICEBUS_SESSION_QUEUE_NAME"]
    session_id = "<your session id>"

    async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client:

        # Send a message first so the session exists and has something to receive.
        async with servicebus_client.get_queue_sender(queue_name=queue_name) as sender:
            await sender.send_messages(ServiceBusMessage("msg", session_id=session_id))

        # [START get_session_async]
        async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
            session = receiver.session
        # [END get_session_async]

        # [START get_session_state_async]
        async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
            session = receiver.session
            session_state = await session.get_state()
        # [END get_session_state_async]

        # [START set_session_state_async]
        async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
            session = receiver.session
            await session.set_state("START")
        # [END set_session_state_async]

        # [START session_renew_lock_async]
        async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
            session = receiver.session
            await session.renew_lock()
        # [END session_renew_lock_async]

        # [START auto_lock_renew_session_async]
        from azure.servicebus.aio import AutoLockRenewer

        lock_renewal = AutoLockRenewer()
        async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
            session = receiver.session
            # Auto renew session lock for 2 minutes
            lock_renewal.register(receiver, session, max_lock_renewal_duration=120)
            async for message in receiver:
                await process_message(message)
                await receiver.complete_message(message)
                # [END auto_lock_renew_session_async]
                # The sample stops after the first message instead of iterating forever.
                break
        # Close the renewer once the receiver is done with it, as the
        # message-lock example in this file does; it was previously left open.
        await lock_renewal.close()


async def example_schedule_ops_async():
    """Show how to schedule messages for later delivery and cancel them again.

    Ten messages are scheduled for 30 seconds from now (UTC) with
    ``schedule_messages``; the sequence numbers it returns are then passed to
    ``cancel_scheduled_messages`` on a fresh sender.

    Senders come from ``example_create_servicebus_sender_async`` in this file.
    The ``[START ...]`` / ``[END ...]`` comments delimit named snippet regions
    and must be kept intact.
    """
    servicebus_sender = await example_create_servicebus_sender_async()
    # [START scheduling_messages_async]
    async with servicebus_sender:
        # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
        # and returns a naive datetime.
        scheduled_time_utc = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=30)
        scheduled_messages = [ServiceBusMessage("Scheduled message") for _ in range(10)]
        sequence_nums = await servicebus_sender.schedule_messages(scheduled_messages, scheduled_time_utc)
    # [END scheduling_messages_async]

    servicebus_sender = await example_create_servicebus_sender_async()
    # [START cancel_scheduled_messages_async]
    async with servicebus_sender:
        await servicebus_sender.cancel_scheduled_messages(sequence_nums)
    # [END cancel_scheduled_messages_async]


if __name__ == "__main__":
    # Run every example in turn, each through its own asyncio.run() call.
    for run_example in (
        example_send_and_receive_async,
        example_receive_deferred_async,
        example_schedule_ops_async,
        example_receive_deadletter_async,
        example_session_ops_async,
    ):
        asyncio.run(run_example())