File: amqpqueue_consume_multiple.phpt

package info (click to toggle)
php-amqp 2.1.2-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,536 kB
  • sloc: ansic: 7,295; xml: 1,162; php: 690; pascal: 49; makefile: 2
file content (85 lines) | stat: -rw-r--r-- 2,218 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
--TEST--
AMQPQueue::consume multiple
--SKIPIF--
<?php
if (!extension_loaded("amqp")) print "skip AMQP extension is not loaded";
elseif (!getenv("PHP_AMQP_HOST")) print "skip PHP_AMQP_HOST environment variable is not set";
?>
--FILE--
<?php
$id = bin2hex(random_bytes(32));

$cnn = new AMQPConnection();
$cnn->setHost(getenv('PHP_AMQP_HOST'));
$cnn->connect();

$ch = new AMQPChannel($cnn);
$ch2 = new AMQPChannel($cnn);
$ch3 = new AMQPChannel($cnn);

// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('exchange-' . $id);
$ex->setType(AMQP_EX_TYPE_TOPIC);
$ex->declareExchange();

// Create and bind queues
$q1 = new AMQPQueue($ch);
$q1->setName('queue-one-' . $id);
$q1->declareQueue();
$q1->bind($ex->getName(), 'routing.one');

$q2 = new AMQPQueue($ch2);
$q2->setName('queue-two-' . $id);
$q2->declareQueue();
$q2->bind($ex->getName(), 'routing.two');

$q3 = new AMQPQueue($ch3);
$q3->setName('queue-three-' . $id);
$q3->declareQueue();
$q3->bind($ex->getName(), 'routing.three');


// Publish a message to the exchange with a routing key
$ex->publish('message1', 'routing.one');
$ex->publish('message2', 'routing.two');
$ex->publish('message3', 'routing.three');

$count = 0;

function consumeThings(AMQPEnvelope $message, AMQPQueue $queue)
{
    global $count;

    echo "Message: {$message->getBody()}, routing key: {$message->getRoutingKey()}, consumer tag: {$message->getConsumerTag()}\n";
    echo "Queue: {$queue->getName()}, consumer tag: {$queue->getConsumerTag()}\n";
    echo "Queue and message consumer tag ", ($queue->getConsumerTag() == $message->getConsumerTag() ? 'matches' : 'do not match'), "\n";
    echo PHP_EOL;

    $count++;

    $queue->ack($message->getDeliveryTag());

    if ($count >= 2) {
        return false;
    }

    return true;
}

$q1->consume();
$q2->consume('consumeThings');

// This is important!
$q1->cancel();
$q2->cancel();

?>
--EXPECTF--
Message: message1, routing key: routing.one, consumer tag: amq.ctag-%s
Queue: queue-one-%s, consumer tag: amq.ctag-%s
Queue and message consumer tag matches

Message: message2, routing key: routing.two, consumer tag: amq.ctag-%s
Queue: queue-two-%s, consumer tag: amq.ctag-%s
Queue and message consumer tag matches