File: Listener.pm

package info (click to toggle)
libio-async-perl 0.29-1
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 684 kB
  • ctags: 239
  • sloc: perl: 6,439; makefile: 2
file content (450 lines) | stat: -rw-r--r-- 11,810 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2008-2010 -- leonerd@leonerd.org.uk

package IO::Async::Listener;

use strict;
use warnings;
use base qw( IO::Async::Handle );

our $VERSION = '0.29';

use IO::Async::Handle;

use POSIX qw( EAGAIN );
use Socket::GetAddrInfo qw( :Socket6api AI_PASSIVE );

use Socket qw( SOL_SOCKET SO_ACCEPTCONN SO_REUSEADDR );

use Carp;

=head1 NAME

C<IO::Async::Listener> - listen on network sockets for incoming connections

=head1 SYNOPSIS

 use IO::Async::Listener;
 use IO::Async::Stream;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new();

 my $listener = IO::Async::Listener->new(
    on_accept => sub {
       my ( $self, $newclient ) = @_;

       $loop->add( IO::Async::Stream->new(
          handle => $newclient,

          on_read => sub {
             my ( $self, $buffref, $closed ) = @_;
             $self->write( $$buffref );
             $$buffref = "";
             return 0;
          },
       ) );
    },
 );

 $loop->add( $listener );

 $listener->listen(
    service  => "echo",
    socktype => 'stream',

    on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; },
    on_listen_error  => sub { print STDERR "Cannot listen\n"; },
 );

 $loop->loop_forever;

This object can also be used indirectly via an C<IO::Async::Loop>:

 use IO::Async::Stream;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new();

 $loop->listen(
    service  => "echo",
    socktype => 'stream',

    on_accept => sub {
       ...
    },

    on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; },
    on_listen_error  => sub { print STDERR "Cannot listen\n"; },
 );

 $loop->loop_forever;

=head1 DESCRIPTION

This subclass of L<IO::Async::Handle> adds behaviour which watches a socket in
listening mode, to accept incoming connections on it.

A Listener can be constructed and given an existing socket in listening mode.
Alternatively, the Listener can construct a socket by calling the C<listen>
method. Either a list of addresses can be provided, or a service name can be
looked up using the underlying loop's C<resolve> method.

This object may be used in one of two ways; with a callback function, or as a
base class.

=over 4

=item Callbacks

If the C<on_accept> key is supplied to the constructor, it should contain a
CODE reference to a callback function to be invoked when a new client connects
to the socket. It is passed an C<IO::Socket> reference to the newly accepted
socket:

 $on_accept->( $self, $clientsocket )

=item Base Class

If a subclass is built, then it can override the C<on_accept> method.

 $self->on_accept( $clientsocket )

=back

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=over 8

=item on_accept => CODE

A callback that is invoked whenever a new client connects to the socket. If
not supplied, the subclass method will be called instead.

=item handle => IO

The IO handle containing an existing listen-mode socket.

=back

=cut

# Applies named configuration parameters to this Listener.
#
# Recognised keys:
#   on_accept => CODE   stored on the object as the accept callback
#   handle    => IO     an existing socket; sanity-checked, then handed to the
#                       superclass as its 'read_handle'
#
# Croaks if 'on_read_ready' is given, if the handle fails its sanity checks,
# if neither an on_accept callback nor an ->on_accept method is available, or
# if any unrecognised keys remain.
sub configure
{
   my ( $self, %params ) = @_;

   $self->{on_accept} = delete $params{on_accept} if exists $params{on_accept};

   # Read-readiness is handled by this class itself, so it cannot be overridden
   exists $params{on_read_ready}
      and croak "Cannot set 'on_read_ready' on a Listener";

   if( exists $params{handle} ) {
      my $sock = delete $params{handle};

      # The handle may be a bare GLOB ref rather than an IO::Socket, so use
      # the builtin rather than a method to check it has a socket name
      unless( defined getsockname( $sock ) ) {
         croak "IO handle $sock does not have a sockname";
      }

      # Best-effort check that the socket is listening. An undefined result
      # means the option could not be queried, and is deliberately ignored.
      my $packed = getsockopt( $sock, SOL_SOCKET, SO_ACCEPTCONN );
      if( defined $packed and !unpack( "I", $packed ) ) {
         croak "Socket is not accepting connections";
      }

      # Upgrade an unblessed GLOB ref so that socket methods can be called on it
      bless $sock, "IO::Socket" if ref( $sock ) eq "GLOB";

      $self->SUPER::configure( read_handle => $sock );
   }

   unless( $self->{on_accept} or $self->can( 'on_accept' ) ) {
      croak 'Expected either a on_accept callback or an ->on_accept method';
   }

   # Nothing else is passed through to the underlying Handle
   if( my @leftover = keys %params ) {
      croak "Cannot pass though configuration keys to underlying Handle - " . join( ", ", @leftover );
   }
}

# Read-readiness handler for the listening socket (presumably invoked through
# the IO::Async::Handle base class when the socket becomes readable).
#
# Attempts to accept() one pending connection from the read handle. On
# success the new socket is put into non-blocking mode and passed to the
# 'on_accept' callback as ( $self, $newclient ), or to the ->on_accept method
# as ( $newclient ) if no callback was configured.
#
# Transient accept() failures are silently ignored; any other failure dies
# with "Cannot accept - $!".
sub on_read_ready
{
   my $self = shift;

   my $newclient = $self->read_handle->accept();

   if( defined $newclient ) {
      $newclient->blocking( 0 );

      # TODO: make class/callback
      if( $self->{on_accept} ) {
         $self->{on_accept}->( $self, $newclient );
      }
      else {
         $self->on_accept( $newclient );
      }
   }
   elsif( $! == EAGAIN or $!{EWOULDBLOCK} or $!{EINTR} or $!{ECONNABORTED} ) {
      # None of these mean the listening socket itself is broken:
      #   EAGAIN/EWOULDBLOCK - no client ready after all. Perhaps we're
      #                        sharing the listen socket with other processes?
      #   EINTR              - accept() was interrupted by a signal
      #   ECONNABORTED       - the client went away before we accepted it
      # Not fatal, so just ignore it rather than killing the whole process.
      # %! is used for the extra codes because it is safe to query even for
      # names the platform does not define.
   }
   else {
      # TODO: make a callback
      die "Cannot accept - $!";
   }
}

=head1 METHODS

=cut

=head2 $listening = $listener->is_listening

Returns true if the listener currently has a socket name; that is, if its
C<sockname> method returns a defined value.

=cut

sub is_listening
{
   my ( $self ) = @_;

   my $name = $self->sockname;
   return defined $name;
}

=head2 $name = $listener->sockname

Returns the result of calling C<sockname> on the listener's read handle, or
C<undef> if it does not currently have one.

=cut

sub sockname
{
   my ( $self ) = @_;

   my $sock = $self->read_handle;

   # An explicit undef is returned here, so list-context callers receive a
   # one-element list rather than an empty one
   return undef unless $sock;

   return $sock->sockname;
}

=head2 $listener->listen( %params )

This method sets up a listening socket using the addresses given, and will
invoke the C<on_accept> callback each time a new connection is accepted on the
socket. Addresses may be given directly, or they may be looked up using the
system's name resolver.

If multiple addresses are given, or resolved from the service and hostname,
then each will be attempted in turn until one succeeds.

In plain address mode, the C<%params> hash takes the following keys:

=over 8

=item addrs => ARRAY

Reference to an array of (possibly-multiple) address structures to attempt to
listen on. Each should be in the layout described for C<addr>. Such a layout
is returned by the C<getaddrinfo> named resolver.

=item addr => ARRAY

Shortcut for passing a single address to listen on; it may be passed directly
with this key, instead of in another array of its own.

The address (or each element of the C<addrs> array) should be a reference to
an array, with at least the following elements:

 [ $family, $socktype, $protocol, $address ]

The first three arguments will be passed to a C<socket()> call and, if
successful, the fourth to a C<bind()> call on the resulting socket. The socket
will then be C<listen()>ed to put it into listening mode. Any trailing
elements in this array will be ignored.

=back

In named resolver mode, the C<%params> hash takes the following keys:

=over 8

=item service => STRING

The service name to listen on.

=item host => STRING

The hostname to listen on. Optional. Will listen on all addresses if not
supplied.

=item family => INT

=item socktype => INT

=item protocol => INT

=item flags => INT

Optional. Other arguments to pass along with C<host> and C<service> to the
C<getaddrinfo()> call.

=item socktype => STRING

Optionally may instead be one of the values C<'stream'>, C<'dgram'> or
C<'raw'> to stand for C<SOCK_STREAM>, C<SOCK_DGRAM> or C<SOCK_RAW>. This
utility is provided to allow the caller to avoid a separate C<use Socket> only
for importing these constants.

=item on_resolve_error => CODE

A continuation that is invoked when the name resolution attempt fails. This is
invoked in the same way as the C<on_error> continuation for the C<resolve>
method.

=back

In either case, the following keys are also taken:

=over 8

=item on_listen => CODE

Optional. A callback that is invoked when the listening socket is ready.

 $on_listen->( $listener )

=item on_listen_error => CODE

A continuation that is invoked after all of the addresses have been tried, and
none of them succeeded. Because there is no one error message that stands out
as particularly noteworthy, none is given to this continuation. To track
individual errors, see the C<on_fail> callback.

=item on_fail => CODE

Optional. A callback that is invoked if a syscall fails while attempting to
create a listening socket. It is passed the name of the syscall that failed,
the arguments that were passed to it, and the error generated. I.e.

 $on_fail->( "socket", $family, $socktype, $protocol, $! );

 $on_fail->( "sockopt", $sock, $optname, $optval, $! );

 $on_fail->( "bind", $sock, $address, $! );

 $on_fail->( "listen", $sock, $queuesize, $! );

=item queuesize => INT

Optional. The queue size to pass to the C<listen()> calls. If not supplied,
then 3 will be given instead.

=item reuseaddr => BOOL

Optional. If true or not supplied then the C<SO_REUSEADDR> socket option will
be set. To prevent this, pass a false value such as 0.

=back

=cut

# Sets up a listening socket, either from explicit addresses ('addrs' or
# 'addr') or by resolving 'service' (and optionally 'host') through the
# containing loop's resolver, which then re-enters this method with the
# resolved 'addrs'.
#
# Croaks if the Listener is not in a Loop, if a continuation argument is not
# a reference, or if none of 'service', 'addrs' or 'addr' is given.
sub listen
{
   my ( $self, %params ) = @_;

   my $loop = $self->get_loop;
   croak "Cannot listen when not a member of a Loop" unless defined $loop; # TODO: defer?

   # A lone 'addr' is shorthand for a one-element 'addrs' list
   $params{addrs} = [ delete $params{addr} ] if $params{addr} and not $params{addrs};

   # Check the continuations up front; only on_listen_error is mandatory
   my ( $on_listen, $on_listen_error, $on_fail ) =
      @params{qw( on_listen on_listen_error on_fail )};

   if( defined $on_listen and !ref $on_listen ) {
      croak "Expected 'on_listen' to be a reference";
   }
   unless( ref $on_listen_error ) {
      croak "Expected 'on_listen_error' as a reference";
   }
   if( defined $on_fail and !ref $on_fail ) {
      croak "Expected 'on_fail' to be a reference";
   }

   my $queuesize = $params{queuesize} || 3;

   my $addrlist = $params{addrs};

   unless( $addrlist ) {
      # No addresses given, so a service name to resolve is required
      defined $params{service}
         or croak "Expected either 'service' or 'addrs' or 'addr' arguments";

      my $on_resolve_error = delete $params{on_resolve_error};
      croak "Expected 'on_resolve_error' as a reference" unless ref $on_resolve_error;

      # The resolver-only keys are removed from %params so that only the
      # listen-related ones are carried into the second call below
      my $host    = ( delete $params{host} ) || "";
      my $service = delete $params{service}; # known to be defined; might be 0

      my ( $family, $socktype, $protocol, $flags ) =
         map { ( delete $params{$_} ) || 0 } qw( family socktype protocol flags );

      return $loop->resolve(
         type => 'getaddrinfo',
         data => [ $host, $service, $family, $socktype, $protocol, $flags | AI_PASSIVE ],

         # Come back through this method in plain address mode
         on_resolved => sub { $self->listen( %params, addrs => [ @_ ] ) },

         on_error => $on_resolve_error,
      );
   }

   # SO_REUSEADDR is on unless the caller passed a defined-but-false value
   my $reuseaddr = !defined $params{reuseaddr} || $params{reuseaddr};

   # Try each address in turn; the first to reach listening state wins. Each
   # failed syscall is reported to on_fail (if given) before moving on.
   ADDR: foreach my $addr ( @$addrlist ) {
      my ( $family, $socktype, $proto, $address ) = @$addr;

      my $sock = $loop->socket( $family, $socktype, $proto ) or do {
         $on_fail->( "socket", $family, $socktype, $proto, $! ) if $on_fail;
         next ADDR;
      };

      if( $reuseaddr and not $sock->sockopt( SO_REUSEADDR, 1 ) ) {
         $on_fail->( "sockopt", $sock, SO_REUSEADDR, 1, $! ) if $on_fail;
         next ADDR;
      }

      $sock->bind( $address ) or do {
         $on_fail->( "bind", $sock, $address, $! ) if $on_fail;
         next ADDR;
      };

      $sock->listen( $queuesize ) or do {
         $on_fail->( "listen", $sock, $queuesize, $! ) if $on_fail;
         next ADDR;
      };

      # Adopt the socket as our read handle and tell the caller it is ready
      $self->SUPER::configure( read_handle => $sock );

      $on_listen->( $self ) if defined $on_listen;

      return;
   }

   # If we got this far, then none of the addresses succeeded
   return $on_listen_error->();
}

# Keep perl happy; keep Britain tidy
1;

__END__

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>