File: DirectExchangeTest.php

package info (click to toggle)
php-amqplib 3.7.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,060 kB
  • sloc: php: 13,145; makefile: 77; sh: 27
file content (89 lines) | stat: -rw-r--r-- 3,036 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
<?php

namespace PhpAmqpLib\Tests\Functional\Channel;

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Tests\Functional\Channel\ChannelTestCase;
use PHPUnit\Framework\Attributes\Test;

/**
 * Functional tests for declaring and consuming from a direct exchange.
 *
 * Covers two failure paths (declaring an exchange after the connection or the
 * channel has been closed) and one round trip: publish a message to a direct
 * exchange and verify what the consumer callback receives.
 *
 * @group connection
 */
class DirectExchangeTest extends ChannelTestCase
{
    /**
     * Upper bound, in seconds, for a single wait on the channel while
     * consuming. Without it a lost delivery would block the suite forever.
     */
    private const CONSUME_TIMEOUT_SECONDS = 5;

    /**
     * Runs the parent fixture setup, then names the exchange used by every
     * test in this class.
     */
    protected function setUpCompat()
    {
        parent::setUpCompat();

        $this->exchange->name = 'test_direct_exchange';
    }

    /**
     * Declaring an exchange after the connection was closed must raise
     * AMQPChannelClosedException.
     */
    #[Test]
    public function exchange_declare_with_closed_connection(): void
    {
        $this->expectException(\PhpAmqpLib\Exception\AMQPChannelClosedException::class);

        $this->connection->close();

        $this->channel->exchange_declare($this->exchange->name, 'direct', false, false, false);
    }

    /**
     * Declaring an exchange after the channel itself was closed must raise
     * AMQPChannelClosedException.
     */
    #[Test]
    public function exchange_declare_with_closed_channel(): void
    {
        $this->expectException(\PhpAmqpLib\Exception\AMQPChannelClosedException::class);

        $this->channel->close();

        $this->channel->exchange_declare($this->exchange->name, 'direct', false, false, false);
    }

    /**
     * Publishes a "foo" message through the direct exchange and checks the
     * body, delivery info and properties seen by the consumer callback.
     *
     * The callback finishes by reading a property that was never set, which is
     * expected to throw OutOfBoundsException; that exception propagates out of
     * the wait loop and ends the test.
     */
    #[Test]
    public function basic_consume_foo(): void
    {
        $this->channel->exchange_declare($this->exchange->name, 'direct', false, false, false);

        // Only the first element of the declare result (the queue name) is
        // needed; the queue name doubles as the routing key for the binding.
        [$this->queue->name] = $this->channel->queue_declare();
        $this->channel->queue_bind($this->queue->name, $this->exchange->name, $this->queue->name);

        $this->message = (object) [
            'body' => 'foo',
            'properties' => [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
                'correlation_id' => 'my_correlation_id',
                'reply_to' => 'my_reply_to',
            ],
        ];

        $outgoing = new AMQPMessage($this->message->body, $this->message->properties);

        $this->channel->basic_publish($outgoing, $this->exchange->name, $this->queue->name);

        // The process id serves as the consumer tag. It is an int here, so the
        // tag check below deliberately stays a loose assertEquals.
        // NOTE(review): presumably the tag comes back as a string - confirm.
        $consumerTag = getmypid();

        $onDelivery = function ($delivered) use ($consumerTag) {
            $expectedProperties = $this->message->properties;

            $this->assertEquals($this->message->body, $delivered->body);
            $this->assertEquals($consumerTag, $delivered->delivery_info['consumer_tag']);
            $this->assertEquals($this->queue->name, $delivered->delivery_info['routing_key']);
            $this->assertEquals($this->exchange->name, $delivered->delivery_info['exchange']);
            // Strict check: a loose comparison against false would also accept 0, '' or null.
            $this->assertFalse($delivered->delivery_info['redelivered']);
            $this->assertEquals($expectedProperties['content_type'], $delivered->get('content_type'));
            $this->assertEquals($expectedProperties['correlation_id'], $delivered->get('correlation_id'));
            $this->assertEquals($expectedProperties['reply_to'], $delivered->get('reply_to'));

            // Registered only after the assertions above have passed, right
            // before the call that is expected to throw.
            $this->expectException(\OutOfBoundsException::class);
            $delivered->get('no_property');
        };

        $this->channel->basic_consume(
            $this->queue->name,
            $consumerTag,
            false,
            false,
            false,
            false,
            $onDelivery
        );

        // Bounded wait: if the delivery never arrives, wait() is expected to
        // fail with a timeout error instead of blocking indefinitely.
        while (count($this->channel->callbacks)) {
            $this->channel->wait(null, false, self::CONSUME_TIMEOUT_SECONDS);
        }
    }
}