1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
package sketch;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import shared.Tools;
/**
 * A lazily grown pool of worker threads that execute {@link AlignmentJob}s.
 *
 * <p>Jobs are handed to the workers through a bounded queue owned by this pool.
 * Workers are started on demand, up to {@code maxThreads}, and keep running
 * until {@link #poison()} places a poison job on the queue; each worker that
 * sees the poison re-enqueues it before exiting so that the remaining workers
 * terminate as well.
 */
public class AlignmentThreadPool {

	/**
	 * Creates an empty pool. No threads are started until a job is added.
	 *
	 * @param maxThreads_ Upper bound on the number of worker threads; expected
	 * to be positive (enforced by an assertion only).
	 */
	public AlignmentThreadPool(int maxThreads_) {
		maxThreads=maxThreads_;
		assert(maxThreads>0);
		tlist=new ArrayList<AlignmentThread>(maxThreads);
	}

	/**
	 * Submits a job for every comparison among the first
	 * {@code min(list.size(), maxRecords)} entries that reports
	 * {@code needsAlignment()}, then blocks until one element per submitted job
	 * has been taken from a results queue private to this call.
	 *
	 * <p>NOTE(review): this relies on each job placing exactly one element on
	 * the results queue it was constructed with; that happens inside
	 * {@code AlignmentJob}, which is not visible here - confirm.
	 *
	 * @param list Candidate comparisons; null or empty is a no-op.
	 * @param maxRecords Maximum number of leading entries to examine; values
	 * below 1 are a no-op.
	 */
	public void addJobs(ArrayList<Comparison> list, int maxRecords){
		if(list==null || list.isEmpty() || maxRecords<1){return;}
		final int limit=Tools.min(list.size(), maxRecords);
		//Sized for the worst case, in which every examined comparison yields a result.
		ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit);

		int added=0;
		for(int i=0; i<limit; i++){
			Comparison c=list.get(i);
			if(c.needsAlignment()){
				addJob(c, dest);
				added++;
			}
		}

		//Wait for one result per submitted job.
		for(int i=0; i<added; i++){
			take(dest);
		}
	}

	/**
	 * Wraps a comparison in an {@link AlignmentJob} and queues it, starting
	 * another worker first if the pool may still grow.
	 *
	 * @param c Comparison to process.
	 * @param dest Queue handed to the job together with the comparison.
	 */
	public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){
		//Unlocked size check is only a fast-path hint; spawnThread() re-checks under the lock.
		if(tlist.size()<maxThreads){spawnThread();}
		assert(!poisoned);
		AlignmentJob job=new AlignmentJob(c, dest);
		put(job);
	}

	/**
	 * Starts one additional worker if the pool is below {@code maxThreads} and
	 * the number of outstanding jobs is at least the number of existing workers.
	 */
	private synchronized void spawnThread(){
		final int size=tlist.size();
		if(size<maxThreads && busy.get()>=size){
			AlignmentThread alt=new AlignmentThread();
			tlist.add(alt);
			alt.start();
		}
	}

	/**
	 * Queues the poison job, which makes every worker exit once it reaches the
	 * head of the queue. Calling this more than once trips an assertion, or is
	 * a no-op when assertions are disabled.
	 */
	synchronized void poison(){
		assert(!poisoned);
		if(poisoned){return;}
		enqueue(poison);
		poisoned=true;
	}

	/**
	 * Counts a real job as outstanding and queues it for the workers.
	 *
	 * @param job Job to queue; must not be the poison job, which is queued via
	 * {@link #enqueue(AlignmentJob)} so that it is never counted as work.
	 */
	private void put(AlignmentJob job){
		busy.incrementAndGet();
		enqueue(job);
	}

	/**
	 * Places a job on the work queue, blocking while the queue is full.
	 * Interrupts do not abort the operation; if one arrives while waiting, the
	 * thread's interrupt status is restored before returning so callers can
	 * still observe it.
	 *
	 * @param job Job (or poison) to queue.
	 */
	private void enqueue(AlignmentJob job){
		boolean interrupted=false;
		try{
			while(true){
				try{
					source.put(job);
					return;
				}catch(InterruptedException e){
					interrupted=true;//Retry now; re-assert the interrupt on exit.
				}
			}
		}finally{
			if(interrupted){Thread.currentThread().interrupt();}
		}
	}

	/**
	 * Removes and returns the head of a queue, blocking while it is empty.
	 * Interrupts do not abort the operation; if one arrives while waiting, the
	 * thread's interrupt status is restored before returning.
	 *
	 * @param queue Queue to take from.
	 * @return The element taken; never null.
	 */
	private final <X> X take(ArrayBlockingQueue<X> queue){
		boolean interrupted=false;
		try{
			while(true){
				try{
					return queue.take();
				}catch(InterruptedException e){
					interrupted=true;//Retry now; re-assert the interrupt on exit.
				}
			}
		}finally{
			if(interrupted){Thread.currentThread().interrupt();}
		}
	}

	/** Worker that repeatedly takes jobs from the enclosing pool's queue and runs them. */
	private class AlignmentThread extends Thread {

		AlignmentThread(){}

		/** Blocks until the next job (or the poison job) is available. */
		private final AlignmentJob next(){
			return take(source);
		}

		/**
		 * Runs jobs until the poison job is taken, then puts the poison back
		 * so the next worker sees it too.
		 */
		@Override
		public void run(){
			for(AlignmentJob job=next(); !job.isPoison(); job=next()){
				try{
					job.doWork();
				}finally{
					//Keep the outstanding-job count accurate even if doWork() throws.
					busy.decrementAndGet();
				}
			}
			enqueue(poison);
		}
	}

	/** Workers started so far; modified only inside synchronized spawnThread(). */
	final ArrayList<AlignmentThread> tlist;
	/** Maximum number of workers this pool will start. */
	final int maxThreads;
	/** Jobs passed to put() whose doWork() has not yet finished. */
	final AtomicInteger busy=new AtomicInteger(0);
	/** Set once poison() has queued the poison job; volatile because addJob() reads it without the lock. */
	private volatile boolean poisoned=false;

	/** Capacity of the work queue; producers block in put() when it is full. */
	private static final int QUEUE_CAPACITY=4096;
	/** Shared sentinel job; workers recognize it through AlignmentJob.isPoison(). */
	private static final AlignmentJob poison=new AlignmentJob(null, null);
	/**
	 * Work queue feeding this pool's workers. Owned per instance so that one
	 * pool's jobs, busy count and poison never leak into another pool.
	 */
	private final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(QUEUE_CAPACITY);
}
|