File: filterchange.perl

package info (click to toggle)
libpoe-perl 2%3A0.19-1
  • links: PTS
  • area: main
  • in suites: woody
  • size: 1,376 kB
  • ctags: 1,294
  • sloc: perl: 18,032; makefile: 43
file content (432 lines) | stat: -rwxr-xr-x 12,086 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
#!/usr/bin/perl -w
# $Id: filterchange.perl,v 1.6 2001/05/07 12:23:04 rcaputo Exp $

# This program tests the new filter-changing capabilities of
# Wheel::ReadWrite

use strict;
use lib '..';

##############################################################################
# This is the caller.
# It causes the other side to switch filters incessantly

package Cause;
use strict;

use POE qw(Wheel::SocketFactory Wheel::ReadWrite Driver::SysRW Filter::Stream);
use Data::Dumper;
use Storable qw(freeze thaw);

###############################################
# Create our top session
sub create
{
    # Spawn the top-level Cause session.  The port number is forwarded to
    # the _start handler (c_start) as its first argument.
    my $port=shift;

    # Event name => handler pairs, kept as a list so their order is stable.
    my @handlers=
    (
        _start    => \&c_start,
        _stop     => \&c_stop,
        error     => \&c_error,
        connected => \&c_connected,
    );

    return POE::Session->new(@handlers, [$port]);
}

###############################################
# Start the top session
sub c_start
{
    # _start handler of the top Cause session: open a connection to the
    # listener on 127.0.0.1 at the port given as ARG0.
    my $heap=$_[HEAP];
    my $port=$_[ARG0];

    my $factory=POE::Wheel::SocketFactory->new
    (
        RemotePort    => $port,
        RemoteAddress => '127.0.0.1',
        SuccessEvent  => 'connected',   # posted once the connection is made
        FailureEvent  => 'error',       # posted if the connection fails
    );

    # Store the wheel on the heap so it lives as long as the session does.
    $heap->{wheel}=$factory;
}

###############################################
# Simple notice when we stop
sub c_stop
{
    # _stop handler of the top Cause session: just announce the shutdown.
    my $notice="Cause  [$$] stoped\n";
    print $notice;
}
###############################################
# Errors at connect time
sub c_error
{
    # FailureEvent handler for the connecting SocketFactory wheel.
    # ARG0..ARG2 carry the failed operation, the error number and its text.
    my $heap=$_[HEAP];
    my ($what, $code, $text)=@_[ARG0, ARG1, ARG2];

    print "Cause  [$$] encountered $what error $code: $text\n";

    # Drop the wheel held on the heap; nothing else to do after a failure.
    delete $heap->{wheel};
}

###############################################
# Connected to the Effect.
# Create a small session that sends orders
sub c_connected
{
    # SuccessEvent handler: the socket is connected, so start a second
    # session (handlers taken from this package) that talks over it.
    my $socket=$_[ARG0];
    my @states=qw(_start error received);

    return POE::Session->new(__PACKAGE__, \@states, [$socket]);
}


################################################
# Creating the session that sends stuff
sub _start
{
    # _start handler of the sending session.  Wraps the connected handle
    # (ARG0) in a ReadWrite wheel using the Stream filter, then queues the
    # orders that received() will send one at a time.
    my $heap=$_[HEAP];
    my $socket=$_[ARG0];

    my $driver=POE::Driver::SysRW->new();       # sysread/syswrite I/O
    my $filter=POE::Filter::Stream->new();

    $heap->{wheel_client}=POE::Wheel::ReadWrite->new
    (
        Handle     => $socket,
        Driver     => $driver,
        InputEvent => 'received',
        Filter     => $filter,
        ErrorEvent => 'error',                  # posted on I/O error
    );

    # Each order is a snippet of Perl source; received() evals it to get
    # the bytes that are actually written to the other side.
    my @orders=
    (
        # The other side starts in Stream mode.
        ## Switch between each type without any chance of buffering (easy)
        '"IWANT Line"',
        '"IWANT Stream\n"',
        '"IWANT Reference"',
        '{my $f = freeze(\ "IWANT Stream"); return length($f) . "\0" . $f}',
        '"IWANT Reference"',
        '{my $f = freeze(\ "IWANT Line"); return length($f) . "\0" . $f}',
        # The other side is now in Line mode.

        ## Switch between 2 types with some extra trailing data
        # NOTE-1: switching Stream -> something will lose the end of the
        # something, because Filter::Stream doesn't do any buffering.
        # NOTE-2: switching Line -> something will cause problems if the
        # trailing data contains newlines.  We can avoid this when
        # switching to Stream, but not when switching to Reference.

        '"IWANT Stream\nHELLO"',
        '"IWANT Reference"',
        ( '{my $f = freeze(\ "IWANT Line"); return length($f) ' .
          '. "\0" . $f . "HELLO\n"}'
        ),
        '"IWANT Reference\n"',
        ( '{my $f = freeze(\ "IWANT Stream"); return length($f) ' .
          '. "\0" . $f . "HELLO"}'
        ),
        '"DONE"',
    );

    $heap->{send_these}=\@orders;
}

################################################
# I/O error or maybe disconnect
sub error
{
    # ErrorEvent handler for the client wheel.  A zero error number is
    # reported as the remote end closing the connection.
    my $heap=$_[HEAP];
    my ($what, $code, $text)=@_[ARG0, ARG1, ARG2];

    print $code
        ? "Cause  [$$] encountered $what error $code: $text\n"
        : "Cause  [$$] remote closed its connection\n";

    # Either way, shut down by dropping the wheel.
    delete $heap->{wheel_client};
}

################################################
# Other side sent us something

sub received
{
    # InputEvent handler for the Cause client wheel.
    #
    # ARG0 is the raw chunk read from the socket (this side always uses
    # the Stream filter).  The chunk is searched for the keywords the
    # other side replies with:
    #   DONE - last message; disconnect
    #   HI   - answer to a trailing "HELLO"
    #   NOT  - the other side rejected something; exit
    #   OK   - the order was carried out; send the next queued order
    # Anything that matches none of these is printed and the program exits.
    my($heap, $buffer)=@_[HEAP, ARG0];

    # Must start false: it is only set once a known keyword is seen, so
    # that the "doesn't make sense" check at the bottom can actually fire.
    my $ok=0;
    if($buffer=~s/^(\d+)\0//s)          # maybe from Filter::Reference
    {
        # A "<length>\0<frozen data>" frame: thaw the payload and unwrap
        # the scalar reference it holds.
        my $n=$1;
        $buffer=thaw(substr($buffer, 0, $n));
        $buffer=$$buffer;
    }

    if($buffer =~ /DONE/)               # Last message
    {
        delete $heap->{wheel_client};   # disconnect
        return;
    }
    if($buffer =~ /HI/)                 # response to our "HELLO"
    {
        print "Cause  [$$] how nice...\n";
        $ok=1;
    }
    if($buffer =~/NOT/)                 # something bad happened :(
    {
        print "Cause  [$$] something went wrong :(\n";
        exit;
    }
    if($buffer =~ /OK/)                 # it made the switch, now give it
    {                                   # another order
        my $send=shift @{$heap->{send_these}};
        if($send)
        {
            print "Cause  [$$] send '$send'\n";
            #print "Cause  [$$] (running $send)\n";
            # The queued order is Perl source built in _start (not
            # external input); evaluating it yields the bytes to send.
            $send=eval($send);
            die $@ if $@;
            # print "Cause  [$$] send '", quotemeta($send), "'\n";
            $heap->{wheel_client}->put($send);
        } else
        {
            print "Finished...";            # unless we've run out of orders
            delete $heap->{wheel_client};   # Disconnect
        }
        $ok=1;
    }
    unless($ok)                             # Hmm... this message doesn't
    {                                       # make sense
        $buffer=quotemeta $buffer;
        print "Cause  [$$] received '$buffer'...\n";
        exit;
    }
}


##############################################################################

##############################################################################
## This is the listener side of the connection.  It receives orders from
## Cause, and jumps between Filters

package Effect;
use strict;
use POE qw(Wheel::SocketFactory Wheel::ReadWrite
           Driver::SysRW Filter::Stream Filter::Line Filter::Reference
          );

################################################
# Create our top session
sub create
{
    # Spawn the top-level Effect (listener) session.  The port number is
    # forwarded to the _start handler (e_start) as its first argument.
    my $port=shift;

    # Event name => handler pairs, kept as a list so their order is stable.
    my @handlers=
    (
        '_start' => \&e_start,
        'error'  => \&e_error,
        'accept' => \&e_accept,
    );

    return POE::Session->new(@handlers, [$port]);
}

################################################
# Start our top session
sub e_start
{
    # _start handler of the top Effect session: listen on 127.0.0.1 at
    # the port given as ARG0.
    my $heap=$_[HEAP];
    my $port=$_[ARG0];

    my $listener=POE::Wheel::SocketFactory->new
    (
        BindPort     => $port,
        BindAddress  => '127.0.0.1',
        Reuse        => 1,
        SuccessEvent => 'accept',       # posted for each accepted connection
        FailureEvent => 'error',        # posted on listener errors
    );

    # Store the wheel on the heap so it lives as long as the session does.
    $heap->{wheel}=$listener;
}

################################################
# Some sort of error
sub e_error
{
    # FailureEvent handler for the listening SocketFactory wheel.
    # ARG0..ARG2 carry the failed operation, the error number and its text.
    my $heap=$_[HEAP];
    my ($what, $code, $text)=@_[ARG0, ARG1, ARG2];

    print "Effect [$$] encountered $what error $code: $text\n";

    # Drop the listener wheel held on the heap.
    delete $heap->{wheel};
}

################################################
# Cause has connected to us, so we now create a session for it
sub e_accept
{
    # SuccessEvent handler of the listener: start a per-connection session
    # (handlers taken from this package) for the accepted handle in ARG0.
    my $socket=$_[ARG0];
    my @states=qw(_start _stop error r_stream r_line r_reference);

    return POE::Session->new(__PACKAGE__, \@states, [$socket]);
}

################################################
# Start of the connection session
# We start off with a Stream filter, because that is the simplest
sub _start
{
    # _start handler of the per-connection Effect session.
    #
    # Builds one instance of every filter up front, wraps the accepted
    # handle (ARG0) in a ReadWrite wheel that begins with the Stream
    # filter, and sends the first "OK" to start the dialog.
    my($heap, $handle)=@_[HEAP, ARG0];

    # Create all the filters now.  Each entry is
    #   [ input event name, filter object, replies-are-references flag ]
    # and is looked up by name in _received() when a switch is requested.
    $heap->{filters}=
    {
        Stream=>['r_stream', POE::Filter::Stream->new(), 0],
        Line=>['r_line', POE::Filter::Line->new(), 0],
        Reference=>['r_reference', POE::Filter::Reference->new(), 1],
    };

    $heap->{wheel_client} = POE::Wheel::ReadWrite->new
    ( Handle     => $handle,                    # on this handle
      Driver     => POE::Driver::SysRW->new(),  # using sysread and syswrite
      ErrorEvent => 'error',            # generate this event on error

      InputEvent => $heap->{filters}->{Stream}->[0],
      Filter     => $heap->{filters}->{Stream}->[1],
    );
    _response($heap, "OK");                     # start the dialog
}

################################################
# Internal function -- Send a message back
sub _response
{
    # Send a reply through the client wheel.
    #   $heap    - session heap; holds wheel_client and the 'ref' flag
    #   $message - text to send; a false value means "send nothing"
    my $heap=shift;
    my $message=shift;
    return unless $message;

    print "Effect [$$] Send $message\n";

    # While the 'ref' flag is set, the reply is handed to the wheel as a
    # reference to a copy of the string rather than as the string itself.
    my $payload=$message;
    if($heap->{'ref'})
    {
        print "Effect [$$] As a reference...\n";
        $payload=\ "$message";
    }

    $heap->{wheel_client}->put($payload);
}

################################################
# Internal function -- Decode and follow the order
sub _received
{
    # Decode one order and act on it, then send the reply via _response().
    #   $heap    - session heap (filters table, wheel_client, 'ref' flag)
    #   $current - name of the filter the order arrived through
    #   $line    - the decoded order text
    my($heap, $current, $line)=@_;
    my $reply='OK';

    ## IWANT means Cause wants us to change filters
    if($line =~ /^IWANT (Line|Reference|Stream)$/)
    {
        my $wanted=$1;
        if($current eq $wanted)             # nothing to do, already there
        {
            print "Effect [$$] Already a $wanted\n";
        }
        elsif(my $entry=$heap->{filters}->{$wanted})
        {
            print "Effect [$$] Switching to $wanted\n";
            my $wheel=$heap->{wheel_client};
            $wheel->event(InputEvent=>$entry->[0]);
            $wheel->set_filter($entry->[1]);
            $heap->{'ref'}=$entry->[2];
        }
        else
        {
                                        # Cause is messed up
            print "Effect [$$] Unknown filter $wanted\n";
            $reply='NOT';
        }
    }
    elsif($line eq 'HELLO')                 # This is pending data
    {
        $reply='HI';
    }
    elsif($line eq 'DONE')                  # Game over :)
    {
        print "Effect [$$] Done!\n";
        $reply='DONE';
    }
    else                                    # Something else... :(
    {
        print "Effect [$$] Hey! Received $current '$line'\n";
        $reply='NOT';
    }

    _response($heap, $reply);
}


################################################
# InputEvent when we are using Filter::Stream
sub r_stream
{
    # Input arrived through the Stream filter: ARG0 is the raw chunk.
    my $heap=$_[HEAP];
    my $chunk=$_[ARG0];
    _received($heap, 'Stream', $chunk);
}


################################################
# InputEvent when we are using Filter::Line
sub r_line
{
    # Input arrived through the Line filter: ARG0 is one line of text.
    my $heap=$_[HEAP];
    my $text=$_[ARG0];
    _received($heap, 'Line', $text);
}

################################################
# InputEvent when we are using Filter::Reference
sub r_reference
{
    # Input arrived through the Reference filter: ARG0 is a reference to
    # the order string, so dereference it before decoding.
    my $heap=$_[HEAP];
    my $boxed=$_[ARG0];
    _received($heap, 'Reference', ${ $boxed });
}


################################################
# I/O error or disconnection
sub error
{
    # ErrorEvent handler for the client wheel.  A zero error number is
    # reported as the remote end closing the connection.
    my $heap=$_[HEAP];
    my ($what, $code, $text)=@_[ARG0, ARG1, ARG2];

    print $code
        ? "Effect [$$] encountered $what error $code: $text\n"
        : "Effect [$$] Remote closed its connection.\n";

    # Either way, shut down by dropping the wheel.
    delete $heap->{wheel_client};
}


################################################
# When this session shuts down, we also want to kill the kernel
sub _stop
{
    # _stop handler of the per-connection session: signal HUP to the
    # kernel, then release the client wheel.
    my $kernel=$_[KERNEL];
    my $heap=$_[HEAP];

    $kernel->signal($kernel, 'HUP');
    delete $heap->{wheel_client};
}



##############################################################################
package main;
use strict;

use POE;

my $port=12345;
my $me;

# Split in two: the parent becomes the listener (Effect) and the child
# becomes the caller (Cause).
my $pid=fork();
die "Unable to fork: $!\n" unless defined $pid;

if($pid==0)
{
    # Child side
    sleep(2);                           # wait for Effect to come up
    Cause::create($port);               # create the caller
    $me='Cause ';
}
else
{
    # Parent side
    Effect::create($port);              # create a listener
    $me='Effect';
}

# Both processes run their own POE kernel until their sessions end.
print "$me [$$] POE->run\n";
$poe_kernel->run();
print "$me [$$] Exit\n";