File: queue_samples_message_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (374 lines) | stat: -rw-r--r-- 13,660 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: queue_samples_message_async.py

DESCRIPTION:
    These samples demonstrate the following: creating and setting an access policy to generate a
    sas token, getting a queue client from a queue URL, setting and getting queue
    metadata, sending messages and receiving them individually or by batch, deleting and
    clearing all messages, and peeking and updating messages.

USAGE:
    python queue_samples_message_async.py

    Set the environment variables with your own values before running the sample:
    1) STORAGE_CONNECTION_STRING - the connection string to your storage account
"""

from datetime import datetime, timedelta
import asyncio
import os
import sys


class QueueMessageSamplesAsync(object):

    connection_string = os.getenv("STORAGE_CONNECTION_STRING")

    async def set_access_policy_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # [START async_create_queue_client_from_connection_string]
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync1")
        if queue.account_name is None:
            print("Connection string did not provide an account name." + "\n" + "Test: set_access_policy_async")
            sys.exit(1)
        # [END async_create_queue_client_from_connection_string]

        # Create the queue
        async with queue:
            await queue.create_queue()

            # Send a message
            await queue.send_message("hello world")

            try:
                # [START async_set_access_policy]
                # Create an access policy
                from azure.storage.queue import AccessPolicy, QueueSasPermissions

                access_policy = AccessPolicy()
                access_policy.start = datetime.utcnow() - timedelta(hours=1)
                access_policy.expiry = datetime.utcnow() + timedelta(hours=1)
                access_policy.permission = QueueSasPermissions(read=True)
                identifiers = {"my-access-policy-id": access_policy}

                # Set the access policy
                await queue.set_queue_access_policy(identifiers)
                # [END async_set_access_policy]

                # Use the access policy to generate a SAS token
                from azure.storage.queue import generate_queue_sas

                sas_token = generate_queue_sas(
                    queue.account_name, queue.queue_name, queue.credential.account_key, policy_id="my-access-policy-id"
                )

                # Authenticate with the sas token
                # [START async_create_queue_client]
                token_auth_queue = QueueClient.from_queue_url(queue_url=queue.url, credential=sas_token)
                # [END async_create_queue_client]

                # Use the newly authenticated client to receive messages
                my_messages = token_auth_queue.receive_messages()

            finally:
                # Delete the queue
                await queue.delete_queue()

    async def queue_metadata_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # Instantiate a queue client
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync2")

        # Create the queue
        async with queue:
            await queue.create_queue()

            try:
                # [START async_set_queue_metadata]
                metadata = {"foo": "val1", "bar": "val2", "baz": "val3"}
                await queue.set_queue_metadata(metadata=metadata)
                # [END async_set_queue_metadata]

                # [START async_get_queue_properties]
                properties = await queue.get_queue_properties()
                # [END async_get_queue_properties]

            finally:
                # Delete the queue
                await queue.delete_queue()

    async def send_and_receive_messages_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # Instantiate a queue client
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync3")

        # Create the queue
        async with queue:
            await queue.create_queue()

            try:
                # [START async_send_messages]
                await asyncio.gather(
                    queue.send_message("message1"),
                    queue.send_message("message2", visibility_timeout=30),  # wait 30s before becoming visible
                    queue.send_message("message3"),
                    queue.send_message("message4"),
                    queue.send_message("message5"),
                )
                # [END async_send_messages]

                # [START async_receive_messages]
                # Receive messages one-by-one
                messages = queue.receive_messages()
                async for msg in messages:
                    print(msg.content)

                # Receive messages by batch
                messages = queue.receive_messages(messages_per_page=5)
                async for msg_batch in messages.by_page():
                    async for msg in msg_batch:
                        print(msg.content)
                        await queue.delete_message(msg)
                # [END async_receive_messages]

                # Only prints 4 messages because message 2 is not visible yet
                # >>message1
                # >>message3
                # >>message4
                # >>message5

            finally:
                # Delete the queue
                await queue.delete_queue()

    async def receive_one_message_from_queue(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # Instantiate a queue client
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync4")

        # Create the queue
        async with queue:
            await queue.create_queue()

            try:
                await asyncio.gather(
                    queue.send_message("message1"), queue.send_message("message2"), queue.send_message("message3")
                )

                # [START receive_one_message]
                # Pop two messages from the front of the queue
                message1 = await queue.receive_message()
                message2 = await queue.receive_message()
                # We should see message 3 if we peek
                message3 = await queue.peek_messages()

                if not message1 or not message2 or not message3:
                    raise ValueError("One of the messages are None.")

                print(message1.content)
                print(message2.content)
                print(message3[0].content)
                # [END receive_one_message]

            finally:
                await queue.delete_queue()

    async def delete_and_clear_messages_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # Instantiate a queue client
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync5")

        # Create the queue
        async with queue:
            await queue.create_queue()

            try:
                # Send messages
                await asyncio.gather(
                    queue.send_message("message1"),
                    queue.send_message("message2"),
                    queue.send_message("message3"),
                    queue.send_message("message4"),
                    queue.send_message("message5"),
                )

                # [START async_delete_message]
                # Get the message at the front of the queue
                messages = queue.receive_messages()
                async for msg in messages:
                    # Delete the specified message
                    await queue.delete_message(msg)
                    # [END async_delete_message]
                    break

                # [START async_clear_messages]
                await queue.clear_messages()
                # [END async_clear_messages]

            finally:
                # Delete the queue
                await queue.delete_queue()

    async def peek_messages_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # Instantiate a queue client
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync6")

        # Create the queue
        async with queue:
            await queue.create_queue()

            try:
                # Send messages
                await asyncio.gather(
                    queue.send_message("message1"),
                    queue.send_message("message2"),
                    queue.send_message("message3"),
                    queue.send_message("message4"),
                    queue.send_message("message5"),
                )

                # [START async_peek_message]
                # Peek at one message at the front of the queue
                msg = await queue.peek_messages()

                # Peek at the last 5 messages
                messages = await queue.peek_messages(max_messages=5)

                # Print the last 5 messages
                for message in messages:
                    print(message.content)
                # [END async_peek_message]

            finally:
                # Delete the queue
                await queue.delete_queue()

    async def update_message_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # Instantiate a queue client
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync7")

        # Create the queue
        async with queue:
            await queue.create_queue()

            try:
                # [START async_update_message]
                # Send a message
                await queue.send_message("update me")

                # Receive the message
                messages = queue.receive_messages()

                # Update the message
                async for message in messages:
                    message = await queue.update_message(message, visibility_timeout=0, content="updated")
                    # [END async_update_message]
                    break

            finally:
                # Delete the queue
                await queue.delete_queue()

    async def receive_messages_with_max_messages(self):
        if self.connection_string is None:
            print("Missing required environment variable: connection_string")
            sys.exit(1)

        # Instantiate a queue client
        from azure.storage.queue.aio import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueueasync8")

        # Create the queue
        async with queue:
            await queue.create_queue()

            try:
                await queue.send_message("message1")
                await queue.send_message("message2")
                await queue.send_message("message3")
                await queue.send_message("message4")
                await queue.send_message("message5")
                await queue.send_message("message6")
                await queue.send_message("message7")
                await queue.send_message("message8")
                await queue.send_message("message9")
                await queue.send_message("message10")

                # Receive messages one-by-one
                messages = queue.receive_messages(max_messages=5)
                async for msg in messages:
                    print(msg.content)
                    await queue.delete_message(msg)

                # Only prints 5 messages because 'max_messages'=5
                # >>message1
                # >>message2
                # >>message3
                # >>message4
                # >>message5

            finally:
                # Delete the queue
                await queue.delete_queue()


async def main():
    sample = QueueMessageSamplesAsync()
    await sample.set_access_policy_async()
    await sample.queue_metadata_async()
    await sample.send_and_receive_messages_async()
    await sample.receive_one_message_from_queue()
    await sample.delete_and_clear_messages_async()
    await sample.peek_messages_async()
    await sample.update_message_async()
    await sample.receive_messages_with_max_messages()


if __name__ == "__main__":
    asyncio.run(main())