1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
package ij.util;
import java.util.concurrent.*;
/**
 * Static helpers for running tasks in parallel, either on explicitly created
 * {@link Thread}s or as {@link Callable}s on a shared {@link ThreadPoolExecutor}.
 */
public class ThreadUtil {

    /** Start all given threads and wait on each of them until all are done.
     * Every thread is set to {@link Thread#NORM_PRIORITY} before it is started.
     * From Stephan Preibisch's Multithreading.java class. See:
     * http://repo.or.cz/w/trakem2.git?a=blob;f=mpi/fruitfly/general/MultiThreading.java;hb=HEAD
     * @param threads the threads to start and join; no element may be null
     * @throws RuntimeException wrapping the <code>InterruptedException</code> if the
     *         current thread is interrupted while waiting. The 'interrupted' status of
     *         the current thread is restored before the exception is thrown; threads
     *         not yet joined are neither stopped nor waited for.
     */
    public static void startAndJoin(Thread[] threads) {
        for (Thread thread : threads) {
            thread.setPriority(Thread.NORM_PRIORITY);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException ie) {
            // join() clears the interrupt flag when it throws; set it again so that
            // callers can still see the interruption (same policy as joinAll).
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
    }

    /** Creates an array for holding threads; the array elements are not
     * initialized (all null) and must be filled in by the caller.
     * @param nb the array length; 0 means one element per available processor
     * @return a new array of length <code>nb</code>, or of length
     *         {@link #getNbCpus()} if <code>nb</code> is 0
     */
    public static Thread[] createThreadArray(int nb) {
        if (nb == 0) {
            nb = getNbCpus();
        }
        return new Thread[nb];
    }

    /** Creates an array for holding threads, with one (null) element per
     * available processor.
     * @return a new array of length {@link #getNbCpus()}
     */
    public static Thread[] createThreadArray() {
        return createThreadArray(0);
    }

    /** Returns the number of processors available to the Java virtual machine,
     * as reported by <code>Runtime.availableProcessors()</code>.
     */
    public static int getNbCpus() {
        return Runtime.getRuntime().availableProcessors();
    }

    /*--------------------------------------------------------------------------*/
    /* The following is for parallelization using a ThreadPool, which avoids the
     * overhead of creating threads, and is therefore faster if each thread has
     * only a short task to perform */

    /** The threadPoolExecutor used by {@link #start(Callable[])}. Its core pool size
     * is the number of processors; additional threads are added as required (there
     * is practically no upper limit). These additional threads will be terminated
     * if idle for 120 seconds. Tasks are handed over through a
     * <code>SynchronousQueue</code>, i.e., they are never kept waiting in a queue. */
    public static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(), //minimum number of threads
            Integer.MAX_VALUE,                          //maximum number of threads
            120,                                        //unused threads are terminated after this time
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>()            //requests will be processed immediately (not a real queue)
    );

    /** Starts all callables for parallel execution (using a ThreadPoolExecutor)
     * and waits until each of them has finished.
     * If the current thread is interrupted, each of the callables gets
     * cancelled and interrupted. Also in that case, waits until all callables have
     * finished. The 'interrupted' status of the current thread is
     * preserved, as required for preview in an ImageJ ExtendedPlugInFilter.
     * Note that ImageJ requires that all callables can run concurrently,
     * and none of them must stay in the queue while others run.
     * (This is required by the RankFilters, where the threads are not independent)
     * @param callables Array of tasks. If no return value is needed,
     * best use <code>Callable&lt;Void&gt;</code> (then the <code>Void call()</code> method
     * should return null). If the array size is 1, the <code>call()</code> method
     * is executed in the current thread; an exception thrown by it is passed to
     * <code>IJ.handleException</code> and the result is null in that case.
     * @return Array of the <code>java.util.concurrent.Future</code>s,
     * corresponding to the callables. If the call methods of the callables
     * return results, the get() methods of these Futures may be used to get the results.
     */
    public static Future[] startAndJoin(Callable[] callables) {
        if (callables.length == 1) { //special case: call in current thread and create a Future
            return new Future[] { callInCurrentThread(callables[0]) };
        }
        Future[] futures = start(callables);
        joinAll(futures);
        return futures;
    }

    /** Runs the callable in the current thread and wraps the outcome in a Future
     * that is already done. If <code>call()</code> throws, the exception is passed
     * to <code>IJ.handleException</code> and the Future yields null. */
    private static Future callInCurrentThread(Callable callable) {
        Object callResult = null;
        try {
            callResult = callable.call();
        } catch (Exception e) {
            ij.IJ.handleException(e);
            if (e instanceof InterruptedException) {
                // The callable ran in this thread, so an InterruptedException means this
                // thread was interrupted; keep the 'interrupted' status for the caller.
                Thread.currentThread().interrupt();
            }
        }
        final Object result = callResult;
        return new Future() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;   //already completed, nothing to cancel
            }
            @Override
            public Object get() {
                return result;
            }
            @Override
            public Object get(long timeout, TimeUnit unit) {
                return result;
            }
            @Override
            public boolean isCancelled() {
                return false;
            }
            @Override
            public boolean isDone() {
                return true;
            }
        };
    }

    /** Starts all callables for parallel execution (using a ThreadPoolExecutor)
     * without waiting for the results.
     * @param callables Array of tasks; these might be <code>Callable&lt;Void&gt;</code>
     * if no return value is needed (then the <code>call</code> methods should
     * return null).
     * @return Array of the <code>java.util.concurrent.Future</code>s,
     * corresponding to the callables. The futures may be used to wait for
     * completion of the callables or cancel them.
     * If the call methods of the callables return results, these Futures
     * may be used to get the results.
     */
    public static Future[] start(Callable[] callables) {
        Future[] futures = new Future[callables.length];
        for (int i = 0; i < callables.length; i++) {
            futures[i] = threadPoolExecutor.submit(callables[i]);
        }
        return futures;
    }

    /** Waits for completion of all <code>Callable</code>s corresponding to the
     * <code>Future</code>s given.
     * If the current thread is interrupted, each of the <code>Callable</code>s
     * not yet waited for gets cancelled and interrupted. Also in that case, this
     * method calls <code>get()</code> on every remaining Future before returning.
     * The 'interrupted' status of the current thread is preserved,
     * as required for preview in an ImageJ ExtendedPlugInFilter.
     * Cancelled Futures are silently accepted; any other exception is written
     * to the ImageJ log.
     * @param futures the Futures to wait for, e.g. as returned by {@link #start(Callable[])}
     */
    public static void joinAll(Future[] futures) {
        // NOTE(review): get() on a cancelled Future reports CancellationException; for
        // Futures from ThreadPoolExecutor.submit this presumably happens without waiting
        // for the task body to return - confirm if callers rely on tasks having ended.
        boolean interrupted = false;
        for (int i = 0; i < futures.length; i++) {
            Future f = futures[i];
            try {
                f.get();
            } catch (InterruptedException e) {
                interrupted = true;
                for (int j = i; j < futures.length; j++) {
                    futures[j].cancel(true);
                }
                i--;    //we still have to wait for completion of this one
            } catch (CancellationException e) {
                //cancellation is allowed, e.g. during preview
            } catch (Exception eOther) {
                ij.IJ.log("Error in thread called by "+Thread.currentThread().getName()+":\n"+eOther);
            }
        }
        if (interrupted) {
            // get() cleared the flag when it threw; set it again for the caller,
            // then drop the cancelled tasks from the executor's work queue.
            Thread.currentThread().interrupt();
            threadPoolExecutor.purge();
        }
    }
}
|