1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
# No-op plugins in action.
use warnings;
use strict;
use lib 't';
use share;
use IO::Stream::Noop;
@CheckPoint = (
[ 'EVENT', RESOLVED, undef ], 'EventLog::EVENT(RESOLVED)',
[ 'WRITE' ], 'EventLog::WRITE',
[ 'EVENT', CONNECTED|OUT|SENT, undef], 'EventLog::EVENT(CONNECTED|OUT|SENT)',
[ 'client', SENT ], 'client: SENT',
[ 'server', EOF ], 'server: EOF',
[ 'server', 'test' ], ' got "test"',
[ 'server', SENT ], 'server: SENT',
[ 'EVENT', IN, undef ], 'EventLog::EVENT(IN)',
[ 'EVENT', EOF, undef ], 'EventLog::EVENT(EOF)',
[ 'client', EOF ], 'client: EOF',
[ 'client', 'echo: test' ], ' got "echo: test"',
);
plan tests => 2 + @CheckPoint/2;
my $srv_sock = tcp_server('127.0.0.1', 0);
my $srv_w = EV::io($srv_sock, EV::READ, sub {
if (accept my $sock, $srv_sock) {
IO::Stream->new({
fh => $sock,
cb => \&server,
wait_for => EOF,
in_buf_limit=> 1024,
});
}
elsif ($! != EAGAIN) {
die "accept: $!\n";
}
});
my $io = IO::Stream->new({
host => '127.0.0.1',
port => sockport($srv_sock),
cb => \&client,
wait_for => SENT,
in_buf_limit=> 1024,
out_buf => 'test',
plugin => [
noop => IO::Stream::Noop->new(),
eventlog => IO::Stream::EventLog->new(),
],
});
is(ref $io->{plugin}{noop}, 'IO::Stream::Noop',
'{plugin}{noop} available');
is(ref $io->{plugin}{eventlog}, 'IO::Stream::EventLog',
'{plugin}{eventlog} available');
EV::loop;
sub server {
my ($io, $e, $err) = @_;
die $err if $err;
checkpoint($e);
if ($e & EOF) {
checkpoint($io->{in_buf});
$io->{wait_for} = SENT;
$io->write("echo: $io->{in_buf}");
}
if ($e & SENT) {
$io->close();
}
}
sub client {
my ($io, $e, $err) = @_;
die $err if $err;
checkpoint($e);
if ($e & SENT) {
$io->{wait_for} = EOF;
shutdown $io->{fh}, 1;
}
if ($e & EOF) {
checkpoint($io->{in_buf});
$io->close();
EV::unloop();
}
}
package IO::Stream::EventLog;
use base 'IO::Stream::Noop';
sub WRITE {
main::checkpoint();
shift->SUPER::WRITE(@_);
}
sub EVENT {
main::checkpoint($_[1], $_[2]);
shift->SUPER::EVENT(@_);
}
|