File: dequeue_timed.t

package info (click to toggle)
libmce-perl 1.901-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,316 kB
  • sloc: perl: 14,091; makefile: 7
file content (109 lines) | stat: -rw-r--r-- 2,491 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env perl

use strict;
use warnings;
use Test::More;

# Load the modules under test. use_ok records each load as a test result,
# and wrapping the calls in BEGIN makes the loads (and their imports) happen
# at compile time, before the rest of this file is parsed.
# NOTE(review): the paren-less `mce_flow sub { ... }` call further down
# presumably relies on MCE::Flow's import being in place by then -- confirm.
BEGIN {
   use_ok 'MCE';
   use_ok 'MCE::Flow';
   use_ok 'MCE::Queue';
}

# @a is handed to the queue by reference. The check_* helpers below read @a
# directly and expect it to mirror what the queue currently holds.
my @a = ();
my $q = MCE::Queue->new( queue => \@a );

# Assert that the array backing the queue (@a) holds exactly the single
# item '12345'. The one argument is the test name to report.
sub check_enqueue {
   my $label    = shift;
   my $contents = join '', @a;

   is( $contents, '12345', $label );
}

# Assert that a dequeued value is '12345' and that nothing is left in the
# array backing the queue (@a).
#   arg 1: test name for the value check
#   arg 2: the value that was dequeued
sub check_dequeue_nb {
   my $label    = shift;
   my $dequeued = shift;

   is( $dequeued, '12345', $label );

   my $leftover = join '', @a;
   is( $leftover, '', 'queue emptied' );
}

# Report the outcome of a timed-dequeue check. The caller works out the
# verdict itself and passes it in as a flag; the test passes when it is 1.
#   arg 1: test name
#   arg 2: verdict flag (1 on success)
sub check_dequeue_timed {
   my $label   = $_[0];
   my $verdict = $_[1];

   return is( $verdict, 1, $label );
}

## Manager tests

{
   # Put one item on the queue and confirm it shows up in @a.
   $q->enqueue('12345');
   check_enqueue('manager: check enqueue');

   # With no timeout argument the call is checked like a non-blocking
   # dequeue: it must hand back the item and leave the queue empty.
   check_dequeue_nb('manager: check dequeue_nb', $q->dequeue_timed);

   # The queue is empty now. There is no timed support for the manager
   # process, so a 2 second timeout is expected to come back empty-handed
   # in well under 1 second.
   my $began   = MCE::Util::_time();
   my $item    = $q->dequeue_timed(2.0);
   my $elapsed = MCE::Util::_time() - $began;

   my $returned_promptly = 0;
   $returned_promptly = 1 if !$item && $elapsed < 1.0;

   check_dequeue_timed('manager: check dequeue_timed', $returned_promptly);
}

## Worker tests

# Repeat the same checks from inside a single worker process.
MCE::Flow->init( max_workers => 1 );

mce_flow sub {
   # Assertions are not made here directly; each one is handed to the
   # matching check_* sub through MCE->do.
   $q->enqueue('12345');
   MCE->do('check_enqueue', 'worker: check enqueue');
   MCE->do('check_dequeue_nb', 'worker: check dequeue_nb', $q->dequeue_timed);

   # The queue is empty again. Unlike the manager case, the worker is
   # expected to wait on the 2 second timeout, so more than 1 second
   # must have gone by when the call returns without an item.
   my $began   = MCE::Util::_time();
   my $item    = $q->dequeue_timed(2.0);
   my $elapsed = MCE::Util::_time() - $began;

   my $waited = 0;
   $waited = 1 if !$item && $elapsed > 1.0;

   MCE->do('check_dequeue_timed', 'worker: check dequeue_timed', $waited);

   return;
};

MCE::Flow->finish;

## Parallel demo

# Eight consumers poll the queue with a 1 second timeout while a single
# provider feeds it seven items ('a'..'g') over roughly three seconds.
# Afterwards the total run time and the gathered items are checked.

my $s = MCE::Util::_time();   # wall-clock start of the demo
my @r;                        # items gathered from the consumers

MCE->new(
   user_tasks => [{
      # consumers
      max_workers => 8,
      chunk_size  => 1,
      sequence    => [ 1, 40 ],
      gather      => \@r,
      user_func   => sub {
         # 40 sequence items spread over 8 workers:
         # each worker calls dequeue_timed approximately 5 times
         if (defined(my $ret = $q->dequeue_timed(1.0))) {
            # Pass the dequeued value as an argument rather than
            # interpolating it into the format, so a '%' in the data
            # can never be taken for a conversion.
            MCE->printf(
               "%s: time %0.3f, pid %d\n", $ret, MCE::Util::_time(), $$
            );
            MCE->gather($ret);
         }
      }
   },{
      # provider
      max_workers => 1,
      user_func   => sub {
         # Four items are available at once; the remaining three
         # arrive one second apart.
         $q->enqueue($_) for 'a'..'d';

         for my $late_item ('e'..'g') {
            sleep 1;
            $q->enqueue($late_item);
         }
      }
   }]
)->run;

my $duration = MCE::Util::_time() - $s;
note sprintf("%0.3f seconds", $duration);

# The run is expected to take about 5 seconds; allow 2 seconds either way.
# cmp_ok reports the measured deviation when the check fails.
cmp_ok( abs(5.0 - $duration), '<', 2.0, 'parallel demo duration' );
is( scalar(@r), 7, 'gathered size' );
is( join('', sort @r), 'abcdefg', 'gathered data' );

done_testing;