1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
#!/usr/bin/env perl
use strict;
use warnings;
use Test::More;
BEGIN {
use_ok 'MCE::Step';
}
## preparation
my $in_file = MCE->tmp_dir . '/input.txt';
my $fh_data = \*DATA;
open my $fh, '>', $in_file;
binmode $fh;
print {$fh} "1\n2\n3\n4\n5\n6\n7\n8\n9\n";
close $fh;
## output iterator to ensure output order
sub output_iterator {
my ($gather_ref) = @_;
my %tmp; my $order_id = 1;
@{ $gather_ref } = (); ## reset array
return sub {
my ($data_ref, $chunk_id) = @_;
$tmp{ $chunk_id } = $data_ref;
while (1) {
last unless exists $tmp{$order_id};
push @{ $gather_ref }, @{ $tmp{$order_id} };
delete $tmp{$order_id++};
}
return;
};
}
## sub-tasks
sub task_a {
my ($mce, $chunk_ref, $chunk_id) = @_;
my @ans; chomp @{ $chunk_ref };
push @ans, map { $_ * 2 } @{ $chunk_ref };
MCE->step(\@ans, $chunk_id); # forward to task_b
}
sub task_b {
my ($mce, $chunk_ref, $chunk_id) = @_;
my @ans;
push @ans, map { $_ * 3 } @{ $chunk_ref };
MCE->gather(\@ans, $chunk_id); # send to output_iterator
}
## Reminder; MCE::Step processes sub-tasks from left-to-right
my $answers = '6 12 18 24 30 36 42 48 54';
my @a;
MCE::Step->init(
max_workers => [ 2 , 2 ], # run with 2 workers for both sub-tasks
task_name => [ 'a' , 'b' ]
);
mce_step { gather => output_iterator(\@a) }, \&task_a, \&task_b, ( 1..9 );
is( join(' ', @a), $answers, 'check results for array' );
mce_step { gather => output_iterator(\@a) }, \&task_a, \&task_b, [ 1..9 ];
is( join(' ', @a), $answers, 'check results for array ref' );
mce_step_f { gather => output_iterator(\@a) }, \&task_a, \&task_b, $in_file;
is( join(' ', @a), $answers, 'check results for path' );
mce_step_f { gather => output_iterator(\@a) }, \&task_a, \&task_b, $fh_data;
is( join(' ', @a), $answers, 'check results for glob' );
mce_step_s { gather => output_iterator(\@a) }, \&task_a, \&task_b, 1, 9;
is( join(' ', @a), $answers, 'check results for sequence' );
MCE::Step->finish;
## process hash, current API available since 1.828
MCE::Step->init(
max_workers => 1
);
my %hash = map { $_ => $_ } ( 1 .. 9 );
my %res = mce_step sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
my %ret;
for my $key ( keys %{ $chunk_ref } ) {
$ret{$key} = $chunk_ref->{$key} * 2;
}
MCE->gather(%ret);
}, \%hash;
@a = map { $res{$_} } ( 1 .. 9 );
is( join(' ', @a), "2 4 6 8 10 12 14 16 18", 'check results for hash ref' );
MCE::Step->finish;
## cleanup
unlink $in_file;
done_testing;
__DATA__
1
2
3
4
5
6
7
8
9
|