File: AlignmentThreadPool.java

package info (click to toggle)
bbmap 39.20%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 26,008 kB
  • sloc: java: 312,743; sh: 18,096; python: 5,247; ansic: 2,074; perl: 96; makefile: 39; xml: 38
file content (118 lines) | stat: -rwxr-xr-x 2,866 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package sketch;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import shared.Tools;

public class AlignmentThreadPool {
	
	public AlignmentThreadPool(int maxThreads_) {
		maxThreads=maxThreads_;
		assert(maxThreads>0);
		tlist=new ArrayList<AlignmentThread>(maxThreads);
	}
	
	public void addJobs(ArrayList<Comparison> list, int maxRecords){
		if(list==null || list.isEmpty() || maxRecords<1){return;}
		final int limit=Tools.min(list.size(), maxRecords);
		ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit);
		int added=0;
		for(int i=0; i<limit; i++){
			Comparison c=list.get(i);
			if(c.needsAlignment()){
				addJob(c, dest);
				added++;
			}
		}
		for(int i=0; i<added; i++){
			take(dest);
		}
	}
	
	public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){
		if(tlist.size()<maxThreads){spawnThread();}
		assert(!poisoned);
		AlignmentJob job=new AlignmentJob(c, dest);
		put(job);
	}
	
	private synchronized void spawnThread(){
		final int size=tlist.size();
		if(size<maxThreads && busy.get()>=size){
//			AlignmentThread alt=new AlignmentThread(source, busy);
			AlignmentThread alt=new AlignmentThread();
			tlist.add(alt);
			alt.start();
		}
	}
	
	synchronized void poison(){
		assert(!poisoned);
		if(poisoned){return;}
		put(poison);
		poisoned=true;
	}
	
	private void put(AlignmentJob job){
		busy.incrementAndGet();
		boolean success=false;
		while(!success){
			try {
				source.put(job);
				success=true;
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	private final <X> X take(ArrayBlockingQueue<X> queue){
		X x=null;
		while(x==null){
			try {
				x=queue.take();
			} catch (InterruptedException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
		}
		return x;
	}
	
	private class AlignmentThread extends Thread {
		
		AlignmentThread(){}
		
		private final AlignmentJob next(){
			return take(source);
		}
		
		@Override
		public void run(){
			AlignmentJob job=null;
			for(job=next(); !job.isPoison(); job=next()){
				job.doWork();
				busy.decrementAndGet();
			}
			put(poison);
		}
		
//		private final ArrayBlockingQueue<AlignmentJob> source;
//		private final AtomicInteger busy;
//
//		private static final AlignmentJob poison=new AlignmentJob(null, null);
		
	}
	
	final ArrayList<AlignmentThread> tlist;
	final int maxThreads;
	final AtomicInteger busy=new AtomicInteger(0);
	private boolean poisoned=false;

	private static final AlignmentJob poison=new AlignmentJob(null, null);
	private static final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(4096);
	
}