File: connection_recovery_consume.php

package info (click to toggle)
php-amqplib 3.7.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 2,060 kB
  • sloc: php: 13,145; makefile: 77; sh: 27
file content (112 lines) | stat: -rw-r--r-- 3,135 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
<?php

include(__DIR__ . '/config.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;

// Pause between reconnect attempts, in microseconds (1 second); this value
// is handed to usleep() by the reconnect loop.
const WAIT_BEFORE_RECONNECT_uS = 1000000;

// Assume we have a cluster of nodes on ports 5672, 5673 and 5674.
// This should be possible to start on localhost using RABBITMQ_NODE_PORT
// Each port below becomes one candidate node in connect().
const PORT1 = 5672;
const PORT2 = 5673;
const PORT3 = 5674;

/*
    To handle arbitrary node restarts you can use a combination of connection
    recovery and multiple-host connections.
*/

/**
 * Opens an AMQP connection to the cluster described by PORT1..PORT3.
 *
 * One candidate entry is built per port (same host, credentials and vhost for
 * all of them) and the whole list is handed to
 * AMQPStreamConnection::create_connection() together with the connection
 * options. HOST, USER, PASS and VHOST are not defined in this file; they are
 * presumably provided by the included config.php.
 *
 * @return mixed whatever AMQPStreamConnection::create_connection() returns
 */
function connect()
{
    // If you want better load balancing, you can reshuffle this list.
    $nodes = [];
    foreach ([PORT1, PORT2, PORT3] as $port) {
        $nodes[] = [
            'host' => HOST,
            'port' => $port,
            'user' => USER,
            'password' => PASS,
            'vhost' => VHOST,
        ];
    }

    // Options shared by every candidate node.
    $options = [
        'insist' => false,
        'login_method' => 'AMQPLAIN',
        'login_response' => null,
        'locale' => 'en_US',
        'connection_timeout' => 3.0,
        'read_write_timeout' => 3.0,
        'context' => null,
        'keepalive' => false,
        'heartbeat' => 0,
    ];

    return AMQPStreamConnection::create_connection($nodes, $options);
}

/**
 * Closes a connection on a best-effort basis and never throws.
 *
 * The reconnect loop calls this from inside its catch blocks, so anything
 * escaping from here would terminate the script instead of letting it retry.
 * The connection may already be closed or broken, in which case close() can
 * fail; every \Exception is therefore swallowed, not only \ErrorException.
 *
 * @param object|null $connection connection to close; null means there is
 *                                nothing to clean up
 */
function cleanup_connection($connection) {
    if ($connection === null) {
        return;
    }

    try {
        $connection->close();
    } catch (\Exception $e) {
        // Deliberately ignored: the connection is being discarded anyway and
        // the caller is about to open a fresh one.
    }
}

$connection = null;

// Register the shutdown hook exactly once. The closure captures $connection
// by reference, so it closes whichever connection is current when the script
// ends, instead of piling up one callback (and one retained stale connection)
// per reconnect attempt.
register_shutdown_function(function () use (&$connection) {
    if ($connection !== null) {
        shutdown($connection);
    }
});

// Reconnect loop: keep (re)connecting and running the application code until
// it finishes without an error.
while (true) {
    try {
        $connection = connect();
        // Your application code goes here.
        do_something_with_connection($connection);
        // A normal return means the consumer stopped on purpose, so there is
        // nothing to recover from; the shutdown hook closes the connection.
        break;
    } catch (AMQPRuntimeException $e) {
        echo $e->getMessage() . PHP_EOL;
    } catch (\RuntimeException $e) {
        echo 'Runtime exception ' . PHP_EOL;
    } catch (\ErrorException $e) {
        echo 'Error exception ' . PHP_EOL;
    }

    // Reached only after one of the failures above: drop the broken
    // connection (null when connect() itself failed), forget it so it is
    // never cleaned up twice, then wait before the next attempt.
    cleanup_connection($connection);
    $connection = null;
    usleep(WAIT_BEFORE_RECONNECT_uS);
}

/**
 * Example application code: consumes messages from the "receive" queue until
 * the consumer stops.
 *
 * Opens a channel on the given connection, declares the queue, registers
 * process_message() as the delivery callback under the consumer tag
 * "consumer", and then blocks in wait() for as long as the channel reports
 * that it is consuming. Nothing is caught here; any exception propagates to
 * the caller.
 *
 * @param object $connection open connection exposing channel()
 */
function do_something_with_connection($connection)
{
    $queueName = 'receive';
    $tag = 'consumer';

    $amqpChannel = $connection->channel();

    // Boolean flags follow AMQPChannel::queue_declare's positional order
    // (passive, durable, exclusive, auto_delete): only "durable" is set.
    $amqpChannel->queue_declare($queueName, false, true, false, false);

    // Boolean flags follow AMQPChannel::basic_consume's positional order
    // (no_local, no_ack, exclusive, nowait): all off, so deliveries are
    // acknowledged explicitly by the callback.
    $amqpChannel->basic_consume($queueName, $tag, false, false, false, false, 'process_message');

    // Pump deliveries until the consumer is no longer active.
    while ($amqpChannel->is_consuming()) {
        $amqpChannel->wait();
    }
}


/**
 * Delivery callback: prints the message body between separator lines,
 * acknowledges the message, and cancels the consumer when the body is
 * exactly "quit".
 *
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    $separator = "\n--------\n";
    echo $separator, $message->body, $separator;

    $message->ack();

    // Only the literal string "quit" stops the consumer; every other message
    // is done once it has been acknowledged.
    if ($message->body !== 'quit') {
        return;
    }

    $channel = $message->getChannel();
    $channel->basic_cancel($message->getConsumerTag());
}

/**
 * Closes the given connection; intended to be used as a
 * register_shutdown_function() callback so the connection is released when
 * the script ends.
 *
 * No error handling is done here: whatever close() raises propagates.
 *
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection connection to close
 */
function shutdown($connection)
{
    $connection->close();

    return;
}