File: Queue.pm

package info (click to toggle)
libmojo-ioloop-readwriteprocess-perl 1.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 540 kB
  • sloc: perl: 4,655; sh: 101; makefile: 2
file content (162 lines) | stat: -rw-r--r-- 4,466 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package Mojo::IOLoop::ReadWriteProcess::Queue;
use Mojo::Base -base;
use Mojo::IOLoop::ReadWriteProcess::Pool;
use Mojo::IOLoop::ReadWriteProcess;
use Mojo::IOLoop::ReadWriteProcess::Session;

use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};

# Processes waiting for a free slot; add() falls back to this collection when
# the pool refuses a new process.
has queue   => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
# Processes that consume() starts; add() tries this collection first.
has pool    => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
# Processes whose "stop" event fired while consume() was running.
has done    => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
# Session singleton that consume() enables and polls for collected info.
has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };

# Removes a stopped process from the running pool and, when a process is
# waiting in the queue, promotes it into the pool.
#
#   $process - the process object that has just stopped.
#
# Returns the process shifted off the queue when one was promoted, otherwise
# a false value.
sub _dequeue {
  my ($self, $process) = @_;

  # grep() hands back a *new* collection object, so the limit configured on
  # the current pool through maximum_processes() is carried over explicitly.
  # NOTE(review): assumes the limit is tracked per pool object - confirm in
  # Pool.pm. Re-applying it is harmless if it is already preserved.
  my $running = $self->pool;
  my $limit   = $running->maximum_processes;
  my $kept    = $running->grep(sub { $process ne $_ });
  $kept->maximum_processes($limit) if $limit;
  $self->pool($kept);

  # Only drop the head of the queue once the pool has accepted it.
  shift @{$self->queue}
    if ($self->queue->first && $self->pool->add($self->queue->first));
}

# Reports whether there is nothing left to run: true only when the pool is
# empty and the queue is empty as well.
sub exhausted {
  my ($self) = @_;
  my $running = $self->pool->size;
  return $running == 0 && $self->queue->size == 0;
}

# Runs the queue to completion: starts every process in the pool, lets the
# "stop" handler move finished processes to "done" and refill the pool from
# the queue, and returns once both pool and queue are empty.
#
# Takes no arguments besides the invocant.
sub consume {
  my $self = shift;
  $self->session->enable;

  # "done" has to be able to hold everything the pool and the queue may hold.
  $self->done->maximum_processes(
    $self->queue->maximum_processes + $self->pool->maximum_processes);

  until ($self->exhausted) {
    $self->session->consume_collected_info;
    $self->session->_protect(
      sub {
        $self->pool->each(
          sub {
            my $p = shift;
            return unless $p;

            # The "started" key marks processes already launched by an
            # earlier pass so they are not started twice.
            return if exists $p->{started};
            $p->{started}++;
            $p->once(stop => sub { $self->done->add($p); $self->_dequeue($p) });
            $p->start;
          });
      });

    # Pause between polls. The core sleep() only takes whole seconds, so
    # "sleep .5" is truncated to "sleep 0" and busy-loops; the four-argument
    # select() gives a real sub-second delay. Skipped when the work is
    # already finished so no delay is added before returning.
    select(undef, undef, undef, .5) unless $self->exhausted;
  }
}

# Registers a new process, passing all arguments through unchanged. The pool
# is tried first; only when the pool's add() returns undef is the process
# handed to the queue instead.
#
# Returns whatever the accepting collection's add() returned.
sub add {
  my $self = shift;

  my $slot = $self->pool->add(@_);
  return $slot if defined $slot;

  return $self->queue->add(@_);
}

# Catch-all for methods not defined on the queue itself. The call is forwarded
# to the pool through Mojo::IOLoop::ReadWriteProcess::Pool::_cmd, with the
# method name appended as the last argument. When the method name matches
# /once|on|emit/ the same call is also forwarded to the queue.
#
# Each forward runs inside eval, so exceptions raised while dispatching are
# silently discarded. DESTROY is never forwarded.
#
# NOTE(review): the name pattern is unanchored, so any method whose name merely
# contains "on" is forwarded to the queue as well - confirm that is intended.
sub AUTOLOAD {
  our $AUTOLOAD;

  # Strip the package qualifier to get the bare method name.
  (my $method = $AUTOLOAD) =~ s/.*:://;
  return if $method eq "DESTROY";

  my $self = shift;
  return (
    eval { $self->pool->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $method) },
    ($method =~ /once|on|emit/)
    ? eval { $self->queue->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $method) }
    : ());
}

1;

=encoding utf-8

=head1 NAME

Mojo::IOLoop::ReadWriteProcess::Queue - Queue for Mojo::IOLoop::ReadWriteProcess objects.

=head1 SYNOPSIS

    use Mojo::IOLoop::ReadWriteProcess qw(queue process parallel);
    my $n_proc = 20;
    my $fired;

    my $q = queue;

    $q->pool->maximum_processes(2); # Max 2 processes in parallel
    $q->queue->maximum_processes(10); # Max queue is 10

    $q->add( process sub { return 42 } ) for 1..7;

    # Subscribe to all "stop" events in the pool
    $q->once(stop => sub { $fired++; });

    # Consume the queue
    $q->consume();

    my $all = $q->done; # All processes, Mojo::Collection of Mojo::IOLoop::ReadWriteProcess

    # Set your own running pool
    $q->pool(parallel sub { return 42 } => 5);

    # Set your own queue
    $q->queue(parallel sub { return 42 } => 20);

    $q->consume();

=head1 METHODS

L<Mojo::IOLoop::ReadWriteProcess::Queue> inherits all methods from L<Mojo::Base> and implements
the following new ones.
Note: It proxies all the other methods of L<Mojo::IOLoop::ReadWriteProcess> for the whole process group.

=head2 add

    use Mojo::IOLoop::ReadWriteProcess qw(queue process);
    my $q = queue();
    $q->add(sub { print "Hello 2! " });
    $q->add(process sub { print "Hello 2! " });

Adds the process to the running pool if it still has a free slot, otherwise to the waiting queue.

=head2 consume

    use Mojo::IOLoop::ReadWriteProcess qw(queue);
    my $q = queue();
    $q->add(sub { print "Hello 2! " });
    $q->add(process sub { print "Hello 2! " });
    $q->consume; # executes and exhausts the processes

Starts the processes and empties the queue.
Note: maximum_processes can be set both for the pool (number of processes to be run in parallel),
and for the queue (that gets exhausted during the C<consume()> phase).

    $q->pool->maximum_processes(2); # Max 2 processes in parallel
    $q->queue->maximum_processes(10); # Max queue is 10

=head2 exhausted

    use Mojo::IOLoop::ReadWriteProcess qw(queue);
    my $q = queue();
    $q->add(sub { print "Hello 2! " });
    $q->add(process sub { print "Hello 2! " });
    $q->consume; # executes and exhausts the processes
    $q->exhausted; # 1

Returns true once the queue is exhausted, that is when both the running pool and the waiting queue are empty.

=head1 ENVIRONMENT

You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify
the maximum number of processes allowed in the pool and the queue, which are
L<Mojo::IOLoop::ReadWriteProcess::Pool> instances.

    MOJO_PROCESS_MAXIMUM_PROCESSES=10000

=head1 LICENSE

Copyright (C) Ettore Di Giacinto.

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=head1 AUTHOR

Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt>

=cut