package Mojo::IOLoop::ReadWriteProcess::Queue;
use Mojo::Base -base;
use Mojo::IOLoop::ReadWriteProcess::Pool;
use Mojo::IOLoop::ReadWriteProcess;
use Mojo::IOLoop::ReadWriteProcess::Session;

use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};

has queue   => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
has pool    => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
has done    => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };
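
sub _dequeue {
  my ($self, $process) = @_;

  # Drop the stopped process from the running pool.
  $self->pool($self->pool->grep(sub { $process ne $_ }));

  # Promote the next waiting process, but only if the pool accepted it.
  shift @{$self->queue}
    if ($self->queue->first && $self->pool->add($self->queue->first));
}

# True once nothing is running and nothing is waiting.
sub exhausted { $_[0]->pool->size == 0 && shift->queue->size == 0 }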

sub consume {
  my $self = shift;
  $self->session->enable;
  $self->done->maximum_processes(
    $self->queue->maximum_processes + $self->pool->maximum_processes);
  until ($self->exhausted) {
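    # Built-in sleep() truncates fractional seconds, so "sleep .5" would not
    # pause at all; four-argument select() waits for half a second instead.
    select(undef, undef, undef, .5);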
    $self->session->consume_collected_info;
    $self->session->_protect(
      sub {
        $self->pool->each(
          sub {
            my $p = shift;
            return unless $p;
            return if exists $p->{started};
            $p->{started}++;
            $p->once(stop => sub { $self->done->add($p); $self->_dequeue($p) });
            $p->start;
          });
      });
  }
}
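
sub add {
  my $self = shift;

  # Prefer the running pool; fall back to the waiting queue when it is full.
  $self->pool->add(@_) // $self->queue->add(@_);
}

# Proxy any other method to every process in the pool. Event methods
# (once, on, emit) are also forwarded to the processes still in the queue.
sub AUTOLOAD {
  our $AUTOLOAD;
  my $fn = $AUTOLOAD;
  $fn =~ s/.*:://;
  return if $fn eq "DESTROY";
  my $self = shift;
  return (
    eval { $self->pool->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) },
    (grep(/once|on|emit/, $fn))
    ? eval { $self->queue->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) }
    : ());
}

1;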

=encoding utf-8

=head1 NAME

Mojo::IOLoop::ReadWriteProcess::Queue - Queue for Mojo::IOLoop::ReadWriteProcess objects.

=head1 SYNOPSIS

    use Mojo::IOLoop::ReadWriteProcess qw(queue process parallel);

    my $n_proc = 20;
    my $fired;

    my $q = queue;

    $q->pool->maximum_processes(2);   # Max 2 processes in parallel
    $q->queue->maximum_processes(10); # Max queue is 10

    $q->add( process sub { return 42 } ) for 1..7;

    # Subscribe to all "stop" events in the pool
    $q->once(stop => sub { $fired++; });

    # Consume the queue
    $q->consume();

    my $all = $q->done; # All processes, Mojo::Collection of Mojo::IOLoop::ReadWriteProcess

    # Set your own running pool
    $q->pool(parallel sub { return 42 } => 5);

    # Set your own queue
    $q->queue(parallel sub { return 42 } => 20);

    $q->consume();
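
=head1 METHODS

L<Mojo::IOLoop::ReadWriteProcess::Queue> inherits all methods from L<Mojo::Base> and implements
the following new ones.

Note: Any other method call is proxied to every L<Mojo::IOLoop::ReadWriteProcess> object in the
running pool, so it applies to the whole process group. The event methods C<on>, C<once> and
C<emit> are also forwarded to the processes still waiting in the queue.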

=head2 add

    use Mojo::IOLoop::ReadWriteProcess qw(queue process);
    my $q = queue();
    $q->add(sub { print "Hello 2! " });
    $q->add(process sub { print "Hello 2! " });

Adds the process to the running pool if it still has room, otherwise to the waiting queue.
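
Because C<add> tries the running pool first, a full pool sends later processes to the
waiting queue. The following is a minimal sketch of that overflow. It assumes the module is
installed and that the pool's own C<add> returns C<undef> once C<maximum_processes> is
reached, which is what the C<//> fallback in this method relies on.

    use Mojo::IOLoop::ReadWriteProcess qw(queue process);

    my $q = queue();
    $q->pool->maximum_processes(2);      # at most 2 processes run at once
    $q->queue->maximum_processes(10);    # at most 10 processes wait

    # Nothing is started here; the processes are only registered.
    $q->add(process sub { return 42 }) for 1 .. 5;

    # size() is the same call that exhausted() uses on both containers.
    printf "pool: %d, queue: %d\n", $q->pool->size, $q->queue->size;

With these limits the first two processes should land in the pool and the remaining three
in the queue.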

=head2 consume

    use Mojo::IOLoop::ReadWriteProcess qw(queue process);
    my $q = queue();
    $q->add(sub { print "Hello 2! " });
    $q->add(process sub { print "Hello 2! " });
    $q->consume; # executes and exhausts the processes

Starts the processes and empties the queue. The call blocks until both the pool and the
queue are empty; each process is moved to C<done> when it stops.

Note: C<maximum_processes> can be set both on the pool (the number of processes run in parallel)
and on the queue (which is drained during the C<consume()> phase).

    $q->pool->maximum_processes(2);   # Max 2 processes in parallel
    $q->queue->maximum_processes(10); # Max queue is 10
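
Once C<consume> returns, the stopped processes are available through C<done>. The sketch
below collects their results. It assumes that C<return_status> from
L<Mojo::IOLoop::ReadWriteProcess> holds the value returned by each code reference.

    use Mojo::IOLoop::ReadWriteProcess qw(queue process);

    my $q = queue();
    $q->pool->maximum_processes(2);
    $q->queue->maximum_processes(10);

    # Each job returns a different value so the results can be told apart.
    for my $n (1 .. 4) {
      $q->add(process sub { return $n * 10 });
    }

    $q->consume;    # blocks until pool and queue are both empty

    # done is filled as processes stop, so sort rather than rely on order.
    my @results = sort { $a <=> $b } map { $_->return_status } @{$q->done};
    print "results: @results\n";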

=head2 exhausted

    use Mojo::IOLoop::ReadWriteProcess qw(queue process);
    my $q = queue();
    $q->add(sub { print "Hello 2! " });
    $q->add(process sub { print "Hello 2! " });
    $q->consume; # executes and exhausts the processes
    $q->exhausted; # 1

Returns 1 if the queue is exhausted, that is, when both the running pool and the waiting
queue are empty.
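
The interplay of pool, queue and done that C<exhausted> reports on can be modelled without
forking anything. The following is a simplified, dependency-free sketch and not the module's
implementation. C<@pool>, C<@queue>, C<@done>, C<add_job> and C<finish_job> are hypothetical
names used only for illustration.

    use strict;
    use warnings;

    my $max_pool = 2;
    my (@pool, @queue, @done);

    # Mirrors add(): prefer the running pool, fall back to the queue.
    sub add_job {
      my $job = shift;
      if   (@pool < $max_pool) { push @pool,  $job }
      else                     { push @queue, $job }
    }

    # Mirrors the "stop" handler: record the job, free its slot, promote one.
    sub finish_job {
      my $job = shift;
      push @done, $job;
      @pool = grep { $_ ne $job } @pool;
      push @pool, shift @queue if @queue && @pool < $max_pool;
    }

    sub exhausted { @pool == 0 && @queue == 0 }

    add_job("job$_") for 1 .. 5;
    finish_job($pool[0]) until exhausted();

    print "done: @done\n";    # prints "done: job1 job2 job3 job4 job5"

In this model the pool never holds more than two jobs, and C<exhausted> only becomes true
after the last queued job has been promoted and finished.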

=head1 ENVIRONMENT

You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify
the maximum number of processes allowed in the pool and the queue, which are
L<Mojo::IOLoop::ReadWriteProcess::Pool> instances.

    MOJO_PROCESS_MAXIMUM_PROCESSES=10000
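
The variable can also be set from Perl. This sketch assumes the limit is read when
L<Mojo::IOLoop::ReadWriteProcess::Pool> is loaded, which is why the assignment sits in a
C<BEGIN> block ahead of the C<use> line. The value 500 is only an example.

    # Assumption: the default is read at load time, so set it before "use".
    BEGIN { $ENV{MOJO_PROCESS_MAXIMUM_PROCESSES} //= 500 }

    use Mojo::IOLoop::ReadWriteProcess qw(queue);

    my $q = queue();
    printf "pool limit: %d, queue limit: %d\n",
      $q->pool->maximum_processes, $q->queue->maximum_processes;

    # An explicit per-object setting still replaces the default.
    $q->pool->maximum_processes(4);

Setting C<maximum_processes> on the pool or the queue directly, as in the last line, is
usually the more targeted choice when only one queue needs a different limit.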

=head1 LICENSE

Copyright (C) Ettore Di Giacinto.

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=head1 AUTHOR

Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt>

=cut