1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
<?php
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'router';
$queue = 'msgs';
$consumerTag = 'consumer';
/**
* @var AbstractConnection $connection
*/
$connection = AMQPStreamConnection::create_connection([
['host' => HOST, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST]
], ['heartbeat' => 4]);
$sender = new PCNTLHeartbeatSender($connection);
$sender->register();
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($queue, $exchange);
/**
* @param AMQPMessage $message
*/
function process_message($message)
{
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
echo "\nprocessing message\n";
// Message handling time will be 5x longer than the heartbeat timeout
$timeLeft = 20;
while ($timeLeft > 0) {
$timeLeft = sleep($timeLeft);
}
echo "\nmessage processed\n";
$message->ack();
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->getChannel()->basic_cancel($message->getConsumerTag());
}
}
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
/**
* @param \PhpAmqpLib\Channel\AMQPChannel $channel
* @param AbstractConnection $connection
*/
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);
$channel->consume();
|