File: ThreadUtil.java

package info (click to toggle)
imagej 1.54g-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 6,520 kB
  • sloc: java: 132,209; sh: 286; xml: 255; makefile: 6
file content (149 lines) | stat: -rw-r--r-- 5,832 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package ij.util;
import java.util.concurrent.*;

public class ThreadUtil {
	
	/** Start all given threads and wait on each of them until all are done.
	 * From Stephan Preibisch's Multithreading.java class. See:
	 * http://repo.or.cz/w/trakem2.git?a=blob;f=mpi/fruitfly/general/MultiThreading.java;hb=HEAD
	 * @param threads 
	 */
	public static void startAndJoin(Thread[] threads) {
		for (int ithread = 0; ithread < threads.length; ++ithread) {
			threads[ithread].setPriority(Thread.NORM_PRIORITY);
			threads[ithread].start();
		}

		try {
			for (int ithread = 0; ithread < threads.length; ++ithread) {
				threads[ithread].join();
			}
		} catch (InterruptedException ie) {
			throw new RuntimeException(ie);
		}
	}

	public static Thread[] createThreadArray(int nb) {
		if (nb == 0) {
			nb = getNbCpus();
		}
		Thread[] threads = new Thread[nb];

		return threads;
	}

	public static Thread[] createThreadArray() {
		return createThreadArray(0);
	}

	public static int getNbCpus() {
		return Runtime.getRuntime().availableProcessors();
	}

	/*--------------------------------------------------------------------------*/
	/* The following is for parallelization using a ThreadPool, which avoids the
	 * overhead of creating threads, and is therefore faster if each thread has
	 * only a short task to perform */

	/** The threadPoolExecutor holds at least as many threads for parallel execution as the number of
	 *  processors; additional threads are added as required. These additional threads will be
	 *  terminated if idle for 120 seconds. */
	public static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
			Runtime.getRuntime().availableProcessors(),	//minimum number of threads
			Integer.MAX_VALUE,							//maximum number of threads
			120,										//unused threads are terminated after this time
			TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>()			//requests will be processed immediately (not a real queue)
			);

	/** Starts all callables for parallel execution (using a ThreadPoolExecutor)
	 *  and waits until each of them has finished.
	 *  If the current thread is interrupted, each of the callables gets
	 *  cancelled and interrupted. Also in that case, waits until all callables have
	 *  finished. The 'interrupted' status of the current thread is
	 *  preserved, as required for preview in an ImageJ ExtendedPlugInFilter.
	 *  Note that ImageJ requires that all callables can run concurrently,
	 *  and none of them must stay in the queue while others run.
	 *  (This is required by the RankFilters, where the threads are not independent)
	 *  @param callables Array of tasks. If no return value is needed,
	 *  best use <code>Callable<Void></code> (then the <code>Void call()</code> method
	 *  should return null). If the array size is 1, the <code>call()</code> method
	 *  is executed in the current thread.
	 *  @return Array of the <code>java.util.concurrent.Future</code>s,
	 *  corresponding to the callables. If the call methods of the callables
	 *  return results, the get() methods of these Futures may be used to get the results.
	 */
	public static Future[] startAndJoin(Callable[] callables) {
		if (callables.length == 1) {	//special case: call in current thread and create a Future
			Object callResult = null;
			try {
				callResult = callables[0].call();
			} catch (Exception e) {
				ij.IJ.handleException(e);
			}
			final Object result = callResult;
			Future[] futures = new Future[] {
				new Future() {
					public boolean cancel(boolean mayInterruptIfRunning) {return false;}
					public Object get() {return result;}
					public Object get(long timeout, TimeUnit unit) {return result;}
					public boolean isCancelled() {return false;}
					public boolean isDone() {return true;}
				}	
			};
			return futures;
		} else {
			Future[] futures = start(callables);
			joinAll(futures);
			return futures;
		}
	}

	/** Starts all callables for parallel execution (using a ThreadPoolExecutor)
	 *  without waiting for the results.
	 *  @param callables Array of tasks; these might be <code>Callable<Void></code>
	 *  if no return value is needed (then the <code>call</code> methods should
	 *  return null).
	 *  @return Array of the <code>java.util.concurrent.Future</code>s,
	 *  corresponding to the callables. The futures may be used to wait for
	 *  completion of the callables or cancel them.
	 *  If the call methods of the callables return results, these Futures
	 *  may be used to get the results.
	 */
	public static Future[] start(Callable[] callables) {
		Future[] futures = new Future[callables.length];
		for (int i=0; i<callables.length; i++)
			futures[i] = threadPoolExecutor.submit(callables[i]);
		return futures;
	}

	/** Waits for completion of all <code>Callable</code>s corresponding to the
	 *  <code>Future</code>s given.
	 *  If the current thread is interrupted, each of the <code>Callable</code>s
	 *  gets cancelled and interrupted. Also in that case, this method waits
	 *  until all callables have finished.
	 *  The 'interrupted' status of the current thread is preserved,
	 *  as required for preview in an ImageJ ExtendedPlugInFilter.
	 */
	public static void joinAll(Future[] futures) {
		boolean interrupted = false;
		for (int i=0; i<futures.length; i++) {
			Future f = futures[i];
			try {
				f.get();
			} catch (InterruptedException e) {
				interrupted = true;
				for (int j=i; j<futures.length; j++)
					futures[j].cancel(true);
				i--;  //we still have to wait for completion of this one
			} catch (CancellationException e) { //cancellation is allowed, e.g. during preview
			} catch (Exception eOther) {
				ij.IJ.log("Error in thread called by "+Thread.currentThread().getName()+":\n"+eOther);
			}
		}
		if (interrupted) {
			Thread.currentThread().interrupt();
			threadPoolExecutor.purge();
		}
	}
}