# File: queue_samples_message.py
#
# Package info:
#   python-azure 20250603+git-1
#   links: PTS, VCS
#   area: main
#   in suites: forky, sid, trixie
#   size: 851,724 kB
#   sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
# File content: 397 lines, 13,144 bytes (-rw-r--r--)
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: queue_samples_message.py

DESCRIPTION:
    These samples demonstrate the following: creating and setting an access policy to generate a
    sas token, getting a queue client from a queue URL, setting and getting queue
    metadata, sending messages and receiving them individually or by batch, deleting and
    clearing all messages, and peeking and updating messages.

USAGE:
    python queue_samples_message.py

    Set the environment variables with your own values before running the sample:
    1) STORAGE_CONNECTION_STRING - the connection string to your storage account
"""


from datetime import datetime, timedelta
import os
import sys


class QueueMessageSamples(object):
    """Runnable samples for message-level operations on an Azure Storage queue.

    Every public method builds its own ``QueueClient`` for a dedicated queue
    (``myqueue1`` to ``myqueue9``), creates the queue, runs one scenario and
    deletes the queue again in a ``finally`` block.

    The ``# [START name]`` / ``# [END name]`` comment pairs delimit named
    snippets (presumably extracted into the published documentation -- keep the
    markers and the code between them self-explanatory).
    """

    # Read once, when the class body is executed; None when the variable is unset.
    connection_string = os.getenv("STORAGE_CONNECTION_STRING")

    def _require_connection_string(self):
        """Exit the process with status 1 when no connection string is configured.

        The message is printed to stdout and names the environment variable the
        user actually has to set (``STORAGE_CONNECTION_STRING``).

        :raises SystemExit: with code 1 if ``self.connection_string`` is None.
        """
        if self.connection_string is None:
            print("Missing required environment variable: STORAGE_CONNECTION_STRING")
            sys.exit(1)

    def set_access_policy(self):
        """Set a stored access policy on a queue and use it through a SAS token.

        Creates ``myqueue1``, sends one message, stores an access policy with
        read permission valid from one hour ago until one hour from now,
        generates a SAS token bound to that policy, and receives messages with
        a second client authenticated by the token.

        :raises SystemExit: if the connection string is missing or carries no
            account name.
        """
        self._require_connection_string()

        # [START create_queue_client_from_connection_string]
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue1")
        if queue.account_name is None:
            print("Connection string did not provide an account name." + "\n" + "Test: set_access_policy")
            sys.exit(1)
        # [END create_queue_client_from_connection_string]

        # Create the queue
        queue.create_queue()

        try:
            # Send a message (inside the try so a failed send still deletes the queue)
            queue.send_message("hello world")

            # [START set_access_policy]
            # Create an access policy
            from datetime import timezone
            from azure.storage.queue import AccessPolicy, QueueSasPermissions

            # One timezone-aware UTC timestamp for both bounds; datetime.utcnow()
            # is deprecated since Python 3.12.
            now = datetime.now(timezone.utc)
            access_policy = AccessPolicy()
            access_policy.start = now - timedelta(hours=1)
            access_policy.expiry = now + timedelta(hours=1)
            access_policy.permission = QueueSasPermissions(read=True)
            identifiers = {"my-access-policy-id": access_policy}

            # Set the access policy
            queue.set_queue_access_policy(identifiers)
            # [END set_access_policy]

            # Use the access policy to generate a SAS token
            # [START queue_client_sas_token]
            from azure.storage.queue import generate_queue_sas

            sas_token = generate_queue_sas(
                queue.account_name, queue.queue_name, queue.credential.account_key, policy_id="my-access-policy-id"
            )
            # [END queue_client_sas_token]

            # Authenticate with the sas token
            # [START create_queue_client]
            token_auth_queue = QueueClient.from_queue_url(queue_url=queue.url, credential=sas_token)
            # [END create_queue_client]

            # Use the newly authenticated client to receive messages
            my_message = token_auth_queue.receive_messages()

        finally:
            # Delete the queue
            queue.delete_queue()

    def queue_metadata(self):
        """Set three metadata key/value pairs on ``myqueue2`` and read them back.

        :raises SystemExit: if the connection string is missing.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue2")

        # Create the queue
        queue.create_queue()

        try:
            # [START set_queue_metadata]
            metadata = {"foo": "val1", "bar": "val2", "baz": "val3"}
            queue.set_queue_metadata(metadata=metadata)
            # [END set_queue_metadata]

            # [START get_queue_properties]
            properties = queue.get_queue_properties().metadata
            # [END get_queue_properties]

        finally:
            # Delete the queue
            queue.delete_queue()

    def send_and_receive_messages(self):
        """Send five messages to ``myqueue3`` and receive them singly and by page.

        ``message2`` is sent with ``visibility_timeout=30``. The first loop
        prints each received message; the second loop iterates pages of up to
        five messages and deletes each message after printing it.

        NOTE(review): the first loop does not delete what it receives;
        presumably those messages are still hidden when the paged loop runs --
        confirm against the service's visibility-timeout behaviour.

        :raises SystemExit: if the connection string is missing.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue3")

        # Create the queue
        queue.create_queue()

        try:
            # [START send_messages]
            queue.send_message("message1")
            queue.send_message("message2", visibility_timeout=30)  # wait 30s before becoming visible
            queue.send_message("message3")
            queue.send_message("message4")
            queue.send_message("message5")
            # [END send_messages]

            # [START receive_messages]
            # Receive messages one-by-one
            messages = queue.receive_messages()
            for msg in messages:
                print(msg.content)

            # Receive messages by batch
            messages = queue.receive_messages(messages_per_page=5)
            for msg_batch in messages.by_page():
                for msg in msg_batch:
                    print(msg.content)
                    queue.delete_message(msg)
            # [END receive_messages]

            # Only prints 4 messages because message 2 is not visible yet
            # >>message1
            # >>message3
            # >>message4
            # >>message5

        finally:
            # Delete the queue
            queue.delete_queue()

    def list_message_pages(self):
        """Send six messages to ``myqueue4`` and read them back two per page.

        The first two pages are materialised with ``list``; the third page is
        iterated message by message.

        :raises SystemExit: if the connection string is missing.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue4")

        # Create the queue
        queue.create_queue()

        try:
            queue.send_message("message1")
            queue.send_message("message2")
            queue.send_message("message3")
            queue.send_message("message4")
            queue.send_message("message5")
            queue.send_message("message6")

            # [START receive_messages_listing]
            # Store two messages in each page
            message_batches = queue.receive_messages(messages_per_page=2).by_page()

            # Iterate through the page lists
            print(list(next(message_batches)))
            print(list(next(message_batches)))

            # There are two iterations in the last page as well.
            last_page = next(message_batches)
            for message in last_page:
                print(message)
            # [END receive_messages_listing]

        finally:
            queue.delete_queue()

    def receive_one_message_from_queue(self):
        """Receive two messages from ``myqueue5`` one at a time, then peek.

        Sends three messages, takes two with ``receive_message`` and peeks at
        the queue for the remaining one, then prints the content of all three.

        :raises SystemExit: if the connection string is missing.
        :raises ValueError: if any of the three messages is falsy.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue5")

        # Create the queue
        queue.create_queue()

        try:
            queue.send_message("message1")
            queue.send_message("message2")
            queue.send_message("message3")

            # [START receive_one_message]
            # Pop two messages from the front of the queue
            message1 = queue.receive_message()
            message2 = queue.receive_message()
            # We should see message 3 if we peek
            message3 = queue.peek_messages()[0]

            if not message1 or not message2 or not message3:
                raise ValueError("One of the messages are None.")

            print(message1.content)
            print(message2.content)
            print(message3.content)
            # [END receive_one_message]

        finally:
            queue.delete_queue()

    def delete_and_clear_messages(self):
        """Delete one received message from ``myqueue6``, then clear the queue.

        :raises SystemExit: if the connection string is missing.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue6")

        # Create the queue
        queue.create_queue()

        try:
            # Send messages
            queue.send_message("message1")
            queue.send_message("message2")
            queue.send_message("message3")
            queue.send_message("message4")
            queue.send_message("message5")

            # [START delete_message]
            # Get the message at the front of the queue
            msg = next(queue.receive_messages())

            # Delete the specified message
            queue.delete_message(msg)
            # [END delete_message]

            # [START clear_messages]
            queue.clear_messages()
            # [END clear_messages]

        finally:
            # Delete the queue
            queue.delete_queue()

    def peek_messages(self):
        """Peek at messages in ``myqueue7`` and print the content of up to five.

        :raises SystemExit: if the connection string is missing.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue7")

        # Create the queue
        queue.create_queue()

        try:
            # Send messages
            queue.send_message("message1")
            queue.send_message("message2")
            queue.send_message("message3")
            queue.send_message("message4")
            queue.send_message("message5")

            # [START peek_message]
            # Peek at one message at the front of the queue
            msg = queue.peek_messages()

            # Peek at up to 5 messages from the front of the queue
            messages = queue.peek_messages(max_messages=5)

            # Print the peeked messages
            for message in messages:
                print(message.content)
            # [END peek_message]

        finally:
            # Delete the queue
            queue.delete_queue()

    def update_message(self):
        """Send a message to ``myqueue8``, receive it and update its content.

        The update passes the received message's id and pop receipt, sets
        ``visibility_timeout=0`` and replaces the content with ``"updated"``.

        :raises SystemExit: if the connection string is missing.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue8")

        # Create the queue
        queue.create_queue()

        try:
            # [START update_message]
            # Send a message
            queue.send_message("update me")

            # Receive the message
            messages = queue.receive_messages()

            # Update the message
            list_result = next(messages)
            message = queue.update_message(
                list_result.id, pop_receipt=list_result.pop_receipt, visibility_timeout=0, content="updated"
            )
            # [END update_message]

        finally:
            # Delete the queue
            queue.delete_queue()

    def receive_messages_with_max_messages(self):
        """Send ten messages to ``myqueue9`` and receive with ``max_messages=5``.

        Each received message is printed and then deleted.

        :raises SystemExit: if the connection string is missing.
        """
        self._require_connection_string()

        # Instantiate a queue client
        from azure.storage.queue import QueueClient

        queue = QueueClient.from_connection_string(self.connection_string, "myqueue9")

        # Create the queue
        queue.create_queue()

        try:
            queue.send_message("message1")
            queue.send_message("message2")
            queue.send_message("message3")
            queue.send_message("message4")
            queue.send_message("message5")
            queue.send_message("message6")
            queue.send_message("message7")
            queue.send_message("message8")
            queue.send_message("message9")
            queue.send_message("message10")

            # Receive messages one-by-one
            messages = queue.receive_messages(max_messages=5)
            for msg in messages:
                print(msg.content)
                queue.delete_message(msg)

            # Only prints 5 messages because 'max_messages'=5
            # >>message1
            # >>message2
            # >>message3
            # >>message4
            # >>message5

        finally:
            # Delete the queue
            queue.delete_queue()


if __name__ == "__main__":
    # Run every sample scenario once, in the order listed below, on a single
    # QueueMessageSamples instance.
    samples = QueueMessageSamples()
    scenarios = (
        samples.set_access_policy,
        samples.queue_metadata,
        samples.send_and_receive_messages,
        samples.list_message_pages,
        samples.receive_one_message_from_queue,
        samples.delete_and_clear_messages,
        samples.peek_messages,
        samples.update_message,
        samples.receive_messages_with_max_messages,
    )
    for run_scenario in scenarios:
        run_scenario()