File: BufferedMultiCros.java

package info (click to toggle)
bbmap 39.20%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 26,024 kB
  • sloc: java: 312,743; sh: 18,099; python: 5,247; ansic: 2,074; perl: 96; makefile: 39; xml: 38
file content (384 lines) | stat: -rwxr-xr-x 13,588 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
package stream;

import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

import fileIO.FileFormat;
import shared.KillSwitch;
import shared.Parse;
import shared.Shared;
import shared.Tools;
import structures.ByteBuilder;

/**
 * Allows output of reads to multiple different output streams.
 * Each output stream is controlled by a buffer,
 * which stores reads until there is a sufficient quantity to dump.
 * <p>
 * In threaded mode, lists passed to {@link #add(ArrayList)} are handed to this
 * object's own thread through a bounded queue (capacity 8); otherwise they are
 * buffered directly by the calling thread.
 * 
 * @author Brian Bushnell
 * @date May 14, 2019
 *
 */
public abstract class BufferedMultiCros extends Thread {
	
	/*--------------------------------------------------------------*/
	/*----------------         Constructors         ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Factory using the static defaults for threading ({@link #defaultThreaded}),
	 * implementation type ({@link #defaultMcrosType}) and concurrent stream count
	 * ({@link #defaultMaxStreams}).
	 * @param out1 Name pattern for file 1; must contain %
	 * @param out2 Name pattern for file 2; may be null
	 * @param overwrite Permission to overwrite
	 * @param append Permission to append to existing files
	 * @param allowSubprocess Allow subprocesses such as pigz, bgzip, or samtools
	 * @param useSharedHeader Print the stored header (from an input sam file) in all output sam files
	 * @param defaultFormat Assume files are in this format if they don't have a valid extension
	 * @return A new BufferedMultiCros
	 * @throws RuntimeException if {@link #defaultMcrosType} is not a supported type
	 */
	public static BufferedMultiCros make(String out1, String out2, boolean overwrite, boolean append, 
			boolean allowSubprocess, boolean useSharedHeader, int defaultFormat) {
		return make(out1, out2, overwrite, append, allowSubprocess, useSharedHeader, 
				defaultFormat, defaultThreaded, defaultMcrosType, defaultMaxStreams);
	}
	
	/**
	 * Factory that selects a concrete implementation by type number.
	 * @param out1 Name pattern for file 1; must contain %
	 * @param out2 Name pattern for file 2; may be null
	 * @param overwrite Permission to overwrite
	 * @param append Permission to append to existing files
	 * @param allowSubprocess Allow subprocesses such as pigz, bgzip, or samtools
	 * @param useSharedHeader Print the stored header (from an input sam file) in all output sam files
	 * @param defaultFormat Assume files are in this format if they don't have a valid extension
	 * @param threaded Run the mcros in its own thread
	 * @param mcrosType Implementation: 2, 3, 4, 5 or 6
	 * @param maxStreams Max concurrent open streams (ignored by type 2)
	 * @return A new BufferedMultiCros
	 * @throws RuntimeException if mcrosType is not in 2-6
	 */
	public static BufferedMultiCros make(String out1, String out2, boolean overwrite, boolean append, 
			boolean allowSubprocess, boolean useSharedHeader, int defaultFormat, boolean threaded,
			int mcrosType, int maxStreams) {
		//Forward the caller's allowSubprocess and defaultFormat rather than
		//hard-coding them, so the parameters actually take effect.
		switch(mcrosType){
			case 2: //Slow, synchronous mcros type
				return new MultiCros2(out1, out2, overwrite, append, allowSubprocess, useSharedHeader, defaultFormat, threaded);
			case 3: //Faster, asynchronous type
				return new MultiCros3(out1, out2, overwrite, append, allowSubprocess, useSharedHeader, defaultFormat, threaded, maxStreams);
			case 4: //Threaded file closing
				return new MultiCros4(out1, out2, overwrite, append, allowSubprocess, useSharedHeader, defaultFormat, threaded, maxStreams);
			case 5: //New retirement ordering by timer
				return new MultiCros5(out1, out2, overwrite, append, allowSubprocess, useSharedHeader, defaultFormat, threaded, maxStreams);
			case 6: //Default type (see defaultMcrosType)
				return new MultiCros6(out1, out2, overwrite, append, allowSubprocess, useSharedHeader, defaultFormat, threaded, maxStreams);
			default:
				throw new RuntimeException("Bad mcrosType: "+mcrosType);
		}
	}
	
	/**
	 * Primary constructor.
	 * @param pattern1_ Name pattern for file 1; must contain % (required)
	 * @param pattern2_ Name pattern for file 2; must contain % (optional)
	 * @param overwrite_ Permission to overwrite
	 * @param append_ Permission to append to existing files (this should generally be false)
	 * @param allowSubprocess_ Allow subprocesses such as pigz, bgzip, or samtools
	 * @param useSharedHeader_ Print the stored header (from an input sam file) in all output sam files 
	 * @param defaultFormat_ Assume files are in this format if they don't have a valid extension
	 * @param threaded_ Run this mcros in its own thread
	 * @param maxStreams_ Max allowed number of concurrent open streams
	 */
	public BufferedMultiCros(String pattern1_, String pattern2_,
			boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, 
			int defaultFormat_, boolean threaded_, int maxStreams_){
		assert(pattern1_!=null && pattern1_.indexOf('%')>=0);
		//Validate pattern2 itself (previously this re-checked pattern1).
		assert(pattern2_==null || pattern2_.indexOf('%')>=0);
		
		//Perform # expansion for twin files
		if(pattern2_==null && pattern1_.indexOf('#')>=0){
			pattern1=pattern1_.replaceFirst("#", "1");
			pattern2=pattern1_.replaceFirst("#", "2");
		}else{
			pattern1=pattern1_;
			pattern2=pattern2_;
		}
		
		overwrite=overwrite_;
		append=append_;
		allowSubprocess=allowSubprocess_;
		useSharedHeader=useSharedHeader_;
		
		defaultFormat=defaultFormat_;
		
		threaded=threaded_;
		//Small bounded queue: producers block in add() once 8 lists are pending.
		transferQueue=threaded ? new ArrayBlockingQueue<ArrayList<Read>>(8) : null;
		maxStreams=maxStreams_;
		
		//Significantly impacts performance.
		//Higher numbers give more retires but less time per retire.
		//Optimal seems to be around 4-6, at least for 16 streams.
		streamsToRetire=Tools.mid(2, (maxStreams+1)/3, 16);

		//Memory thresholds are fractions of available memory, with fixed floors.
		final long bytes=Shared.memAvailable();
		memLimitLower=Tools.max(50000000, (long)(memLimitLowerMult*bytes));
		memLimitMid=Tools.max(70000000, (long)(memLimitMidMult*bytes));
		memLimitUpper=Tools.max(90000000, (long)(memLimitUpperMult*bytes));
	}
	
	/*--------------------------------------------------------------*/
	/*----------------           Parsing            ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Parse a command-line argument that sets one of this class's static defaults.
	 * @param arg Full original argument (unused here)
	 * @param a Parameter name
	 * @param b Parameter value
	 * @return True if the argument was recognized and consumed
	 */
	public static boolean parseStatic(String arg, String a, String b){
		if(a.equals("mcrostype")){
			defaultMcrosType=Integer.parseInt(b);
		}else if(a.equals("threaded")){
			defaultThreaded=Parse.parseBoolean(b);
		}else if(a.equals("streams")){
			defaultMaxStreams=Integer.parseInt(b);
		}else if(a.equalsIgnoreCase("readsPerBuffer")){
			defaultReadsPerBuffer=Integer.parseInt(b);
		}else if(a.equalsIgnoreCase("bytesPerBuffer")){
			defaultBytesPerBuffer=Integer.parseInt(b);
		}else if(a.equalsIgnoreCase("memLimitLowerMult") || a.equals("mllmult") || a.equals("mllm")){
			memLimitLowerMult=Float.parseFloat(b);
			assert(memLimitLowerMult>=0 && memLimitLowerMult<1);
		}else if(a.equalsIgnoreCase("memLimitMidMult") || a.equals("mlmmult") || a.equals("mlmm")){
			memLimitMidMult=Float.parseFloat(b);
			assert(memLimitMidMult>=0 && memLimitMidMult<1);
		}else if(a.equalsIgnoreCase("memLimitUpperMult") || a.equals("mlumult") || a.equals("mlum")){
			memLimitUpperMult=Float.parseFloat(b);
			assert(memLimitUpperMult>=0 && memLimitUpperMult<1);
		}else{
			return false;
		}
		
		return true;
	}
	
	/*--------------------------------------------------------------*/
	/*----------------       Abstract Methods       ----------------*/
	/*--------------------------------------------------------------*/

	/** True if no errors were encountered */
	public abstract boolean finishedSuccessfully();

	/** 
	 * Add a single read.  Should not be used in threaded mode.
	 * Should only be used by this class.
	 * @param r Read to add.
	 * @param name Name of destination buffer.
	 */
	abstract void add(Read r, String name);
	
	/** 
	 * Dump all buffered reads to disk, except when minReadsToDump forbids it.
	 * @return Number of reads dumped.
	 */
	abstract long dumpAll();
	
	/** 
	 * Dump all residual reads to this stream.
	 * @param rosu Destination stream.
	 * @return Number of residual reads dumped.
	 */
	public abstract long dumpResidual(ConcurrentReadOutputStream rosu);
	
	/** Dump everything and close any open streams. */
	abstract long closeInner();
	
	/** Generate a report on how many reads went to each file */
	public abstract ByteBuilder report();
	
	/**
	 * Time for shutting down output threads.
	 * @throws RuntimeException unless overridden by a subclass
	 */
	public String printRetireTime() {
		throw new RuntimeException("printRetireTime not available for "+getClass().getName());
	}
	
	/**
	 * Time for creating output streams.
	 * @throws RuntimeException unless overridden by a subclass
	 */
	public String printCreateTime() {
		throw new RuntimeException("printCreateTime not available for "+getClass().getName());
	}
	
	/** @return The set of destination buffer names */
	public abstract Set<String> getKeys();
	
	/*--------------------------------------------------------------*/
	/*----------------        Final Methods         ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Shut this down and perform any cleanup needed.
	 * In threaded mode this sends the poison token and blocks until this thread terminates;
	 * otherwise it calls {@link #closeInner()} directly.
	 */
	public final void close(){
		if(threaded){poisonAndWait();}
		else{closeInner();}
	}
	
	/** Primary file pattern */
	public final String fname(){return pattern1;}
	
	/** Return true if this stream has detected an error */
	public final boolean errorState(){
		return errorState;
	}
	
	/** 
	 * Send a list of reads to an output buffer.
	 * The reads must have a name attached to the object field in order to be written;
	 * reads with a null obj field are ignored.
	 * In threaded mode this may block while the transfer queue is full.
	 */
	public final void add(ArrayList<Read> list) {
		if(threaded){//Send to the transfer queue
			addToQueue(list);
		}else{//Add the reads from this thread
			addToBuffers(list);
		}
	}
	
	/** Send individual reads to their designated buffer */
	private final void addToBuffers(ArrayList<Read> list) {
		for(Read r : list){
			if(r.obj!=null){//Reads without a name in the obj field get ignored here.
				String name=(String)r.obj;
				readsInTotal++;
				add(r, name);
			}
		}
		handleLoad0();
	}
	
	/** Called to handle load after adding a list; no-op unless overridden */
	void handleLoad0() {
		//Do nothing
	}
	
	/*--------------------------------------------------------------*/
	/*----------------       Threaded Methods       ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * For threaded mode: consume lists from the transfer queue until the poison token
	 * arrives, then call {@link #closeInner()}.
	 */
	@Override
	public final void run(){
		assert(threaded) : "This should only be called in threaded mode.";
		try {
			//Identity comparison is intentional: only the exact poisonToken instance terminates.
			for(ArrayList<Read> list=transferQueue.take(); list!=poisonToken; list=transferQueue.take()){
				if(verbose){System.err.println("Got list; size="+transferQueue.size());}
				addToBuffers(list);
				if(verbose){System.err.println("Added list; size="+transferQueue.size());}
			}
		} catch (InterruptedException e) {
			//Terminate JVM if something goes wrong
			KillSwitch.exceptionKill(e);
		}
		closeInner();
	}
	
	/** Indicate that no more reads will be sent, for threaded mode */
	public final void poison(){
		assert(threaded) : "This should only be called in threaded mode.";
		addToQueue(poisonToken);
	}
	
	/**
	 * Put a list on the transfer queue, blocking while it is full.
	 * An interrupted put is retried, up to 10 attempts in total; if all fail,
	 * the JVM is terminated via KillSwitch.
	 * @param list List to enqueue
	 * @return True if the list was enqueued
	 */
	boolean addToQueue(ArrayList<Read> list) {
		boolean success=false;
		for(int i=0; i<10 && !success; i++) {
			try {
				transferQueue.put(list);
				success=true;
			} catch (InterruptedException e) {
				//Report and retry rather than dropping the list.
				e.printStackTrace();
			}
		}
		if(!success) {
			KillSwitch.kill("Something went wrong when adding to "+getClass().getName());
		}
		return success;
	}
	
	/** Indicate that no more reads will be sent, then wait for this thread to finish; threaded mode only */
	public final void poisonAndWait(){
		assert(threaded) : "This should only be called in threaded mode.";
		poison();
		waitForFinish();
	}
	
	/**
	 * Wait for this object's thread to terminate.
	 * The thread must have been started; otherwise its state never becomes TERMINATED.
	 */
	public final void waitForFinish(){
		assert(threaded);
		//An unstarted thread stays NEW forever and join() returns immediately,
		//which would make the loop below spin indefinitely.
		assert(this.getState()!=Thread.State.NEW) : "waitForFinish called before start().";
		if(verbose){System.err.println("Waiting for finish.");}
		while(this.getState()!=Thread.State.TERMINATED){
			if(verbose){System.err.println("Attempting join.");}
			try {
				this.join(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	/*--------------------------------------------------------------*/
	/*----------------             Fields           ----------------*/
	/*--------------------------------------------------------------*/
	
	/** Output file patterns containing a % symbol */
	public final String pattern1, pattern2;
	
	/** True if an error was encountered */
	boolean errorState=false;
	
	/** File overwrite permission */
	final boolean overwrite;
	
	/** File append permission */
	final boolean append;
	
	/** Subprocess spawning permission (e.g., for pigz) */
	final boolean allowSubprocess;
	
	/** Output file format, if unclear from file extension */
	final int defaultFormat;
	
	/** Buffers for each ReadStreamWriter */
	int rswBuffers=1;
	
	/** Print the shared header (for sam files) */
	final boolean useSharedHeader;
	
	/** Don't retire below this limit (bytes) */
	final long memLimitLower;
	
	/** Intermediate memory threshold (bytes); how it is acted upon is up to subclasses */
	final long memLimitMid;
	
	/** Dump everything if this limit is reached from buffered reads (bytes) */
	final long memLimitUpper;

	/** Allow this many active streams, for MCros3+ */
	public final int maxStreams;
	
	/** Retire this many streams at a time */
	public final int streamsToRetire;
	
	/** Dump a buffer once it holds this many reads */
	public int readsPerBuffer=defaultReadsPerBuffer;
	
	/** Dump a buffer once it holds this many bytes (estimated) */
	public int bytesPerBuffer=defaultBytesPerBuffer;
	
	/** Never write files with fewer than this many reads */
	public long minReadsToDump=0;

	/** Number of reads (and their bases) encountered that were not written */
	public long residualReads=0, residualBases=0;
	
	/** Number of named reads received by addToBuffers */
	long readsInTotal=0;
	
	/** Current number of buffered reads */
	long readsInFlight=0;
	
	/** Current number of buffered bytes (estimated) */
	long bytesInFlight=0;
	
	/** Used when MultiCros is run in threaded mode; null otherwise */
	private final ArrayBlockingQueue<ArrayList<Read>> transferQueue;
	
	/** Signal to terminate when in threaded mode; compared by identity */
	private final ArrayList<Read> poisonToken=new ArrayList<Read>(0);
	
	/** True if this object is intended to run in a separate thread */
	public final boolean threaded;
	
	/** Use a LogLog to track cardinality for each output file */
	public boolean trackCardinality=false;
	
	/*--------------------------------------------------------------*/
	/*----------------        Static Fields         ----------------*/
	/*--------------------------------------------------------------*/
	
	/** Fraction of available memory for memLimitLower (floor 50MB) */
	private static float memLimitLowerMult=0.20f;
	/** Fraction of available memory for memLimitMid (floor 70MB) */
	private static float memLimitMidMult=0.40f;
	/** Fraction of available memory for memLimitUpper (floor 90MB) */
	private static float memLimitUpperMult=0.60f;
	public static boolean defaultThreaded=true;
	public static int defaultMaxStreams=12;
	public static int defaultMcrosType=6;
	public static int defaultReadsPerBuffer=32000;
	public static int defaultBytesPerBuffer=16000000;
	
	public static boolean verbose=false;

}