# Integration test for Mojo::RabbitMQ::Client: declares an exchange and a queue, publishes one message, then starts a consumer that expects to receive a message.
use Test::More skip_all => "TODO";
BEGIN { use_ok 'Mojo::RabbitMQ::Client' }
# Record a failed test named $test, print $details as a diagnostic when a
# true value is given, and then call Mojo::IOLoop->stop.
sub failure {
    my $test    = shift;
    my $details = shift;

    fail($test);
    if ($details) {
        diag("Details: " . $details);
    }
    Mojo::IOLoop->stop;
}
# Build an error callback for one step of the test.
#
# Takes an optional description (defaults to 'Error') and returns a code
# reference meant to be registered as an event handler. When invoked, the
# handler reports a failure under that description via failure(), passing
# along whatever detail it can extract from its second argument:
#   - an object that provides method_frame: the frame's reply_text;
#   - any other defined value (e.g. a plain error string): its string form;
#   - no second argument: no detail.
sub handle_error {
    my $desc = $_[0] // 'Error';

    require Scalar::Util;

    return sub {
        my (undef, $err) = @_;

        my $details;
        if (Scalar::Util::blessed($err) && $err->can('method_frame')) {
            $details = $err->method_frame->reply_text;
        }
        elsif (defined $err) {
            # Not a frame: calling ->method_frame on it would die inside the
            # event handler and hide the real error, so report the value
            # itself instead.
            $details = "$err";
        }

        failure($desc, $details);
    };
}
# Exchange and queue names carry the start time of this run as a suffix.
my $run_id        = time();
my $exchange_name = "mrc_test_${run_id}";
my $queue_name    = "mrc_test_queue${run_id}";

# MOJO_RABBITMQ_URL, when set to a true value, replaces the default URL.
my $url = $ENV{MOJO_RABBITMQ_URL}
    || sprintf('rabbitmq://guest:guest@127.0.0.1:5672/?exchange=%s&queue=%s',
    $exchange_name, $queue_name);

# Global test timeout
Mojo::IOLoop->timer(10 => sub { failure('Test timeout') });
# Publisher side of the round trip. Each step is started from the success
# (or open) callback of the previous one:
#   connect -> open channel -> declare exchange -> declare queue
#           -> bind queue -> publish -> start_consumer()
# For every step the handlers are attached first and the request is sent
# afterwards with deliver() / open_channel() / connect().
my $client = Mojo::RabbitMQ::Client->new(url => $url);
$client->catch(handle_error('Connection or other server errors'));
$client->on(
    open => sub {
        pass('Client connected');

        my $channel = Mojo::RabbitMQ::Client::Channel->new();
        $channel->catch(handle_error("Channel error"));

        # A channel close event is reported as a failure as well.
        $channel->on(close => handle_error("Channel error"));
        $channel->on(
            open => sub {
                pass('Channel opened');

                my $exchange = $channel->declare_exchange(
                    exchange    => $exchange_name,
                    type        => 'topic',
                    auto_delete => 1,
                );
                $exchange->catch(handle_error('Failed to declare exchange'));
                $exchange->on(
                    success => sub {
                        pass('Exchange declared');

                        my $queue = $channel->declare_queue(
                            queue       => $queue_name,
                            auto_delete => 1,
                        );
                        $queue->catch(handle_error('Failed to declare queue'));
                        $queue->on(
                            success => sub {
                                pass('Queue declared');

                                # The queue name doubles as the routing key.
                                my $bind = $channel->bind_queue(
                                    exchange    => $exchange_name,
                                    queue       => $queue_name,
                                    routing_key => $queue_name,
                                );
                                $bind->catch(handle_error('Failed to bind queue'));
                                $bind->on(
                                    success => sub {
                                        pass('Queue bound');

                                        my $publish = $channel->publish(
                                            exchange    => $exchange_name,
                                            routing_key => $queue_name,
                                            body        => 'Test message',
                                        );

                                        # Like every other step, report a
                                        # publish error as a named failure
                                        # instead of leaving the error event
                                        # unhandled.
                                        $publish->catch(handle_error('Failed to publish message'));
                                        $publish->on(
                                            success => sub {
                                                pass('Message published');

                                                # Hand over to the consumer,
                                                # then close the publishing
                                                # client.
                                                start_consumer();
                                                $client->close();
                                            }
                                        );
                                        $publish->deliver();
                                    }
                                );
                                $bind->deliver();
                            }
                        );
                        $queue->deliver();
                    }
                );
                $exchange->deliver();
            }
        );
        $client->open_channel($channel);
    }
);
$client->connect();
# Create a consumer for $url, register its handlers and start it.
#
# Handlers: an error is reported through failure(); "connect" and "message"
# each record a pass, and the message handler then closes the consumer;
# "close" records a pass and stops the Mojo::IOLoop.
sub start_consumer {
    my %defaults = (
        qos      => {prefetch_count => 1},
        queue    => {auto_delete    => 1},
        consumer => {no_ack         => 0},
    );
    my $consumer = Mojo::RabbitMQ::Client->consumer(
        url      => $url,
        defaults => \%defaults,
    );

    $consumer->catch(
        sub {
            failure('Consumer: Connection or other server errors');
        }
    );
    $consumer->on(
        connect => sub {
            pass('Consumer: Connected to server');
        }
    );
    $consumer->on(
        message => sub {
            # Only the emitting consumer is needed; the message itself is
            # not inspected.
            my ($self) = @_;
            pass('Consumer: Got message');
            $self->close();
        }
    );
    $consumer->on(
        close => sub {
            pass('Consumer: Disconnected');
            Mojo::IOLoop->stop;
        }
    );

    $consumer->start();
}
# Run the event loop unless one is already running. failure() and the
# consumer's close handler are the places that stop it.
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

# No fixed test count is planned, so mark the end of the run explicitly.
done_testing();