File: ZeroMQ.pm

package info (click to toggle)
libmessage-passing-zeromq-perl 0.010-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 276 kB
  • sloc: perl: 1,621; makefile: 2
file content (170 lines) | stat: -rw-r--r-- 3,959 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package Message::Passing::Output::ZeroMQ;
use Moo;
use MooX::Types::MooseLike::Base qw/ :all /;
use namespace::clean -except => 'meta';

use ZMQ::FFI::Constants qw/ :all /;
use Time::HiRes;

with qw/
    Message::Passing::ZeroMQ::Role::HasASocket
    Message::Passing::Role::Output
/;

has '+_socket' => (
    handles => {
        '_zmq_send' => 'send',
    },
);

sub _socket_type { 'PUB' }

has socket_hwm => (
    is      => 'rw',
    default => 10000,
);

has subscribe_delay => (
    is      => 'ro',
    isa     => Num,
    default => 0.2,
    );

# socket_(probably)_subscribed, but who has the bytes for that
has socket_subscribed => (
    is  => 'rw',
    isa => Bool,
    );
has socket_connect_time => (
    is  => 'rw',
    isa => Num,
    );

sub BUILD {
    my $self = shift;
    # Force a socket to be built, so that there's more chance the first message will be sent
    if ($self->_should_connect){
        my $socket = $self->_socket;
        return;
    }

    return;
}

sub consume {
    my ($self, $data) = @_;

    # See the slow joiner problem for PUB/SUB, outlined in
    # http://zguide.zeromq.org/page:all#Getting-the-Message-Out
    if (!$self->socket_subscribed && $self->socket_connect_time){
        my $time = Time::HiRes::time;
        my $alive_time = $time - $self->socket_connect_time;
        my $sleep_time = sprintf "%.4f", ($self->subscribe_delay - $alive_time);
        # warn "Alive $alive_time, so sleep time $sleep_time";
        if ($sleep_time > 0){
            Time::HiRes::sleep $sleep_time;
        }
        $self->socket_subscribed(1);
    }

    return $self->_zmq_send($data);
}

sub setsockopt {
    my ($self, $socket) = @_;

    if ($self->zmq_major_version >= 3){
        $socket->set(ZMQ_SNDHWM, 'int', $self->socket_hwm);
    }
    else {
        $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
    }

    return;
}

after _build_socket => sub {
    my $self = shift;
    $self->socket_connect_time( Time::HiRes::time );
};

1;

=head1 NAME

Message::Passing::Output::ZeroMQ - output messages to ZeroMQ.

=head1 SYNOPSIS

    use Message::Passing::Output::ZeroMQ;

    my $logger = Message::Passing::Output::ZeroMQ->new;
    $logger->consume({data => { some => 'data'}, '@metadata' => 'value' });

    # Or see Log::Dispatch::Message::Passing for a more 'normal' interface to
    # simple logging.

    # Or use directly on command line:
    message-passing --input STDIN --output ZeroMQ --output_options \
        '{"connect":"tcp://192.168.0.1:5552"}'
    {"data":{"some":"data"},"@metadata":"value"}

=head1 DESCRIPTION

A L<Message::Passing> ZeroMQ output class.

Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
a logger in normal perl applications.

=head1 ATTRIBUTES


See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>.

=head2 subscribe_delay

Time in floating seconds to sleep to ensure the receiving socket has subscribed.
This is the longest the sleep might take.

See the slow-joiner problem: L<http://zguide.zeromq.org/page:all#Getting-the-Message-Out>.

DEFAULT: 0.2 seconds

=head1 METHODS

=head2 consume ($msg)

Sends a message, as-is. This means that you must have encoded the message to a string before
sending it. The C<message-pass> utility will do this for you into JSON, or you can
do it manually as shown in the example in L<Message::Passing::ZeroMQ>.

=head1 SEE ALSO

=over

=item L<Message::Passing::ZeroMQ>

=item L<Message::Passing::Input::ZeroMQ>

=item L<Message::Passing>

=item L<ZeroMQ>

=item L<http://www.zeromq.org/>

=back

=head1 SPONSORSHIP

This module exists due to the wonderful people at Suretec Systems Ltd.
<http://www.suretecsystems.com/> who sponsored its development for its
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
the SureVoIP API - 
<http://www.surevoip.co.uk/support/wiki/api_documentation>

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing>.

=cut