File: mplex.5c

package info (click to toggle)
nickle 2.107
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 3,756 kB
  • sloc: ansic: 27,954; yacc: 1,874; lex: 954; sh: 204; makefile: 13; lisp: 1
file content (120 lines) | stat: -rw-r--r-- 4,051 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
 * Copyright © 2007 Bart Massey
 * ALL RIGHTS RESERVED
 *
 * You may use this code under terms listed at the end of
 * this file.
 */

/* Multiplexing threaded producer output streams---demo code. */

autoimport Thread;
autoimport Semaphore;
autoimport Mutex;
autoimport PRNG;

int nprods = 15;   /* number of producer threads to start */
int niters = 10;   /* number of values for each producer to generate */
int npsleep = 2;   /* msec bound a producer passes to randsleep() before each value */
int nmsleep = 2;   /* msec bound the multiplexor passes to randsleep() after each value */

/* Lock shared by every thread that prints, so that lines of
   output don't get interleaved. */
mutex dprintf_lock = Mutex::new();

/* printf() replacement for threaded code: formats fmt with
   args exactly as printf() does, but holds dprintf_lock for
   the duration of the call.  The twixt statement acquires the
   lock before the body runs and releases it when the body is
   left. */
void dprintf(string fmt, poly args ...) {
    twixt(Mutex::acquire(dprintf_lock); Mutex::release(dprintf_lock)) {
	printf(fmt, args ...);
    }
}

/*
 * Sleep for a random number of milliseconds in the range
 * 1..msecs, inclusive.
 *
 * msecs is the upper bound of the sleep and is expected to be
 * at least 1.
 *
 * randint(n) yields a value in 0..n-1, so randint(msecs) + 1
 * covers the whole 1..msecs range.  Passing msecs - 1 instead
 * would stop one short of msecs, and would hand randint() a
 * zero when msecs is 1.
 */
void randsleep(int msecs) {
    sleep(randint(msecs) + 1);
}

/* We use semaphores instead of mutexes here only because of
   the ownership check for mutexes---we want the acquiring
   and releasing threads to be different for this code.
   Each semaphore below is waited on by one side (producer or
   multiplexor) and signalled by the other. */

/* Make sure producers access the shared location
   properly: a producer waits on this before storing into
   mbox, and the multiplexor signals it after printing the
   stored value.  Created with a count of 1. */
semaphore serializing = Semaphore::new(1);

/* The multiplexor blocks here when no values are available.
   A producer signals it after storing a value into mbox.
   Created with a count of 0. */
semaphore blocking = Semaphore::new(0);

/* The shared value a producer uses to communicate with the
   multiplexor.  Written by produce() and read by
   multiplex(). */
int mbox;

/* Body of the multiplexor thread.  Handles nprods * niters
   values in total: for each one, wait until a producer has
   stored it in mbox, print it on its own line, hand mbox back
   to the producers, and then sleep for a random interval
   bounded by nmsleep. */
void multiplex() {
    dprintf("enter multiplex\n");
    for (int left = nprods * niters; left > 0; left--) {
	wait(blocking);		/* a producer has stored a value */
	int value = mbox;
	dprintf("%d\n", value);
	signal(serializing);	/* let the next producer store */
	randsleep(nmsleep);
    }
    dprintf("finish multiplex\n");
}

/* Run the multiplexor in a thread of its own. */
thread consumer = fork(multiplex());

/* Bump the multiplexor's priority one step above the value
   it started with.  For best performance it should always
   run in preference to producers; otherwise producers block
   unnecessarily while they wait for the multiplexor to wake
   up and consume the value currently in the mbox. */
Thread::set_priority(consumer, 1 + Thread::get_priority(consumer));

/* Body of one producer thread.  Generates niters values,
   sleeping a random interval bounded by npsleep before each
   one to simulate some real-world data collection process.
   The producer started with a given offset generates the
   values niters * offset up to niters * offset + niters - 1,
   in order. */
void produce(int offset) {
    int base = niters * offset;

    dprintf("enter producer %d\n", offset);
    for (int k = 0; k < niters; k++) {
	randsleep(npsleep);
	wait(serializing);	/* gain exclusive use of mbox */
	mbox = base + k;
	signal(blocking);	/* tell the multiplexor a value is ready */
    }
    dprintf("finish producer %d\n", offset);
}

/* Start every producer in its own thread.  The array
   comprehension forks produce(n) for each index n of the
   nprods-element array and stores the resulting thread. */
thread[nprods] prods = { [n] = fork(produce(n)) };

/* Gather the threads up again: first each producer in index
   order, then the multiplexor. */
dprintf("joining producers\n");
for (int n = 0; n < nprods; n++) {
    Thread::join(prods[n]);
}
dprintf("joining multiplexer\n");
Thread::join(consumer);

# Permission is hereby granted, free of charge, to any
# person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the
# Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the
# Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice
# shall be included in all copies or substantial portions of
# the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
# PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
# THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# Except as contained in this notice, the names of the
# authors or their institutions shall not be used in
# advertising or otherwise to promote the sale, use or other
# dealings in this Software without prior written
# authorization from the authors.