1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
<?php
namespace PhpAmqpLib\Tests\Functional\Channel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Tests\Functional\Channel\ChannelTestCase;
use PHPUnit\Framework\Attributes\Test;
/**
* @group connection
*/
class DirectExchangeTest extends ChannelTestCase
{
protected function setUpCompat()
{
parent::setUpCompat();
$this->exchange->name = 'test_direct_exchange';
}
#[Test]
public function exchange_declare_with_closed_connection()
{
$this->expectException(\PhpAmqpLib\Exception\AMQPChannelClosedException::class);
$this->connection->close();
$this->channel->exchange_declare($this->exchange->name, 'direct', false, false, false);
}
#[Test]
public function exchange_declare_with_closed_channel()
{
$this->expectException(\PhpAmqpLib\Exception\AMQPChannelClosedException::class);
$this->channel->close();
$this->channel->exchange_declare($this->exchange->name, 'direct', false, false, false);
}
#[Test]
public function basic_consume_foo()
{
$this->channel->exchange_declare($this->exchange->name, 'direct', false, false, false);
list($this->queue->name, ,) = $this->channel->queue_declare();
$this->channel->queue_bind($this->queue->name, $this->exchange->name, $this->queue->name);
$this->message = (object) [
'body' => 'foo',
'properties' => [
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
'correlation_id' => 'my_correlation_id',
'reply_to' => 'my_reply_to',
],
];
$msg = new AMQPMessage($this->message->body, $this->message->properties);
$this->channel->basic_publish($msg, $this->exchange->name, $this->queue->name);
$callback = function ($msg) {
$this->assertEquals($this->message->body, $msg->body);
$this->assertEquals(getmypid(), $msg->delivery_info['consumer_tag']);
$this->assertEquals($this->queue->name, $msg->delivery_info['routing_key']);
$this->assertEquals($this->exchange->name, $msg->delivery_info['exchange']);
$this->assertEquals(false, $msg->delivery_info['redelivered']);
$this->assertEquals($this->message->properties['content_type'], $msg->get('content_type'));
$this->assertEquals($this->message->properties['correlation_id'], $msg->get('correlation_id'));
$this->assertEquals($this->message->properties['reply_to'], $msg->get('reply_to'));
$this->expectException('OutOfBoundsException');
$msg->get('no_property');
};
$this->channel->basic_consume(
$this->queue->name,
getmypid(),
false,
false,
false,
false,
$callback
);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
}
|