1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
<?php
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
require __DIR__ . '/../vendor/autoload.php';
require __DIR__ . '/../tests/config.php';
// Names of the exchange and queue this consumer script works against.
$exchange = 'bench_exchange';
$queue = 'bench_queue';
// NOTE(review): $consumer_tag is assigned here but not referenced by any
// other statement in the visible script.
$consumer_tag = '';
// HOST, PORT, USER, PASS and VHOST are constants not defined in this file;
// presumably they come from the tests/config.php required above — confirm.
$conn = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$ch = $conn->channel();
// Declare the queue and a 'direct' exchange before binding them together.
// NOTE(review): the positional booleans are presumably php-amqplib's
// passive/durable/exclusive/auto_delete (queue) and passive/durable/auto_delete
// (exchange) flags, all switched off — verify against the installed version.
$ch->queue_declare($queue, false, false, false, false);
$ch->exchange_declare($exchange, 'direct', false, false, false);
// Bind the queue to the exchange; no routing key argument is passed.
$ch->queue_bind($queue, $exchange);
/**
 * Benchmark consumer: counts the messages handed to it and, when it receives
 * a message whose body is exactly the string 'quit', prints a summary line
 * and terminates the script.
 */
class Consumer
{
    /** @var int Number of non-'quit' messages processed so far. */
    protected $msgCount = 0;

    /** @var float|null microtime(true) taken on the first call; null until then. */
    protected $startTime = null;

    /**
     * Handle one delivered message.
     *
     * The first call (whatever the body is) starts the timer. A body of
     * 'quit' prints "Pid: <pid>, Count: <n>, Time: <seconds>" and ends the
     * script; any other body increments the message counter.
     *
     * @param \PhpAmqpLib\Message\AMQPMessage $msg
     * @return void
     */
    public function process_message($msg)
    {
        if ($this->startTime === null) {
            $this->startTime = microtime(true);
        }

        // Strict comparison: with a loose == a non-string truthy body
        // (e.g. boolean true) would compare equal to 'quit' and stop the run.
        if ($msg->body === 'quit') {
            printf(
                "Pid: %s, Count: %s, Time: %.4f\n",
                getmypid(),
                $this->msgCount,
                microtime(true) - $this->startTime
            );
            exit;
        }

        $this->msgCount++;
    }
}
// Deliveries from $queue are dispatched to process_message() on a fresh
// Consumer instance. Flag names in the comments below follow php-amqplib's
// basic_consume() parameter order.
$callback = array(new Consumer(), 'process_message');
$ch->basic_consume(
    $queue,
    '',     // consumer tag
    false,  // no_local
    true,   // no_ack
    false,  // exclusive
    false,  // nowait
    $callback
);
/**
 * Shutdown handler: close the channel first, then the connection.
 *
 * The connection is closed even when closing the channel throws, so a failed
 * channel close no longer leaves the connection open. The exception from the
 * channel close still propagates to the caller afterwards.
 *
 * @param object $ch   Channel to close; only its close() method is used.
 * @param object $conn Connection to close; only its close() method is used.
 * @return void
 */
function shutdown($ch, $conn)
{
    try {
        $ch->close();
    } finally {
        // Always release the connection, regardless of how the channel close went.
        $conn->close();
    }
}
// Arrange for shutdown($ch, $conn) to run when the script ends, so the channel
// and connection are closed even when the message callback terminates the
// script itself.
register_shutdown_function('shutdown', $ch, $conn);
// Keep waiting on the channel for as long as it reports that it is consuming.
while ($ch->is_consuming()) {
$ch->wait();
}
|