File: sqs_queue_consumer.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (61 lines) | stat: -rw-r--r-- 1,848 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
"""
aiobotocore SQS Consumer Example
"""
import asyncio
import sys

import botocore.exceptions

from aiobotocore.session import get_session

# Name of the SQS queue this example looks up and consumes from.
QUEUE_NAME = 'test_queue12'


async def go():
    """Consume messages from the ``QUEUE_NAME`` SQS queue until interrupted.

    Looks up the queue URL, then repeatedly calls ``receive_message``
    (waiting up to 2 seconds per call), prints the body of every message
    received and deletes it from the queue. When interrupted (Ctrl-C), the
    loop stops, ``Finished`` is printed and the client is closed by the
    enclosing ``async with``.

    Raises:
        SystemExit: with status 1 if the queue does not exist.
        botocore.exceptions.ClientError: for any other error returned by
            the queue URL lookup.
    """
    # Boto should get credentials from ~/.aws/credentials or the environment
    session = get_session()
    async with session.create_client('sqs', region_name='us-west-2') as client:
        try:
            response = await client.get_queue_url(QueueName=QUEUE_NAME)
        except botocore.exceptions.ClientError as err:
            if (
                err.response['Error']['Code']
                == 'AWS.SimpleQueueService.NonExistentQueue'
            ):
                print(f"Queue {QUEUE_NAME} does not exist")
                sys.exit(1)
            else:
                raise

        queue_url = response['QueueUrl']

        print('Pulling messages off the queue')

        while True:
            try:
                # This loop won't spin really fast as there is
                # essentially a sleep in the receive_message call
                response = await client.receive_message(
                    QueueUrl=queue_url,
                    WaitTimeSeconds=2,
                )

                messages = response.get('Messages')
                if not messages:
                    print('No messages in queue')
                    continue

                for msg in messages:
                    print(f'Got msg "{msg["Body"]}"')
                    # Need to remove msg from queue or else it'll reappear
                    await client.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=msg['ReceiptHandle'],
                    )
            except (KeyboardInterrupt, asyncio.CancelledError):
                # Under asyncio.run() on Python 3.11+, Ctrl-C is delivered to
                # this coroutine as task cancellation (CancelledError), not as
                # KeyboardInterrupt, so both must be handled to shut down
                # cleanly. Swallowing the cancellation is deliberate here:
                # this coroutine is the script's top-level task.
                break

        print('Finished')


if __name__ == '__main__':
    # Script entry point: run the consumer coroutine to completion.
    asyncio.run(go())