1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import functools
import random
import pika
from pika.exchange_type import ExchangeType
def on_message(ch, method_frame, _header_frame, body, userdata=None):
print('Userdata: {} Message body: {}'.format(userdata, body))
ch.basic_ack(delivery_tag=method_frame.delivery_tag)
credentials = pika.PlainCredentials('guest', 'guest')
params1 = pika.ConnectionParameters(
'localhost', port=5672, credentials=credentials)
params2 = pika.ConnectionParameters(
'localhost', port=5673, credentials=credentials)
params3 = pika.ConnectionParameters(
'localhost', port=5674, credentials=credentials)
params_all = [params1, params2, params3]
# Infinite loop
while True:
try:
random.shuffle(params_all)
connection = pika.BlockingConnection(params_all)
channel = connection.channel()
channel.exchange_declare(
exchange='test_exchange',
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
channel.queue_declare(queue='standard', auto_delete=True)
channel.queue_bind(
queue='standard',
exchange='test_exchange',
routing_key='standard_key')
channel.basic_qos(prefetch_count=1)
on_message_callback = functools.partial(
on_message, userdata='on_message_userdata')
channel.basic_consume('standard', on_message_callback)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
break
# Do not recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue
|