1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
use strict;
{
    # TPCTParseWords -- a small POE::Filter subclass used by this test.
    #
    # Every raw chunk handed to the filter is split into words with
    # Text::ParseWords::parse_line(); each chunk produces one record, an
    # array reference holding the words parsed from that chunk.
    #
    # Constructor options (option names are matched case-insensitively):
    #   keep  - forwarded to parse_line() as its "keep" argument (default 0).
    #   delim - forwarded to parse_line() as its delimiter pattern
    #           (default '\s+').
    package TPCTParseWords;
    use warnings;
    use base qw(POE::Filter);
    use Text::ParseWords;

    # A package variable rather than a lexical, so that the version is
    # visible from outside this block (e.g. via TPCTParseWords->VERSION).
    our $VERSION = '1.02';

    # new(%opts) -- build a filter object with an empty input buffer.
    sub new {
        my $class = shift;
        my %opts  = @_;

        # Fold every option name to lower case.
        $opts{ lc $_ } = delete $opts{$_} for keys %opts;

        $opts{keep} = 0 unless $opts{keep};

        # Test for "defined and non-empty" rather than truth, so that a
        # delimiter pattern of '0' is honoured instead of being replaced.
        $opts{delim} = '\s+'
            unless defined $opts{delim} && length $opts{delim};

        $opts{BUFFER} = [];
        return bless \%opts, $class;
    }

    # get(\@chunks) -- parse every chunk immediately, bypassing the buffer.
    # Returns an array reference with one array reference of words per chunk.
    sub get {
        my ( $self, $raw ) = @_;
        my $events = [];
        push @$events, [ parse_line( $self->{delim}, $self->{keep}, $_ ) ]
            for @$raw;
        return $events;
    }

    # get_one_start(\@chunks) -- append chunks to the internal buffer for
    # later retrieval through get_one().
    sub get_one_start {
        my ( $self, $raw ) = @_;
        push @{ $self->{BUFFER} }, @$raw;
        return;
    }

    # get_one() -- parse the oldest buffered chunk, if there is one.
    # Returns an array reference holding zero or one record.
    sub get_one {
        my $self   = shift;
        my $events = [];
        my $event  = shift @{ $self->{BUFFER} };
        push @$events, [ parse_line( $self->{delim}, $self->{keep}, $event ) ]
            if defined $event;
        return $events;
    }

    # get_pending() -- return a copy of the still-unparsed chunks as an
    # array reference, or nothing when the buffer is empty.  The buffer is
    # left untouched.  Provided here because the buffer is kept in
    # $self->{BUFFER}, a layout only this class's own methods know about.
    sub get_pending {
        my $self = shift;
        return unless @{ $self->{BUFFER} };
        return [ @{ $self->{BUFFER} } ];
    }

    # put() -- output filtering is not supported; warns and returns nothing.
    sub put {
        warn "PUT is unimplemented\n";
        return;
    }

    # clone() -- shallow copy of this filter's settings with a fresh,
    # empty buffer, blessed into the same class as the original.
    sub clone {
        my $self  = shift;
        my $nself = { %{$self}, BUFFER => [] };
        return bless $nself, ref $self;
    }
}
use Socket;
use Test::More tests => 10;
use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
# Make sure the module under test loads before any session is built.
use_ok('Test::POE::Client::TCP');

# Names of the subs in package main that the session registers as its
# event handlers.
my @main_states = qw(
    _start
    _accept
    _failed
    _sock_in
    _sock_err
    testc_registered
    testc_connected
    testc_disconnected
    testc_input
    testc_flushed
);

POE::Session->create(
    package_states => [ 'main' => \@main_states ],
);

# Run the event loop, then leave with a success status.
$poe_kernel->run();
exit 0;
# _start -- session start-up handler.
# Creates a TCP listening wheel on 127.0.0.1 (stored in $heap->{listener})
# and spawns the Test::POE::Client::TCP component under test (stored in
# $heap->{testc}), then checks the component's class.
sub _start {
    my $heap = $_[HEAP];

    # Kept as an ordered list so the arguments reach new() in this order.
    my @listener_args = (
        BindAddress    => '127.0.0.1',
        SuccessEvent   => '_accept',
        FailureEvent   => '_failed',
        SocketDomain   => AF_INET,        # socket() domain
        SocketType     => SOCK_STREAM,    # socket() type
        SocketProtocol => 'tcp',          # socket() protocol
        Reuse          => 'on',           # allow the port to be reused
    );
    $heap->{listener} = POE::Wheel::SocketFactory->new(@listener_args);

    # Incoming data is parsed by the custom word filter; outgoing data
    # goes through POE's line filter.
    my @client_args = (
        inputfilter  => TPCTParseWords->new(),
        outputfilter => POE::Filter::Line->new(),
    );
    $heap->{testc} = Test::POE::Client::TCP->spawn(@client_args);

    isa_ok( $heap->{testc}, 'Test::POE::Client::TCP' );
    return;
}
# _accept -- the listener's SuccessEvent handler.
# Wraps the accepted socket (ARG0) in a ReadWrite wheel, remembers the
# wheel on the heap under its ID, and sends one quoted test line.
sub _accept {
    my ( $heap, $client_socket ) = @_[ HEAP, ARG0 ];

    my $rw = POE::Wheel::ReadWrite->new(
        Handle     => $client_socket,
        InputEvent => '_sock_in',
        ErrorEvent => '_sock_err',
    );

    # Store the wheel on the heap, keyed by its ID.
    $heap->{wheels}{ $rw->ID } = $rw;

    $rw->put('"This is just a test" "line" "so there"');
    return;
}
# _failed -- the listener's FailureEvent handler.
# ARG0..ARG3 carry the failed operation, the error number, the error
# string and the ID of the wheel that failed.  Always dies with a message
# built from those values, so there is nothing to return.
sub _failed {
    my ( $operation, $errnum, $errstr, $wheel_id ) = @_[ ARG0 .. ARG3 ];
    die "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
}
# _sock_in -- InputEvent handler for the server-side wheel.
# Records one passing test per line received (ARG0) and drops the wheel
# identified by ARG1 when the line is exactly 'quit'.
sub _sock_in {
    my ( $heap, $line, $id ) = @_[ HEAP, ARG0, ARG1 ];

    pass('Got input from client');

    # Echoing the line back is currently commented out:
    # $heap->{wheels}->{ $id }->put( $line ) if $heap->{wheels}->{ $id };

    if ( $line eq 'quit' ) {
        delete $heap->{wheels}{$id};
    }
    return;
}
# _sock_err -- ErrorEvent handler for the server-side wheel.
# Records a passing test and discards the wheel whose ID arrives in ARG3.
sub _sock_err {
    my $heap = $_[HEAP];
    my $id   = $_[ARG3];

    pass('Client disconnected');
    delete $heap->{wheels}{$id};
    return;
}
# testc_registered -- handler for the component's 'registered' event.
# Checks the object passed in ARG0, reads the listener's port from its
# socket name, and asks the sender to connect to 127.0.0.1 on that port.
sub testc_registered {
    my ( $kernel, $sender, $client ) = @_[ KERNEL, SENDER, ARG0 ];

    pass( $_[STATE] );
    isa_ok( $client, 'Test::POE::Client::TCP' );

    # In list context sockaddr_in() unpacks the address; the port is the
    # first element of the returned list.
    my ($port) = sockaddr_in( $_[HEAP]->{listener}->getsockname() );

    my %target = ( address => '127.0.0.1', port => $port );
    $kernel->post( $sender, 'connect', \%target );
    return;
}
# testc_connected -- handler for the component's 'connected' event.
# Only records a passing test named after the current state.
sub testc_connected {
    # $kernel and $sender are referenced only by the disabled line below.
    my ( $kernel, $sender ) = @_[ KERNEL, SENDER ];

    my $state = $_[STATE];
    pass($state);

    # $kernel->post( $sender, 'send_to_server', 'Hello, is it me you are looking for?' );
    return;
}
# testc_flushed -- handler for the component's 'flushed' event.
# Records a passing test named after the current state.
sub testc_flushed {
    my $state = $_[STATE];
    pass($state);
    return;
}
# testc_input -- handler for input delivered by the component.
# ARG0 is the record produced by the input filter: an array reference of
# parsed words.  Verifies the words and then sends 'quit' to the server.
sub testc_input {
    my ( $heap, $input ) = @_[ HEAP, ARG0 ];

    pass('Got something back from the server');

    # ok( $input eq 'Hello, is it me you are looking for?', $input );

    # Only the first three fields are checked; anything after them is
    # ignored.  is_deeply() reports which field differs on failure, which
    # a single combined boolean passed to ok() cannot do.
    my @expected = ( 'This is just a test', 'line', 'so there' );
    is_deeply( [ @{$input}[ 0 .. 2 ] ], \@expected, 'Test Get' );

    $heap->{testc}->send_to_server('quit');
    return;
}
# testc_disconnected -- handler for the component's 'disconnected' event.
# Records a passing test, removes the server-side wheels and the listener
# from the heap, and shuts the client component down.
sub testc_disconnected {
    my $heap = $_[HEAP];

    pass( $_[STATE] );

    # Delete one key at a time, wheels first and then the listener.
    delete $heap->{$_} for qw(wheels listener);

    $heap->{testc}->shutdown();
    return;
}
|