1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
#!perl -w
use strict;
use Test::More 0.98;
use Test::SharedFork 0.31;
use Test::SharedObject;
use AnyEvent;
use AnyEvent::ForkManager;
use Time::HiRes;
my $MAX_WORKERS = 2;
my $JOB_COUNT = $MAX_WORKERS * 3;
my $TEST_COUNT =
($JOB_COUNT) + # in child process tests
($JOB_COUNT * 2) + # on_start
($JOB_COUNT * 3) + # on_finish
($JOB_COUNT > $MAX_WORKERS ? (($JOB_COUNT - $MAX_WORKERS) * 2) : 0) + # on_enqueue
($JOB_COUNT > $MAX_WORKERS ? (($JOB_COUNT - $MAX_WORKERS) * 2) : 0) + # on_dequeue
($JOB_COUNT > $MAX_WORKERS ? (($JOB_COUNT - $MAX_WORKERS) * 2) : 0) + # on_working_max
4;# wait_all_children
plan tests => $TEST_COUNT;
my $pm = AnyEvent::ForkManager->new(
max_workers => $MAX_WORKERS,
on_start => sub{
my($pm, $pid, $exit_code) = @_;
note 'start on_start';
cmp_ok $pm->num_workers, '<', $pm->max_workers, 'not working max';
is $$, $pm->manager_pid, 'called by manager';
note 'end on_start';
},
on_finish => sub{
my($pm, $pid, $status, $exit_code) = @_;
note 'start on_finish';
is $status >> 8, $exit_code, 'status';
cmp_ok $pm->num_workers, '<', $pm->max_workers, 'not working max';
is $$, $pm->manager_pid, 'called by manager';
note 'end on_finish';
},
on_enqueue => sub{
my($pm, $exit_code) = @_;
note 'start on_enqueue';
is $pm->num_workers, $pm->max_workers, 'working max';
is $$, $pm->manager_pid, 'called by manager';
note 'end on_start';
},
on_dequeue => sub{
my($pm, $exit_code) = @_;
note 'start on_dequeue';
cmp_ok $pm->num_workers, '<', $pm->max_workers, 'not working max';
is $$, $pm->manager_pid, 'called by manager';
note 'end on_dequeue';
},
on_working_max => sub{
my($pm, $exit_code) = @_;
note 'start on_working_max';
is $pm->num_workers, $pm->max_workers, 'working max';
is $$, $pm->manager_pid, 'called by manager';
note 'end on_working_max';
}
);
my $ready = Test::SharedObject->new(0);
my @all_data = (1 .. $JOB_COUNT);
my $started_all_process = 0;
foreach my $exit_code (@all_data) {
$pm->start(
cb => sub {
my($pm, $exit_code) = @_;
isnt $$, $pm->manager_pid, 'called by child';
Time::HiRes::usleep(100) until $ready->get();
note "exit_code: $exit_code";
$pm->finish($exit_code);
fail 'finish failed';
},
args => [$exit_code]
);
}
$ready->set(1);
my $cv = AnyEvent->condvar;
my $callback_called;
$callback_called++;
$pm->wait_all_children(
cb => sub {
my($pm) = @_;
note 'start wait_all_children callback';
is $$, $pm->manager_pid, 'called by manager';
is $pm->num_workers, 0, 'finished all child process';
is $pm->num_queues, 0, 'empty all child process queue';
note 'end wait_all_children callback';
$cv->send;
},
);
$cv->recv;
is $callback_called, 1, 'wait_all_children callback is called at once';
|