1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
<?php
namespace PhpAmqpLib\Tests\Functional\Connection\Heartbeat;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Tests\Functional\AbstractConnectionTestCase;
use PHPUnit\Framework\Attributes\RequiresPhp;
use PHPUnit\Framework\Attributes\Test;
/**
 * Functional test for the signal-based heartbeat sender: a consumer callback
 * that runs for longer than the negotiated heartbeat interval must still be
 * able to ack its message and finish while PCNTLHeartbeatSender is registered.
 *
 * @group connection
 * @group signals
 * @requires extension pcntl
 */
#[RequiresPhp('7.1')]
class SignalHeartbeatTest extends AbstractConnectionTestCase
{
    /** @var AbstractConnection|null Connection under test; reset to null on tear-down. */
    protected $connection;

    /** @var string Name of the direct exchange declared for the test. */
    protected $exchangeName = 'test_pcntl_exchange';

    /** @var string Queue name returned by queue_declare(); also used as the routing key. */
    protected $queueName;

    /** @var AMQPChannel|null Channel opened on the connection; reset to null on tear-down. */
    protected $channel;

    /** @var PCNTLHeartbeatSender|null Heartbeat sender bound to the connection; reset to null on tear-down. */
    protected $sender;

    /** @var int Heartbeat value passed to the connection; also published as the message body. */
    protected $heartbeatTimeout = 4;

    /**
     * Opens a stream connection with the configured heartbeat, creates the
     * heartbeat sender (not yet registered), and declares and binds the
     * exchange and queue used by the test.
     */
    protected function setUpCompat()
    {
        $this->connection = $this->connection_create(
            'stream',
            HOST,
            PORT,
            ['timeout' => 3, 'heartbeat' => $this->heartbeatTimeout]
        );

        $this->sender = new PCNTLHeartbeatSender($this->connection);

        $this->channel = $this->connection->channel();
        $this->channel->exchange_declare($this->exchangeName, 'direct', false, false, false);
        list($this->queueName, ,) = $this->channel->queue_declare();
        // The queue name doubles as the binding/routing key.
        $this->channel->queue_bind($this->queueName, $this->exchangeName, $this->queueName);
    }

    /**
     * Unregisters the sender, deletes the exchange, and closes the channel and
     * the connection.
     *
     * The steps are wrapped in try/finally so that a failure in an early step
     * (e.g. exchange_delete() throwing) cannot skip closing the connection or
     * resetting the fixture properties; the original exception still
     * propagates to the test runner.
     */
    protected function tearDownCompat()
    {
        try {
            if ($this->sender) {
                $this->sender->unregister();
            }
            if ($this->channel) {
                $this->channel->exchange_delete($this->exchangeName);
                $this->channel->close();
            }
        } finally {
            try {
                if ($this->connection) {
                    $this->connection->close();
                }
            } finally {
                // Always drop the references, even if closing failed.
                $this->sender = null;
                $this->channel = null;
                $this->connection = null;
            }
        }
    }

    /**
     * Publishes one message whose body is the heartbeat timeout, then consumes
     * it with a callback that sleeps for three times that long.
     *
     * @covers \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender::isSupported
     * @covers \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender::register
     * @covers \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender::registerListener
     * @covers \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender::unregister
     */
    #[Test]
    public function process_message_longer_than_heartbeat_timeout()
    {
        $this->sender->register();

        $msg = new AMQPMessage($this->heartbeatTimeout, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        $this->channel->basic_publish($msg, $this->exchangeName, $this->queueName);

        // NOTE(review): the positional flags are presumably no_local, no_ack,
        // exclusive, nowait — confirm against AMQPChannel::basic_consume.
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'processMessage']
        );

        // processMessage() cancels the consumer, which is expected to empty
        // the callbacks list and end this loop.
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    /**
     * Consumer callback: sleeps for three times the number of seconds given
     * in the message body, then acks the message, cancels the consumer, and
     * asserts the body equals the heartbeat timeout.
     *
     * @param AMQPMessage $msg Delivered message; its body is read as an integer number of seconds.
     */
    public function processMessage($msg)
    {
        $timeLeft = (int)$msg->body * 3;
        // sleep() returns the number of seconds left when it is interrupted
        // by a signal, so keep sleeping until the full duration has elapsed.
        // Presumably the heartbeat sender's signal is what interrupts it.
        while ($timeLeft > 0) {
            $timeLeft = sleep($timeLeft);
        }

        $delivery_info = $msg->delivery_info;
        $delivery_info['channel']->basic_ack($delivery_info['delivery_tag']);
        $delivery_info['channel']->basic_cancel($delivery_info['consumer_tag']);

        self::assertEquals($this->heartbeatTimeout, (int)$msg->body);
    }
}
|