1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
<?php
namespace PhpAmqpLib\Tests\Functional\Bug;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Tests\TestCaseCompat;
use PHPUnit\Framework\Attributes\Test;
/**
 * Functional test that interleaves consumers on two channels of a single
 * connection.
 *
 * Two messages are published to the first queue and one to the second. While
 * the callback for the first queue's first message is still running, a
 * consumer for the second queue is started on the second channel and waited
 * on. The second queue's callback then asserts that the first queue's second
 * message has not been handed to its callback in the meantime.
 *
 * @group connection
 */
class Bug40Test extends TestCaseCompat
{
    /** @var string Name of the direct exchange declared for the test. */
    protected $exchangeName = 'test_exchange';

    /** @var string|null Name of the first queue, as returned by queue_declare(). */
    protected $queueName1;

    /** @var string|null Name of the second queue, as returned by queue_declare(). */
    protected $queueName2;

    /** @var int Number of times processMessage1() has been invoked. */
    protected $queue1Messages = 0;

    /** @var AMQPStreamConnection|null Connection shared by both channels. */
    protected $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Channel used for declaring, publishing and consuming the first queue. */
    protected $channel;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Channel used only for consuming the second queue. */
    protected $channel2;

    /**
     * Opens one connection with two channels, declares the exchange and two
     * queues, and binds each queue using its own name as the routing key.
     *
     * HOST, PORT, USER, PASS and VHOST are constants defined outside this
     * file (presumably by the test bootstrap).
     *
     * @return void
     */
    protected function setUpCompat()
    {
        $this->connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        $this->channel = $this->connection->channel();
        $this->channel2 = $this->connection->channel();

        $this->channel->exchange_declare($this->exchangeName, 'direct', false, false, false);

        // No queue name is passed; only the first element of the returned
        // tuple (the queue name) is kept.
        list($this->queueName1, ,) = $this->channel->queue_declare();
        list($this->queueName2, ,) = $this->channel->queue_declare();

        $this->channel->queue_bind($this->queueName1, $this->exchangeName, $this->queueName1);
        $this->channel->queue_bind($this->queueName2, $this->exchangeName, $this->queueName2);
    }

    /**
     * Deletes the exchange and closes both channels and the connection.
     *
     * The connection is closed in a finally block so that a failure while
     * deleting the exchange or closing a channel cannot leave the socket open.
     * All handles are reset to null regardless of the outcome.
     *
     * @return void
     */
    protected function tearDownCompat()
    {
        try {
            if ($this->channel) {
                $this->channel->exchange_delete($this->exchangeName);
                $this->channel->close();
            }

            if ($this->channel2) {
                $this->channel2->close();
            }
        } finally {
            $this->channel = null;
            $this->channel2 = null;

            // Drop our reference before closing so the property is cleared
            // even if close() itself throws.
            $connection = $this->connection;
            $this->connection = null;

            if ($connection) {
                $connection->close();
            }
        }
    }

    /**
     * Publishes two messages to the first queue and one to the second, then
     * consumes the first queue until its consumer is cancelled.
     *
     * The assertion itself is performed in processMessage2().
     *
     * @return void
     */
    #[Test]
    public function frame_order()
    {
        $msg = new AMQPMessage('test message');

        $this->channel->basic_publish($msg, $this->exchangeName, $this->queueName1);
        $this->channel->basic_publish($msg, $this->exchangeName, $this->queueName1);
        $this->channel->basic_publish($msg, $this->exchangeName, $this->queueName2);

        $this->channel->basic_consume(
            $this->queueName1,
            '',    // consumer tag
            false, // no_local
            true,  // no_ack
            false, // exclusive
            false, // nowait
            [$this, 'processMessage1']
        );

        // Runs until processMessage1() cancels the consumer on its second call.
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Callback for messages from the first queue.
     *
     * On the first call it starts a consumer for the second queue on the
     * second channel and blocks until that consumer has been cancelled by
     * processMessage2(). On the second call it cancels its own consumer,
     * which ends the wait loop in frame_order().
     *
     * @param AMQPMessage $msg Delivered message.
     *
     * @return void
     */
    public function processMessage1($msg)
    {
        $this->queue1Messages++;

        if ($this->queue1Messages === 1) {
            $this->channel2->basic_consume(
                $this->queueName2,
                '',    // consumer tag
                false, // no_local
                true,  // no_ack
                false, // exclusive
                false, // nowait
                [$this, 'processMessage2']
            );
        }

        // Nested wait on the second channel while this callback is still on
        // the stack; skipped on the second call because the second channel's
        // consumer has already been cancelled by then.
        while ($this->channel2->is_consuming()) {
            $this->channel2->wait();
        }

        if ($this->queue1Messages === 2) {
            $this->cancelConsumer($msg);
        }
    }

    /**
     * Callback for the message from the second queue.
     *
     * Cancels its own consumer and asserts that processMessage1() has been
     * invoked fewer than two times so far.
     *
     * @param AMQPMessage $msg Delivered message.
     *
     * @return void
     */
    public function processMessage2($msg)
    {
        $this->cancelConsumer($msg);

        $this->assertLessThan(2, $this->queue1Messages);
    }

    /**
     * Cancels the consumer that delivered the given message, on the channel
     * it was delivered on.
     *
     * @param AMQPMessage $msg Delivered message.
     *
     * @return void
     */
    private function cancelConsumer(AMQPMessage $msg)
    {
        $msg->getChannel()->basic_cancel($msg->getConsumerTag());
    }
}
|