File: blocking_consume_recover_multiple_hosts.py

package info (click to toggle)
python-pika 1.3.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,064 kB
  • sloc: python: 20,886; makefile: 136
file content (63 lines) | stat: -rw-r--r-- 1,994 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205

import functools
import random
import pika
from pika.exchange_type import ExchangeType


def on_message(ch, method_frame, _header_frame, body, userdata=None):
    print('Userdata: {} Message body: {}'.format(userdata, body))
    ch.basic_ack(delivery_tag=method_frame.delivery_tag)


credentials = pika.PlainCredentials('guest', 'guest')

params1 = pika.ConnectionParameters(
    'localhost', port=5672, credentials=credentials)
params2 = pika.ConnectionParameters(
    'localhost', port=5673, credentials=credentials)
params3 = pika.ConnectionParameters(
    'localhost', port=5674, credentials=credentials)
params_all = [params1, params2, params3]

# Infinite loop
while True:
    try:
        random.shuffle(params_all)
        connection = pika.BlockingConnection(params_all)
        channel = connection.channel()
        channel.exchange_declare(
            exchange='test_exchange',
            exchange_type=ExchangeType.direct,
            passive=False,
            durable=True,
            auto_delete=False)
        channel.queue_declare(queue='standard', auto_delete=True)
        channel.queue_bind(
            queue='standard',
            exchange='test_exchange',
            routing_key='standard_key')
        channel.basic_qos(prefetch_count=1)

        on_message_callback = functools.partial(
            on_message, userdata='on_message_userdata')
        channel.basic_consume('standard', on_message_callback)

        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()

        connection.close()
        break
    # Do not recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Do not recover on channel errors
    except pika.exceptions.AMQPChannelError:
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        continue