1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
<?php
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
const WAIT_BEFORE_RECONNECT_uS = 1000000;
// Assume we have a cluster of nodes on ports 5672, 5673 and 5674.
// This should be possible to start on localhost using RABBITMQ_NODE_PORT
const PORT1 = 5672;
const PORT2 = 5673;
const PORT3 = 5674;
/*
To handle arbitrary node restart you can use a combination of connection
recovery and mulltiple hosts connection.
*/
function connect() {
// If you want a better load-balancing, you cann reshuffle the list.
return AMQPStreamConnection::create_connection([
['host' => HOST, 'port' => PORT1, 'user' => USER, 'password' => PASS, 'vhost' => VHOST],
['host' => HOST, 'port' => PORT2, 'user' => USER, 'password' => PASS, 'vhost' => VHOST],
['host' => HOST, 'port' => PORT3, 'user' => USER, 'password' => PASS, 'vhost' => VHOST]
],
[
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'context' => null,
'keepalive' => false,
'heartbeat' => 0
]);
}
function cleanup_connection($connection) {
// Connection might already be closed.
// Ignoring exceptions.
try {
if($connection !== null) {
$connection->close();
}
} catch (\ErrorException $e) {
}
}
$connection = null;
while(true){
try {
$connection = connect();
register_shutdown_function('shutdown', $connection);
// Your application code goes here.
do_something_with_connection($connection);
} catch(AMQPRuntimeException $e) {
echo $e->getMessage() . PHP_EOL;
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\RuntimeException $e) {
echo 'Runtime exception ' . PHP_EOL;
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\ErrorException $e) {
echo 'Error exception ' . PHP_EOL;
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
}
}
function do_something_with_connection($connection) {
$queue = 'receive';
$consumerTag = 'consumer';
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
while ($channel->is_consuming()) {
$channel->wait();
}
}
/**
* @param \PhpAmqpLib\Message\AMQPMessage $message
*/
function process_message($message)
{
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->ack();
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->getChannel()->basic_cancel($message->getConsumerTag());
}
}
/**
* @param \PhpAmqpLib\Connection\AbstractConnection $connection
*/
function shutdown($connection)
{
$connection->close();
}
|