1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
/*
* Copyright © 2007 Bart Massey
* ALL RIGHTS RESERVED
*
* You may use this code under terms listed at the end of
* this file.
*/
/* Multiplexing threaded producer output streams---demo code. */
autoimport Thread;
autoimport Semaphore;
autoimport Mutex;
autoimport PRNG;
/* Tunable parameters for the demo. */
int nprods = 15; /* number of producer threads to start */
int niters = 10; /* number of values each producer generates */
int npsleep = 2; /* bound handed to randsleep() by a producer before each value */
int nmsleep = 2; /* bound handed to randsleep() by the multiplexor after each value */
/* Lock held while a line is being written, so that output
   from different threads does not get interleaved. */
mutex dprintf_lock = Mutex::new();

/* printf() work-alike that performs the whole call while
   holding dprintf_lock.  The twixt statement pairs the acquire
   with the release around the body. */
void dprintf(string fmt, poly args ...) {
    twixt(Mutex::acquire(dprintf_lock); Mutex::release(dprintf_lock)) {
        printf(fmt, args ...);
    }
}
/* Sleep for a random whole number of milliseconds chosen from
   1..msecs inclusive.
   PRNG::randint(n) yields a value in 0..n-1, so
   randint(msecs) + 1 covers exactly 1..msecs.  (Passing
   msecs - 1 as the bound would cap the sleep at msecs - 1 and
   would hand randint a zero bound when msecs is 1.) */
void randsleep(int msecs) {
    sleep(randint(msecs) + 1);
}
/* We use semaphores instead of mutexes here only because of
   the ownership check for mutexes---we want the acquiring
   and releasing threads to be different for this code. */
/* Guards the mbox on the producer side.  It starts at 1, so
   one producer may write; each producer waits on it before
   storing into mbox, and the multiplexor signals it again
   once the stored value has been printed. */
semaphore serializing = Semaphore::new(1);
/* The multiplexor blocks here when no values are available.
   It starts at 0 and is signalled by a producer each time a
   value has been stored in mbox. */
semaphore blocking = Semaphore::new(0);
/* The shared value a producer uses to communicate with the
   multiplexor. */
int mbox;
/* Multiplexor thread body.  Handles nprods * niters values:
   for each one it blocks until a producer has posted to mbox,
   prints that value on a line of its own, hands the mbox back
   to the producers, and then pauses via randsleep(nmsleep). */
void multiplex() {
    dprintf("enter multiplex\n");
    int taken = 0;
    while (taken < nprods * niters) {
        /* Block until a producer signals that mbox holds a value. */
        wait(blocking);
        dprintf("%d\n", mbox);
        /* The value has been read; let the next producer write. */
        signal(serializing);
        randsleep(nmsleep);
        taken++;
    }
    dprintf("finish multiplex\n");
}
/* Actually start the multiplexor: fork multiplex() and keep
   the thread handle so it can be joined later. */
thread consumer = fork(multiplex());
/* For best performance, the multiplexor should always run
   in preference to producers. Otherwise producers will
   block unnecessarily waiting for the multiplexor to wake
   up and consume the value currently in the mbox.  So raise
   the multiplexor's priority by one over its current value. */
set_priority(consumer, get_priority(consumer) + 1);
/* Producer thread body.  Generates niters values; the k-th
   value (counting from 0) is k + niters * offset.  Before each
   value the producer pauses via randsleep(npsleep), to simulate
   some real-world data collection process, then waits for the
   mbox to be free, stores the value there, and wakes the
   multiplexor. */
void produce(int offset) {
    dprintf("enter producer %d\n", offset);
    int produced = 0;
    while (produced < niters) {
        randsleep(npsleep);
        /* Wait until the mbox may be written. */
        wait(serializing);
        mbox = niters * offset + produced;
        /* Tell the multiplexor that a value is available. */
        signal(blocking);
        produced++;
    }
    dprintf("finish producer %d\n", offset);
}
/* Launch every producer: the array comprehension forks
   produce(i) for each index i and stores the thread handle. */
thread[nprods] prods = { [i] = fork(produce(i)) };

/* Gather the threads back up: all producers first, in index
   order, and the multiplexor last. */
dprintf("joining producers\n");
for (int p = 0; p < nprods; p++) {
    join(prods[p]);
}
dprintf("joining multiplexer\n");
join(consumer);
# Permission is hereby granted, free of charge, to any
# person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the
# Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the
# Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice
# shall be included in all copies or substantial portions of
# the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
# PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
# THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Except as contained in this notice, the names of the
# authors or their institutions shall not be used in
# advertising or otherwise to promote the sale, use or other
# dealings in this Software without prior written
# authorization from the authors.
|