1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
use Test::More tests => 12;
BEGIN { use_ok 'Mojo::RabbitMQ::Client' }
sub failure {
my ($test, $details) = @_;
fail($test);
diag("Details: " . $details) if $details;
Mojo::IOLoop->stop;
}
my $run_id = time();
my $exchange_name = 'mrc_test_' . $run_id;
my $queue_name = 'mrc_test_queue' . $run_id;
my $amqp
= Mojo::RabbitMQ::Client->new(
url => ($ENV{MOJO_RABBITMQ_URL} || 'rabbitmq://guest:guest@127.0.0.1:5672/')
);
$amqp->ioloop->timer( # Global test timeout
10 => sub {
failure('Test timeout');
}
);
$amqp->catch(sub { failure('Connection or other server errors') });
$amqp->on(connect => sub { pass('Connected to server') });
$amqp->on(
open => sub {
my ($self) = @_;
pass('Protocol opened');
my $channel = Mojo::RabbitMQ::Client::Channel->new();
$channel->on(
open => sub {
my ($channel) = @_;
pass('Channel opened');
my $exchange = $channel->declare_exchange(
exchange => $exchange_name,
type => 'topic',
auto_delete => 1,
);
$exchange->catch(sub { failure('Failed to declare exchange') });
$exchange->on(
success => sub {
pass('Exchange declared');
my $queue = $channel->declare_queue(queue => $queue_name,
auto_delete => 1,);
$queue->catch(sub { failure('Failed to declare queue') });
$queue->on(
success => sub {
pass('Queue declared');
my $bind = $channel->bind_queue(
exchange => $exchange_name,
queue => $queue_name,
routing_key => $queue_name,
);
$bind->catch(sub { failure('Failed to bind queue') });
$bind->on(
success => sub {
pass('Queue bound');
my $publish = $channel->publish(
exchange => $exchange_name,
routing_key => $queue_name,
body => 'Test message',
mandatory => 0,
immediate => 0,
header => {}
);
$publish->catch(sub { failure('Message not published') });
$publish->on(
success => sub {
pass('Message published');
}
);
$publish->on(return => sub { failure('Message returned') }
);
$publish->deliver();
my $consumer = $channel->consume(queue => $queue_name,);
$consumer->on(
success => sub {
pass('Subscribed to queue');
}
);
$consumer->on(
message => sub {
pass('Got message');
$amqp->close;
}
);
$consumer->catch(sub { failure('Subscription failed') });
$consumer->deliver;
}
);
$bind->deliver();
}
);
$queue->deliver();
}
);
$exchange->deliver();
}
);
$channel->on(close =>
sub { failure('Channel closed', $_[1]->method_frame->reply_text) });
$channel->catch(sub { failure('Channel error') });
$self->open_channel($channel);
}
);
$amqp->on(close => sub { pass('Connection closed') });
$amqp->on(disconnect => sub { pass('Disconnected'); Mojo::IOLoop->stop });
$amqp->connect();
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
|