File: WorkerTeam.java

package info (click to toggle)
libpj-java 0.0~20150107%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 13,396 kB
  • sloc: java: 99,543; ansic: 987; sh: 153; xml: 26; makefile: 10; sed: 4
file content (388 lines) | stat: -rw-r--r-- 11,049 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
//******************************************************************************
//
// File:    WorkerTeam.java
// Package: edu.rit.pj
// Unit:    Class edu.rit.pj.WorkerTeam
//
// This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the
// GNU General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a
// module which is not derived from or based on this library. If you modify this
// library, you may extend this exception to your version of the library, but
// you are not obligated to do so. If you do not wish to do so, delete this
// exception statement from your version.
//
// A copy of the GNU General Public License is provided in the file gpl.txt. You
// may also obtain a copy of the GNU General Public License on the World Wide
// Web at http://www.gnu.org/licenses/gpl.html.
//
//******************************************************************************

package edu.rit.pj;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/**
 * Class WorkerTeam provides a team of threads, distributed across the processes
 * of a cluster parallel program, for executing a {@linkplain WorkerRegion} in
 * parallel.
 * <P>
 * A worker team uses a communicator for message passing. The communicator is
 * specified as a constructor argument; if not specified, the world communicator
 * is used. Every process that is part of the communicator must create the
 * worker team. In class WorkerTeam, there is one worker thread per process. (To
 * get more than one worker thread per process, use class {@linkplain
 * HybridTeam}.) Every worker thread in every process has a unique index, going
 * from index 0 for the first thread in the first process to index
 * <I>K</I>&minus;1 for the last thread in the last process, where <I>K</I> is
 * the total number of worker threads in all the processes. In process rank 0,
 * there is an additional master thread.
 * <P>
 * To execute a worker region, create a WorkerTeam object; create an instance of
 * a concrete subclass of class {@linkplain WorkerRegion}; and pass this
 * instance to the worker team's <TT>execute()</TT> method. For further
 * information, see class {@linkplain WorkerRegion}.
 *
 * @author  Alan Kaminsky
 * @version 19-Jan-2010
 */
public class WorkerTeam
	{

// Hidden data members.

	// Number of worker threads in this process.
	int K;

	// Communicator for message passing, its size, this process's rank.
	Comm comm;
	int size;
	int rank;

	// Number of worker threads in all processes.
	int count;

	// Array of worker and master team threads in this process. The K worker
	// threads occupy indexes 0 .. K-1. In the process with rank 0 there is an
	// additional master thread, stored at index K.
	WorkerTeamThread[] myThread;

	// Worker region being executed, or null if none is being executed.
	WorkerRegion myRegion;

	// Semaphore for synchronizing threads at the end of a worker region. The
	// execute() method acquires one permit per team thread in this process.
	Semaphore myRegionEndSemaphore = new Semaphore (0);

	// Exception map for worker region, or null if none is being executed.
	ConcurrentHashMap<Integer,Throwable> myExceptionMap;

// Hidden constructors.

	/**
	 * Construct a new, uninitialized worker team. No fields are set up; the
	 * caller is expected to call <TT>initialize()</TT> afterwards.
	 *
	 * @param  flag  To distinguish this constructor from the others. The value
	 *               is ignored.
	 */
	WorkerTeam
		(boolean flag)
		{
		}

// Exported constructors.

	/**
	 * Construct a new worker team with one thread per process and using the
	 * world communicator for message passing.
	 */
	public WorkerTeam()
		{
		this (Comm.world());
		}

	/**
	 * Construct a new worker team with one thread per process and using the
	 * given communicator for message passing. The worker thread in this
	 * process gets a worker index equal to this process's rank, and the total
	 * number of worker threads equals the communicator's size.
	 *
	 * @param  comm  Communicator to use for message passing.
	 *
	 * @exception  NullPointerException
	 *     (unchecked exception) Thrown if <TT>comm</TT> is null.
	 */
	public WorkerTeam
		(Comm comm)
		{
		if (comm == null)
			{
			throw new NullPointerException
				("WorkerTeam(): comm is null");
			}
		int commSize = comm.size();
		int commRank = comm.rank();
		initialize
			(/*K    */ 1,
			 /*comm */ comm,
			 /*size */ commSize,
			 /*rank */ commRank,
			 /*count*/ commSize,
			 /*wlb  */ commRank);
		}

// Hidden initializers.

	/**
	 * Initialize a new worker team. Records the parameters and creates the
	 * team threads for this process: <TT>K</TT> worker threads with indexes
	 * <TT>wlb</TT> .. <TT>wlb+K-1</TT>, plus, in the process with rank 0 only,
	 * one master thread created with index &minus;1.
	 *
	 * @param  K      Number of worker threads in this process.
	 * @param  comm   Communicator to use for message passing.
	 * @param  size   Communicator's size.
	 * @param  rank   This process's rank in the communicator.
	 * @param  count  Number of worker threads in all processes.
	 * @param  wlb    First worker index in this process.
	 */
	void initialize
		(int K,
		 Comm comm,
		 int size,
		 int rank,
		 int count,
		 int wlb)
		{
		// Record parameters.
		this.K = K;
		this.comm = comm;
		this.size = size;
		this.rank = rank;
		this.count = count;

		// Set up worker team threads. Additional master thread in process 0.
		boolean hasMaster = rank == 0;
		myThread = new WorkerTeamThread [hasMaster ? K + 1 : K];
		for (int i = 0; i < K; ++ i)
			{
			myThread[i] = new WorkerTeamThread (this, wlb + i);
			}
		if (hasMaster)
			{
			// The master thread goes after the workers, with index -1.
			myThread[K] = new WorkerTeamThread (this, -1);
			}
		}

// Exported operations.

	/**
	 * Execute the given worker region. The region's <TT>start()</TT> method is
	 * called in the calling thread; then every team thread in this process is
	 * released and the calling thread waits until all of them have signaled
	 * the end of the region; then any recorded exceptions are propagated;
	 * finally, if there were none, the region's <TT>finish()</TT> method is
	 * called in the calling thread. The team is always returned to the "not
	 * executing" state before this method returns or throws.
	 *
	 * @param  theRegion  Worker region.
	 *
	 * @exception  NullPointerException
	 *     (unchecked exception) Thrown if <TT>theRegion</TT> is null.
	 * @exception  IllegalStateException
	 *     (unchecked exception) Thrown if this worker team is already executing
	 *     a worker region. Thrown if <TT>theRegion</TT> is already being
	 *     executed by a worker team.
	 * @exception  MultipleParallelException
	 *     Thrown if more than one exception was recorded while the team
	 *     threads executed the region.
	 * @exception  Exception
	 *     Exception thrown by the worker region's <TT>start()</TT>,
	 *     <TT>run()</TT>, or <TT>finish()</TT> methods.
	 */
	public final void execute
		(WorkerRegion theRegion)
		throws Exception
		{
		// Verify preconditions.
		if (theRegion == null)
			{
			throw new NullPointerException
				("WorkerTeam.execute(): theRegion is null");
			}
		if (myRegion != null)
			{
			throw new IllegalStateException
				("WorkerTeam.execute(): Already executing a worker region");
			}
		if (theRegion.myTeam != null)
			{
			throw new IllegalStateException
				("WorkerTeam.execute(): theRegion already being executed by a worker team");
			}

		// The map's capacity and concurrency level are only sizing hints, but
		// the constructor rejects a nonpositive concurrency level. Clamp the
		// hint, and compute it before touching any state, so that nothing can
		// fail between recording the region and entering the try block below.
		int sizeHint = Math.max (1, K);

		// Record worker region.
		myRegion = theRegion;
		myExceptionMap =
			new ConcurrentHashMap<Integer,Throwable> (sizeHint, 0.75f, sizeHint);
		theRegion.myTeam = this;

		try
			{
			// Perform the worker region's start() method. Any exception aborts
			// the execute() method.
			theRegion.start();

			// Release the team threads to perform the worker region's run()
			// method.
			for (WorkerTeamThread thread : myThread)
				{
				thread.myRegionBeginSemaphore.release();
				}

			// Wait until all team threads have returned from the worker
			// region's run() method.
			myRegionEndSemaphore.acquireUninterruptibly (myThread.length);

			// Propagate any exceptions thrown by the run() method. A single
			// exception is rethrown as is; several are bundled together.
			int excCount = myExceptionMap.size();
			if (excCount == 1)
				{
				rethrow (myExceptionMap.values().iterator().next());
				}
			else if (excCount > 1)
				{
				throw new MultipleParallelException
					("WorkerTeam.execute(): Multiple threads threw exceptions",
					 myExceptionMap);
				}

			// Perform the worker region's finish() method. Any exception aborts
			// the execute() method.
			theRegion.finish();
			}

		finally
			{
			// Clean up, whether the region completed normally or not, so the
			// team and the region can both be used again.
			theRegion.myTeam = null;
			myExceptionMap = null;
			myRegion = null;
			}
		}

	/**
	 * Determine if this worker team is executing a worker region.
	 *
	 * @return  True if this worker team is executing a worker region, false
	 *          otherwise.
	 */
	public final boolean isExecutingInParallel()
		{
		return myRegion != null;
		}

	/**
	 * Returns the worker region of code that this worker team is executing.
	 *
	 * @return  Worker region.
	 *
	 * @exception  IllegalStateException
	 *     (unchecked exception) Thrown if this worker team is not executing a
	 *     worker region.
	 */
	public final WorkerRegion region()
		{
		// Read the field once so the null test and the returned value agree.
		WorkerRegion current = myRegion;
		if (current == null)
			{
			throw new IllegalStateException
				("WorkerTeam.region(): Not executing a worker region");
			}
		return current;
		}

	/**
	 * Determine the number of worker threads in this worker team in this
	 * process. This does not include the master thread if any.
	 *
	 * @return  Number of worker threads in this process.
	 */
	public final int getThreadCount()
		{
		return K;
		}

	/**
	 * Determine the total number of worker threads in this worker team in all
	 * processes. This does not include the master thread.
	 *
	 * @return  Number of worker threads in all processes.
	 */
	public final int getTotalThreadCount()
		{
		return count;
		}

	/**
	 * Determine the rank of the process that contains the master thread.
	 * At present, this is always rank 0.
	 *
	 * @return  Master process rank.
	 */
	public int masterRank()
		{
		return 0;
		}

	/**
	 * Determine the rank of the process that contains the worker thread with
	 * the given index. In class WorkerTeam there is one worker thread per
	 * process, so the rank returned is the worker index itself.
	 *
	 * @param  w  Worker index.
	 *
	 * @return  Worker process rank.
	 *
	 * @exception  IllegalArgumentException
	 *     (unchecked exception) Thrown if <TT>w</TT> is not in the range 0 ..
	 *     <TT>getTotalThreadCount()</TT>&minus;1.
	 */
	public int workerRank
		(int w)
		{
		if (w < 0 || w >= count)
			{
			throw new IllegalArgumentException
				("WorkerTeam.workerRank(): w (= "+w+") illegal");
			}
		return w;
		}

// Hidden operations.

	/**
	 * Re-throw the given object as a checked or unchecked exception. If the
	 * given object is null or is not throwable, do nothing.
	 * <P>
	 * An <TT>Exception</TT> (checked or unchecked) or an <TT>Error</TT> is
	 * thrown unchanged. Any other kind of <TT>Throwable</TT> cannot be thrown
	 * directly from this method, so it is wrapped in a new <TT>Exception</TT>
	 * whose cause is the original object, rather than being silently dropped.
	 *
	 * @param  exc  Object to re-throw; may be null.
	 *
	 * @exception  Exception
	 *     Thrown if <TT>exc</TT> is an exception, or to wrap <TT>exc</TT> if
	 *     it is a throwable that is neither an exception nor an error.
	 */
	static void rethrow
		(Object exc)
		throws Exception
		{
		if (exc instanceof Exception)
			{
			// Includes RuntimeException; the same object is thrown either way.
			throw (Exception) exc;
			}
		else if (exc instanceof Error)
			{
			throw (Error) exc;
			}
		else if (exc instanceof Throwable)
			{
			// Neither Exception nor Error: wrap it so the failure is not lost.
			throw new Exception
				("WorkerTeam.rethrow(): Unexpected throwable",
				 (Throwable) exc);
			}
		}

	}