File: blocking_consume_recover_multiple_hosts.rst

package info (click to toggle)
python-pika 1.3.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,064 kB
  • sloc: python: 20,886; makefile: 136
file content (115 lines) | stat: -rw-r--r-- 4,662 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
Using the Blocking Connection with connection recovery with multiple hosts
==========================================================================

.. _example_blocking_basic_consume_recover_multiple_hosts:

RabbitMQ nodes can be `clustered <http://www.rabbitmq.com/clustering.html>`_.
In the absence of failure clients can connect to any node and perform any operation.
In case a node fails, stops, or becomes unavailable, clients should be able to
connect to another node and continue.

To simplify reconnection to a different node, connection recovery mechanism
should be combined with connection configuration that specifies multiple hosts.

The BlockingConnection adapter relies on exception handling to check for
connection errors::

    import pika
    import random

    def on_message(channel, method_frame, header_frame, body):
        print(method_frame.delivery_tag)
        print(body)
        print()
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    ## Assuming there are three hosts: host1, host2, and host3
    node1 = pika.URLParameters('amqp://node1')
    node2 = pika.URLParameters('amqp://node2')
    node3 = pika.URLParameters('amqp://node3')
    all_endpoints = [node1, node2, node3]

    while(True):
        try:
            print("Connecting...")
            ## Shuffle the hosts list before reconnecting.
            ## This can help balance connections.
            random.shuffle(all_endpoints)
            connection = pika.BlockingConnection(all_endpoints)
            channel = connection.channel()
            channel.basic_qos(prefetch_count=1)
            ## This queue is intentionally non-durable. See http://www.rabbitmq.com/ha.html#non-mirrored-queue-behavior-on-node-failure
            ## to learn more.
            channel.queue_declare('recovery-example', durable = False, auto_delete = True)
            channel.basic_consume('recovery-example', on_message)
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
                connection.close()
                break
        except pika.exceptions.ConnectionClosedByBroker:
            # Uncomment this to make the example not attempt recovery
            # from server-initiated connection closure, including
            # when the node is stopped cleanly
            #
            # break
            continue
        # Do not recover on channel errors
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            print("Connection was closed, retrying...")
            continue

Generic operation retry libraries such as `retry <https://github.com/invl/retry>`_
can prove useful.

To run the following example, install the library first with `pip install retry`.

In this example the `retry` decorator is used to set up recovery with delay::

    import pika
    import random
    from retry import retry

    def on_message(channel, method_frame, header_frame, body):
        print(method_frame.delivery_tag)
        print(body)
        print()
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    ## Assuming there are three hosts: host1, host2, and host3
    node1 = pika.URLParameters('amqp://node1')
    node2 = pika.URLParameters('amqp://node2')
    node3 = pika.URLParameters('amqp://node3')
    all_endpoints = [node1, node2, node3]

    @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
    def consume():
        random.shuffle(all_endpoints)
        connection = pika.BlockingConnection(all_endpoints)
        channel = connection.channel()
        channel.basic_qos(prefetch_count=1)

        ## This queue is intentionally non-durable. See http://www.rabbitmq.com/ha.html#non-mirrored-queue-behavior-on-node-failure
        ## to learn more.
        channel.queue_declare('recovery-example', durable = False, auto_delete = True)
        channel.basic_consume('recovery-example', on_message)

        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
            connection.close()
        except pika.exceptions.ConnectionClosedByBroker:
            # Uncomment this to make the example not attempt recovery
            # from server-initiated connection closure, including
            # when the node is stopped cleanly
            # except pika.exceptions.ConnectionClosedByBroker:
            #     pass
            continue

    consume()