File: Pool.pm

package info (click to toggle)
libmojo-ioloop-readwriteprocess-perl 1.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 540 kB
  • sloc: perl: 4,655; sh: 101; makefile: 2
file content (134 lines) | stat: -rw-r--r-- 3,186 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package Mojo::IOLoop::ReadWriteProcess::Pool;
use Mojo::Base 'Mojo::Collection';
use constant MAXIMUM_PROCESSES => $ENV{MOJO_PROCESS_MAXIMUM_PROCESSES} // 100;
use Scalar::Util qw(blessed);

my %max_proc;

sub new {
  my $s = shift->SUPER::new(@_);
  $max_proc{$s} = MAXIMUM_PROCESSES;
  $s;
}

sub get    { @{$_[0]}[$_[1]] }
sub remove { delete @{$_[0]}[$_[1]] }

sub add {
  return undef unless $_[0]->size < $max_proc{$_[0]};
  my $self = shift;
  push @{$self},
    blessed $_[0] ? $_[0] : Mojo::IOLoop::ReadWriteProcess->new(@_);
  $self->last;
}

sub maximum_processes {
  $max_proc{$_[0]} = pop() if $_[1];
  $max_proc{$_[0]};
}

sub _cmd {
  my $c    = shift;
  my $f    = pop;
  my @args = @_;
  my @r;
  $c->each(sub { push(@r, +shift()->$f(@args)) });
  wantarray ? @r : $c;
}

sub AUTOLOAD {
  our $AUTOLOAD;
  my $fn = $AUTOLOAD;
  $fn =~ s/.*:://;
  return if $fn eq "DESTROY";
  return eval { shift->_cmd(@_, $fn) };
}

1;

=encoding utf-8

=head1 NAME

Mojo::IOLoop::ReadWriteProcess::Pool - Pool of Mojo::IOLoop::ReadWriteProcess objects.

=head1 SYNOPSIS

    use Mojo::IOLoop::ReadWriteProcess qw(parallel);
    my $n_proc = 20;
    my $fired;

    my $p = parallel sub { print "Hello world\n"; } => $n_proc;

    # Subscribe to all "stop" events in the pool
    $p->once(stop => sub { $fired++; });

    # Start all processes belonging to the pool
    $p->start();

    # Receive the process output
    $p->each(sub { my $p = shift; $p->getline(); });
    $p->wait_stop;

    # Get the last one! (it's a Mojo::Collection!)
    $p->last()->stop();

=head1 METHODS

L<Mojo::IOLoop::ReadWriteProcess::Pool> inherits all methods from L<Mojo::Collection> and implements
the following new ones.
Note: It proxies all the other methods of L<Mojo::IOLoop::ReadWriteProcess> for the whole process group.

=head2 get

    use Mojo::IOLoop::ReadWriteProcess qw(parallel);
    my $pool = parallel(sub { print "Hello" } => 5);
    $pool->get(4);

Get the element specified in the pool (starting from 0).

=head2 add

    use Mojo::IOLoop::ReadWriteProcess qw(pool);
    my $pool = pool(maximum_processes => 2);
    $pool->add(sub { print "Hello 2! " });

Add the element specified in the pool.

=head2 remove

    use Mojo::IOLoop::ReadWriteProcess qw(parallel);
    my $pool = parallel(sub { print "Hello" } => 5);
    $pool->remove(4);

Remove the element specified in the pool.

=head2 maximum_processes

    use Mojo::IOLoop::ReadWriteProcess qw(parallel);
    my $pool = parallel(sub { print "Hello" } => 5);
    $pool->maximum_processes(30);
    $pool->add(...);

Prevent from adding processes to the pool. If we reach C<maximum_processes> number
of processes, C<add()> will refuse to add more to the pool.

=head1 ENVIRONMENT

You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify the
the maximum number of processes allowed in L<Mojo::IOLoop::ReadWriteProcess> instances.

    MOJO_PROCESS_MAXIMUM_PROCESSES=10000

=head1 LICENSE

Copyright (C) Ettore Di Giacinto.

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=head1 AUTHOR

Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt>

=cut