File: AMQP.pm

package info (click to toggle)
libmessage-passing-amqp-perl 0.008-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 252 kB
  • sloc: perl: 1,688; makefile: 2; sh: 1
file content (135 lines) | stat: -rw-r--r-- 2,968 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package Message::Passing::Output::AMQP;
use Moo;
use Types::Standard qw( Str CodeRef );
use namespace::autoclean;

with qw/
    Message::Passing::AMQP::Role::DeclaresExchange
    Message::Passing::Role::Output
/;

has routing_key => (
    isa => Str,
    is => 'ro',
    default => '',
);

has header_cb => (
    isa => CodeRef,
    is => 'ro',
);

has serialize_cb => (
    isa => CodeRef,
    is => 'ro',
);

sub consume {
    my $self = shift;
    my $data = shift;
    if (ref $data && ! defined $self->serialize_cb) {
        warn("Passed non-serialized data - is a perl reference. Dropping.\n");
        return;
    }
    unless ($self->_exchange) {
        warn("No exchange yet, dropping message");
        return;
    }
    my $header;
    $header = $self->header_cb->($data)
        if defined $self->header_cb;

    $data = $self->serialize_cb->($data)
        if defined $self->serialize_cb;

    $self->_channel->publish(
        body => $data,
        header => $header,
        exchange => $self->exchange_name,
        routing_key => $self->routing_key,
    );
}

1;

=head1 NAME

Message::Passing::Output::AMQP - output messages to AMQP.

=head1 SYNOPSIS

    message-pass --input STDIN --output AMQP --output_options '{"exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'

=head1 DESCRIPTION

A L<Message::Passing> L<AnyEvent::RabbitMQ> output class.

Can be used as part of a chain of classes with the L<message-pass> utility, or directly as
a logger in normal perl applications.

=head1 ATTRIBUTES

=head2 routing_key

The routing key for all messages, defaults to ''.

=head2 header_cb

Optional callback function which gets passed the message before it is
serialized using L</serialize_cb>.
Should return a hashref which gets passed to publish( header => ).

NOTE: if you want to set the message headers (note the s) you have to pass them inside headers, e.g.:

  {
      content_type => 'application/json',
      headers => {
          key => 'value',
      }
  }

=head2 serialize_cb

Optional callback function which gets passed the message and should return a
scalar. This is useful when passing structured messages e.g. hashrefs or
objects where some attributes should be accessible for the L</header_cb> function.
If the serialization happens before using a L<Message::Passing::Role::Filter>
it would require to deserialize it again in header_cb.
To use a Message::Passing filter you can instantiate it and pass it's filter
function to serialize_cb:

  my $filter = Message::Passing::Filter::Encoder::JSON->new(output_to => undef);

  ...

  {
      serialize_cb => sub { $filter->filter(shift) },
  }

=head1 METHODS

=head2 consume

Sends a message.

=head1 SEE ALSO

=over

=item L<Message::Passing::AMQP>

=item L<Message::Passing::Input::AMQP>

=item L<Message::Passing>

=item L<AMQP>

=item L<http://www.zeromq.org/>

=back

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing::AMQP>.

=cut