package Message::Passing::Output::ZeroMQ;
use Moo;
use MooX::Types::MooseLike::Base qw/ :all /;
use namespace::clean -except => 'meta';
use ZMQ::FFI::Constants qw/ :all /;
use Time::HiRes;

with qw/
    Message::Passing::ZeroMQ::Role::HasASocket
    Message::Passing::Role::Output
/;

has '+_socket' => (
    handles => {
        '_zmq_send' => 'send',
    },
);

sub _socket_type { 'PUB' }

has socket_hwm => (
    is => 'rw',
    default => 10000,
);

has subscribe_delay => (
    is => 'ro',
    isa => Num,
    default => 0.2,
);

# socket_(probably)_subscribed, but who has the bytes for that
has socket_subscribed => (
    is => 'rw',
    isa => Bool,
);

has socket_connect_time => (
    is => 'rw',
    isa => Num,
);

sub BUILD {
    my $self = shift;
    # Force a socket to be built, so that there's more chance the first message will be sent
    if ($self->_should_connect){
        my $socket = $self->_socket;
        return;
    }
    return;
}

sub consume {
    my ($self, $data) = @_;

    # See the slow joiner problem for PUB/SUB, outlined in
    # http://zguide.zeromq.org/page:all#Getting-the-Message-Out
    if (!$self->socket_subscribed && $self->socket_connect_time){
        my $time = Time::HiRes::time;
        my $alive_time = $time - $self->socket_connect_time;
        my $sleep_time = sprintf "%.4f", ($self->subscribe_delay - $alive_time);
        # warn "Alive $alive_time, so sleep time $sleep_time";
        if ($sleep_time > 0){
            Time::HiRes::sleep $sleep_time;
        }
        $self->socket_subscribed(1);
    }

    return $self->_zmq_send($data);
}

sub setsockopt {
    my ($self, $socket) = @_;

    if ($self->zmq_major_version >= 3){
        $socket->set(ZMQ_SNDHWM, 'int', $self->socket_hwm);
    }
    else {
        $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
    }

    return;
}

after _build_socket => sub {
    my $self = shift;
    $self->socket_connect_time( Time::HiRes::time );
};

1;

=head1 NAME

Message::Passing::Output::ZeroMQ - output messages to ZeroMQ.

=head1 SYNOPSIS

    use Message::Passing::Output::ZeroMQ;

    my $logger = Message::Passing::Output::ZeroMQ->new;
    $logger->consume({data => { some => 'data'}, '@metadata' => 'value' });

    # Or see Log::Dispatch::Message::Passing for a more 'normal' interface to
    # simple logging.

    # Or use directly on command line:
    message-passing --input STDIN --output ZeroMQ --output_options \
        '{"connect":"tcp://192.168.0.1:5552"}'
    {"data":{"some":"data"},"@metadata":"value"}

=head1 DESCRIPTION

A L<Message::Passing> ZeroMQ output class.

Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
a logger in normal Perl applications.


=head1 ATTRIBUTES

See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>.

=head2 subscribe_delay

Time, in fractional seconds, to allow after the socket is built for the
receiving socket to subscribe. Before the first message is sent, the output
sleeps for whatever part of this delay has not already elapsed, so this value
is the longest the sleep can take.

See the slow-joiner problem: L<http://zguide.zeromq.org/page:all#Getting-the-Message-Out>.

DEFAULT: 0.2 seconds

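
The wait before the first send can be sketched roughly as follows. This is
an illustration only: C<remaining_delay> is a hypothetical helper, not part of
this module, and it only mirrors the arithmetic done inside C<consume>.

    use strict;
    use warnings;
    use Time::HiRes ();

    # Hypothetical helper, not part of this module: returns how long
    # consume() would still need to sleep before the first send.
    sub remaining_delay {
        my ($subscribe_delay, $connect_time, $now) = @_;
        my $alive_time = $now - $connect_time;
        my $remaining  = $subscribe_delay - $alive_time;
        return $remaining > 0 ? $remaining : 0;
    }

    my $connected = Time::HiRes::time();
    Time::HiRes::sleep(0.05);    # pretend some other work happened first
    my $remaining = remaining_delay(0.2, $connected, Time::HiRes::time());
    printf "would sleep for about %.3f more seconds\n", $remaining;

If the socket has already been alive for longer than C<subscribe_delay>, the
remaining time is zero and no sleep happens.
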

=head1 METHODS

=head2 consume ($msg)

Sends a message, as-is. This means that you must have encoded the message to a string before
sending it. The C<message-passing> utility will encode it to JSON for you, or you can
do it manually as shown in the example in L<Message::Passing::ZeroMQ>.

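
A minimal sketch of encoding by hand, assuming the core L<JSON::PP> module as
the encoder and a subscriber listening on C<tcp://127.0.0.1:5552> (both are
assumptions made for this illustration, not requirements of this class):

    use strict;
    use warnings;
    use JSON::PP ();
    use Message::Passing::Output::ZeroMQ;

    # The endpoint is an assumed example address.
    my $output = Message::Passing::Output::ZeroMQ->new(
        connect => 'tcp://127.0.0.1:5552',
    );

    # consume() sends its argument unchanged, so encode the structure first.
    my $json = JSON::PP->new->utf8->canonical;
    my $msg  = $json->encode({
        data        => { some => 'data' },
        '@metadata' => 'value',
    });

    $output->consume($msg);

Any encoder that yields a string will do; JSON is used here only because it is
what the C<message-passing> utility produces.
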

=head1 SEE ALSO

=over

=item L<Message::Passing::ZeroMQ>

=item L<Message::Passing::Input::ZeroMQ>

=item L<Message::Passing>

=item L<ZeroMQ>

=item L<http://www.zeromq.org/>

=back

=head1 SPONSORSHIP

This module exists due to the wonderful people at Suretec Systems Ltd.
<http://www.suretecsystems.com/> who sponsored its development for its
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
the SureVoIP API -
<http://www.surevoip.co.uk/support/wiki/api_documentation>

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing>.

=cut