1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
package Message::Passing::Output::AMQP;
use Moo;
use Types::Standard qw( Str CodeRef );
use namespace::autoclean;
with qw/
Message::Passing::AMQP::Role::DeclaresExchange
Message::Passing::Role::Output
/;
has routing_key => (
isa => Str,
is => 'ro',
default => '',
);
has header_cb => (
isa => CodeRef,
is => 'ro',
);
has serialize_cb => (
isa => CodeRef,
is => 'ro',
);
sub consume {
my $self = shift;
my $data = shift;
if (ref $data && ! defined $self->serialize_cb) {
warn("Passed non-serialized data - is a perl reference. Dropping.\n");
return;
}
unless ($self->_exchange) {
warn("No exchange yet, dropping message");
return;
}
my $header;
$header = $self->header_cb->($data)
if defined $self->header_cb;
$data = $self->serialize_cb->($data)
if defined $self->serialize_cb;
$self->_channel->publish(
body => $data,
header => $header,
exchange => $self->exchange_name,
routing_key => $self->routing_key,
);
}
1;
=head1 NAME
Message::Passing::Output::AMQP - output messages to AMQP.
=head1 SYNOPSIS
message-pass --input STDIN --output AMQP --output_options '{"exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'
=head1 DESCRIPTION
A L<Message::Passing> L<AnyEvent::RabbitMQ> output class.
Can be used as part of a chain of classes with the L<message-pass> utility, or directly as
a logger in normal perl applications.
=head1 ATTRIBUTES
=head2 routing_key
The routing key for all messages, defaults to ''.
=head2 header_cb
Optional callback function which gets passed the message before it is
serialized using L</serialize_cb>.
Should return a hashref which gets passed to publish( header => ).
NOTE: if you want to set the message headers (note the s) you have to pass them inside headers, e.g.:
{
content_type => 'application/json',
headers => {
key => 'value',
}
}
=head2 serialize_cb
Optional callback function which gets passed the message and should return a
scalar. This is useful when passing structured messages e.g. hashrefs or
objects where some attributes should be accessible for the L</header_cb> function.
If the serialization happens before using a L<Message::Passing::Role::Filter>
it would require to deserialize it again in header_cb.
To use a Message::Passing filter you can instantiate it and pass it's filter
function to serialize_cb:
my $filter = Message::Passing::Filter::Encoder::JSON->new(output_to => undef);
...
{
serialize_cb => sub { $filter->filter(shift) },
}
=head1 METHODS
=head2 consume
Sends a message.
=head1 SEE ALSO
=over
=item L<Message::Passing::AMQP>
=item L<Message::Passing::Input::AMQP>
=item L<Message::Passing>
=item L<AMQP>
=item L<http://www.zeromq.org/>
=back
=head1 AUTHOR, COPYRIGHT AND LICENSE
See L<Message::Passing::AMQP>.
=cut
|