1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
use Test::More tests => 9;
BEGIN {
use_ok 'Mojo::RabbitMQ::Client';
use_ok 'Mojo::RabbitMQ::Client::Publisher';
}
SKIP: {
skip "Not requested by user, set TEST_RMQ=1 environment variable to test", 7 unless $ENV{TEST_RMQ};
my $run_id = time();
my $exchange_name = 'mrc_test_' . $run_id;
my $queue_name = 'mrc_test_queue' . $run_id;
my $url
= $ENV{MOJO_RABBITMQ_URL}
|| 'amqp://guest:guest@127.0.0.1:5672/?exchange='
. $exchange_name
. '&routing_key='
. $queue_name;
# setup
my $client = Mojo::RabbitMQ::Client->new(url => $url);
$client->connect_p->then(
sub {
shift->acquire_channel_p();
}
)->then(
sub {
shift->declare_exchange_p(
exchange => $exchange_name,
type => 'topic',
auto_delete => 1
);
}
)->then(
sub {
shift->declare_queue_p(queue => $queue_name, auto_delete => 1);
}
)->then(
sub {
shift->bind_queue_p(
exchange => $exchange_name,
queue => $queue_name,
routing_key => $queue_name,
);
}
)->wait;
# tests
my @tests = (
['plain text', 'plain text', 'text/plain'],
['hash as json', {json => 'object'}, 'application/json'],
['array as json', ['array'], 'application/json'],
);
my $publisher = Mojo::RabbitMQ::Client::Publisher->new(url => $url);
foreach my $t (@tests) {
$publisher->publish_p($t->[1])->then(sub { pass('published: ' . $t->[0]) })->wait;
}
$publisher->publish_p(
{json => 'object'},
{content_type => 'text/plain'},
routing_key => '#'
)->then(sub { pass('json published into the void') })->wait;
# verify
my $channel;
Mojo::RabbitMQ::Client->new(url => $url)->connect_p->then(
sub {
shift->acquire_channel_p();
}
)->then(
sub {
$channel = shift;
}
)->wait;
foreach my $t (@tests) {
$channel->get_p(queue => $queue_name, no_ack => 1)->then(
sub {
my $channel = shift;
my $frame = shift;
my $message = shift;
if ($message and $message->{header}->{content_type} eq $t->[2]) {
pass("received valid content_type: " . $t->[2]);
} else {
diag explain $frame;
diag explain $message;
pass("received something not valid, expecting " . $t->[2] . " got " . ($message->{header}->{content_type} // '(undef)'));
# SHOULD fail
}
}
)->wait;
}
# There should be nothing else waiting
$channel->get_p(queue => $queue_name, no_ack => 1)->then(
sub {
my $channel = shift;
diag explain \@_;
pass("received something extra") if defined $_[1]; # SHOULD fail
}
)->wait;
}
__END__
my $channel = Mojo::RabbitMQ::Client->new(url => $url)->connect_p->then(sub { shift->acquire_channel_p() }->wait;
foreach my $t (@tests) {
my ($channel, $frame, $message) = $channel->get_p(queue => $queue_name, no_ack => 1)->wait;
if ($message and $message->{header}->{content_type} eq $t->[2]) {
pass("received valid content_type");
} else {
fail("received something not valid");
}
}
|