File: Publisher.pm

package info (click to toggle)
libmojo-rabbitmq-client-perl 0.3.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, trixie
  • size: 308 kB
  • sloc: perl: 2,165; xml: 489; makefile: 2
file content (217 lines) | stat: -rw-r--r-- 5,334 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package Mojo::RabbitMQ::Client::Publisher;
use Mojo::Base -base;

use Mojo::Promise;
use Mojo::JSON qw(encode_json);
use Scalar::Util 'weaken';
require Mojo::RabbitMQ::Client;

use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0;

has url      => undef;
has client   => undef;
has channel  => undef;
has setup    => 0;
has defaults => sub { {} };

sub publish_p {
  my $self    = shift;
  my $body    = shift;
  my $headers = {};
  my %args    = ();

  if (ref($_[0]) eq 'HASH') {
    $headers = shift;
  }
  if (@_) {
    %args = (@_);
  }

  my $promise = Mojo::Promise->new()->resolve;

  weaken $self;
  unless ($self->client) {
    $promise = $promise->then(
      sub {
        warn "-- spawn new client\n" if DEBUG;
        my $client_promise = Mojo::Promise->new();

        my $client = Mojo::RabbitMQ::Client->new(url => $self->url);
        $self->client($client);

        # Catch all client related errors
        $self->client->catch(sub { $client_promise->reject($_[1]) });

        # When connection is in Open state, open new channel
        $self->client->on(
          open => sub {
            warn "-- client open\n" if DEBUG;
            $client_promise->resolve;
          }
        );

        # Start connection
        $client->connect;

        return $client_promise;
      }
    );
  }

  # Create a new channel with auto-assigned id
  unless ($self->channel) {
    $promise = $promise->then(
      sub {
        warn "-- create new channel\n" if DEBUG;
        my $channel_promise = Mojo::Promise->new();

        my $channel         = Mojo::RabbitMQ::Client::Channel->new();

        $channel->catch(sub { $channel_promise->reject($_[1]) });

        $channel->on(
          open => sub {
            my ($channel) = @_;
            $self->channel($channel);

            warn "-- channel opened\n" if DEBUG;

            $channel_promise->resolve;
          }
        );
        $channel->on(close => sub { warn 'Channel closed: ' . $_[1]->method_frame->reply_text; });

        $self->client->open_channel($channel);

        return $channel_promise;
      }
    );
  }

  $promise = $promise->then(
    sub {
      warn "-- publish message\n" if DEBUG;
      my $query         = $self->client->url->query;
      my $exchange_name = $query->param('exchange');
      my $routing_key   = $query->param('routing_key');
      my %headers       = (content_type => 'text/plain', %$headers);

      if (ref($body)) {
        $headers{content_type} = 'application/json';
        $body = encode_json $body;
      }

      return $self->channel->publish_p(
        exchange    => $exchange_name,
        routing_key => $routing_key,
        mandatory   => 0,
        immediate   => 0,
        header      => \%headers,
        %args,
        body        => $body
      );
    }
  );

  return $promise;
}

1;

=encoding utf8

=head1 NAME

Mojo::RabbitMQ::Client::Publisher - simple Mojo::RabbitMQ::Client based publisher

=head1 SYNOPSIS

  use Mojo::RabbitMQ::Client::Publisher;
  my $publisher = Mojo::RabbitMQ::Client::Publisher->new(
    url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo'
  );

  $publisher->publish_p(
    {encode => { to => 'json'}},
    routing_key => 'mojo_mq'
  )->then(sub {
    say "Message published";
  })->catch(sub {
    die "Publishing failed"
  })->wait;

=head1 DESCRIPTION



=head1 ATTRIBUTES

L<Mojo::RabbitMQ::Client::Publisher> has following attributes.

=head2 url

Sets all connection parameters in one string, according to specification from
L<https://www.rabbitmq.com/uri-spec.html>.

For detailed description please see L<Mojo::RabbitMQ::Client#url>.

=head1 METHODS

L<Mojo::RabbitMQ::Client::Publisher> implements only single method.

=head2 publish_p

  $publisher->publish_p('simple plain text body');

  $publisher->publish_p({ some => 'json' });

  $publisher->publish_p($body, { header => 'content' }, routing_key => 'mojo', mandatory => 1);

Method signature

  publish_p($body!, \%headers?, *@params)

=over 2

=item body

First argument is mandatory body content of published message.
Any reference passed here will be encoded as JSON and accordingly C<content_type> header
will be set to C<application/json>.

=item headers

If second argument is a HASHREF it will be merged to message headers.

=item params

Any other arguments will be considered key/value pairs and passed to the Client's publish
method as arguments overriding everything besides body argument.

So this:

  $publisher->publish({ json => 'object' }, { header => 'content' });

is similar to this:

  $publisher->publish({ json => 'object' }, header => { header => 'content' });

But beware - headers passed as a HASHREF get merged into the header constructed by the Publisher,
but params override values; so if you pass C<header> as a param like this, it will override the
header constructed by the Publisher, and the message will lack the C<content_type> header, even
though you passed a reference as the body argument! With the first example, the C<content_type>
header would be included.

=back

=head1 SEE ALSO

L<Mojo::RabbitMQ::Client>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2015-2017, Sebastian Podjasek and others

This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.

=cut