File: amqp_publisher_with_confirms_mandatory.php

package info (click to toggle)
php-amqplib 3.7.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 2,060 kB
  • sloc: php: 13,145; makefile: 77; sh: 27
file content (73 lines) | stat: -rw-r--r-- 2,217 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

// Load the connection settings. The constants used below (HOST, PORT, USER, PASS, VHOST)
// are expected to come from this file -- presumably defined there; verify against config.php.
// `require` stops the script immediately when the file is missing, whereas `include` would
// only emit a warning and continue with undefined constants.
require __DIR__ . '/config.php';

// Name of the exchange this demo declares and publishes to.
$exchange = 'someExchange';

// Open the connection to the broker and a channel on top of it.
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

// Handler registered for acknowledged messages: prints the body of the message.
$channel->set_ack_handler(static function (AMQPMessage $acked) {
    echo 'Message acked with content ', $acked->body, PHP_EOL;
});

// Handler registered for negatively acknowledged messages: prints the body of the message.
$channel->set_nack_handler(static function (AMQPMessage $nacked) {
    echo 'Message nacked with content ', $nacked->body, PHP_EOL;
});

// Listener registered for returned messages. Only the message itself is used here;
// the reply code/text, exchange and routing key arguments are ignored.
$channel->set_return_listener(
    static function ($replyCode, $replyText, $returnedExchange, $routingKey, AMQPMessage $returned) {
        echo 'Message returned with content ', $returned->body, PHP_EOL;
    }
);

/*
 * Bring the channel into publish confirm mode.
 * Publish confirm mode and transactions are mutually exclusive: if you called
 * $channel->tx_select() before or after bringing the channel into this mode,
 * the next call to $channel->wait() would result in an exception.
 */
$channel->confirm_select();

/*
 * Declare the exchange the messages are published to.
 */
$channel->exchange_declare(
    $exchange,                // name
    AMQPExchangeType::FANOUT, // type: fanout
    false,                    // passive: false - declare the exchange instead of only checking that it exists
    false,                    // durable: false - the exchange won't survive server restarts
    true                      // auto_delete: true - the broker deletes the exchange once its last queue is unbound
);

/*
 * Publish a first message on its own.
 * The fourth argument of basic_publish() is the mandatory flag: with it set, a message
 * that cannot be routed to any queue is handed back to the return listener.
 * The routing key is passed as an empty string (the library's default) rather than null,
 * and the body is cast to string, which is what AMQPMessage documents for its body.
 */
$i = 1;
$message = new AMQPMessage((string) $i, ['content_type' => 'text/plain']);
$channel->basic_publish($message, $exchange, '', true);
// Message 1 is out; advance the counter so the batch below does not publish body "1" again.
$i++;

/*
 * watching the amqp debug output you can see that the server will ack the message with delivery tag 1 and the
 * multiple flag probably set to false
 */
$channel->wait_for_pending_acks_returns();

// Publish the remaining messages (bodies 2..11) back to back, without waiting in between.
while ($i <= 11) {
    $message = new AMQPMessage((string) $i, ['content_type' => 'text/plain']);
    $channel->basic_publish($message, $exchange, '', true);
    $i++;
}

/*
 * you do not have to wait for pending acks after each message sent. in fact it will be much more efficient
 * to wait for as many messages to be acked as possible.
 */
$channel->wait_for_pending_acks_returns();

// Shut down in reverse order of creation: channel first, then the connection.
$channel->close();
$connection->close();