1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
#!/usr/bin/perl
use strict;
use warnings;
use threads 1.39;
use threads::shared;
use Thread::Queue;
### Global Variables ###

# Number of worker threads created for the pool
my $MAX_THREADS = 10;

# Shared flag: becomes true once the application has been asked to terminate,
# so that every thread can observe the request
my $TERM :shared = 0;

# Queue of thread IDs that are ready for work.  Workers enqueue their own ID
# whenever they go idle; on termination a -1 marker is placed on this queue.
my $IDLE_QUEUE = Thread::Queue->new();

### Signal Handling ###

# On ^C (INT) or a command line 'kill' (TERM), start a graceful shutdown.
# One handler is shared by both signals, installed through a %SIG hash slice.
@SIG{qw(INT TERM)} = (
    sub {
        print(">>> Terminating <<<\n");

        # Let every thread know the application is going down
        $TERM = 1;

        # Index 0 is the head of the queue, so the -1 marker is dequeued
        # ahead of any thread IDs that are already waiting there
        $IDLE_QUEUE->insert(0, -1);
    }
) x 2;
### Main Processing Section ###
MAIN:
{
    ### INITIALIZE ###

    # Maps each worker's thread ID to the private queue used to feed it work
    my %work_queues;

    # Build the pool: every thread gets its own dedicated work queue
    foreach my $worker_num (1 .. $MAX_THREADS) {
        my $queue  = Thread::Queue->new();
        my $thread = threads->create('worker', $queue);
        $work_queues{ $thread->tid() } = $queue;
    }

    ### DO WORK ###

    # Keep dispatching work to idle threads until termination is requested
    until ($TERM) {
        # Block until some worker reports that it is idle
        my $idle_tid = $IDLE_QUEUE->dequeue();

        # A negative ID is the termination marker, not a real thread ID
        last if ($idle_tid < 0);

        # A unit of "work" is a number of seconds in the range 5..14
        my $seconds = 5 + int(rand(10));
        $work_queues{$idle_tid}->enqueue($seconds);
    }

    ### CLEANING UP ###

    # Send every worker the -1 "no more work" marker
    foreach my $queue (values(%work_queues)) {
        $queue->enqueue(-1);
    }

    # Block until each thread has finished
    foreach my $thread (threads->list()) {
        $thread->join();
    }
}

print("Done\n");
exit(0);
### Thread Entry Point Subroutines ###
# A worker thread.
#
# Repeatedly announces itself as idle on $IDLE_QUEUE, waits for a work item
# on its own queue, and "performs" that work by sleeping.
#
# Arguments:
#   $work_q - the Thread::Queue from which this worker receives work items.
#             A non-negative item is a number of seconds to sleep; a negative
#             item means there is no more work and the worker should exit.
#
# The worker also leaves its loop once the shared $TERM flag becomes true.
sub worker
{
    my ($work_q) = @_;

    # This thread's ID
    my $tid = threads->tid();

    # Work loop.  This must be a real loop: a 'do {} while' block does not
    # count as a loop, so 'last' inside it would die with
    # "Can't "last" outside a loop block" instead of exiting cleanly.
    WORK:
    while (1) {
        # Indicate that we are ready to do work
        printf("Idle -> %2d\n", $tid);
        $IDLE_QUEUE->enqueue($tid);

        # Wait for work from the queue
        my $work = $work_q->dequeue();

        # If no more work, exit
        last WORK if ($work < 0);

        # Do some work while monitoring $TERM.  sleep() returns the number of
        # seconds actually slept, so an interrupted sleep is resumed for the
        # remaining time unless termination was requested.
        printf(" %2d <- Working\n", $tid);
        while (($work > 0) && ! $TERM) {
            $work -= sleep($work);
        }

        # Loop back to idle state only if not told to terminate
        last WORK if ($TERM);
    }

    # All done
    printf("Finished -> %2d\n", $tid);
}
__END__
=head1 NAME
pool_reuse.pl - Simple 'threads' example with a reusable thread pool
=head1 DESCRIPTION
A simplistic example illustrating the following:
=over
=item * Creation of a pool of reusable threads
=item * Threads use a queue to indicate readiness for handling work
=item * Sending work to threads using queues
=item * Interrupting a threaded program
=item * Cleaning up threads before terminating
=back
=head1 SEE ALSO
L<threads>, L<threads::shared>, and L<Thread::Queue>
=head1 AUTHOR
Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
=head1 COPYRIGHT AND LICENSE
Copyright 2006 - 2009 Jerry D. Hedden. All rights reserved.
This program is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.
=cut
|