File: AMQP.pm

package info (click to toggle)
libmessage-passing-amqp-perl 0.003-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 268 kB
  • ctags: 139
  • sloc: perl: 1,654; makefile: 12
file content (70 lines) | stat: -rw-r--r-- 1,416 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package Message::Passing::Input::AMQP;
use Moose;
use AnyEvent;
use Scalar::Util qw/ weaken refaddr /;
use Try::Tiny;
use namespace::autoclean;

# Compose the AMQP queue-binding role and the generic input role into this
# class.
with(
    'Message::Passing::AMQP::Role::BindsAQueue',
    'Message::Passing::Role::Input',
);


# Method modifier: once '_set_queue' has run, start a consumer on the channel
# and forward the payload of every delivered message to the configured output.
after '_set_queue' => sub {
    my $self = shift;
    # The callbacks below close over $self. Hold it weakly so that those
    # closures (presumably retained by the channel) do not keep this object
    # alive on their own.
    weaken($self);
    $self->_channel->consume(
        on_consume => sub {
            my $message = shift;
            # An exception raised while handing the payload downstream is
            # reported as a warning rather than propagated to the caller of
            # this callback.
            try {
                $self->output_to->consume($message->{body}->payload);
            }
            catch {
                warn("Error in consume_message callback: $_");
            };
        },
        consumer_tag => refaddr($self),
        on_success => sub {
        },
        on_failure => sub {
            # Neither Carp nor Data::Dumper is imported by this file, so a
            # bare Dumper() call would die with "Undefined subroutine" while
            # trying to report the failure. Load both on demand and call them
            # fully qualified.
            require Carp;
            require Data::Dumper;
            Carp::cluck("Failed to start message consumer in $self response " . Data::Dumper::Dumper(@_));
        },
    );
};

# Finalise the Moose class definition now that all roles and method
# modifiers have been declared.
__PACKAGE__->meta->make_immutable;
# A module file must end with a true value.
1;

=head1 NAME

Message::Passing::Input::AMQP - input logstash messages from AMQP.

=head1 SYNOPSIS

    message-pass --output STDOUT --input AMQP --input_options '{"queue_name":"test","exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'

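=head1 DESCRIPTION

Once the queue has been set up, this input starts a consumer on the AMQP
channel and passes the payload of each message it receives to the configured
output.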

=head1 SEE ALSO

=over

=item L<Message::Passing::AMQP>

=item L<Message::Passing::Output::AMQP>

=item L<Message::Passing>

=item L<AMQP>

=item L<http://www.amqp.org/>

=back

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing::AMQP>.

=cut