File: ConcurrentReadOutputStream.java

package info (click to toggle)
bbmap 39.20%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 26,024 kB
  • sloc: java: 312,743; sh: 18,099; python: 5,247; ansic: 2,074; perl: 96; makefile: 39; xml: 38
file content (161 lines) | stat: -rwxr-xr-x 6,005 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package stream;

import java.util.ArrayList;

import fileIO.FileFormat;
import shared.Shared;

/**
 * Abstract superclass for ConcurrentReadOutputStream implementations.
 * These manage ReadStreamWriters, which write reads to a file in their own thread.
 * ConcurrentReadOutputStreams allow paired reads output to twin files to be treated as a single stream.
 * @author Brian Bushnell
 * @date Jan 26, 2015
 *
 */
public abstract class ConcurrentReadOutputStream {
	
	/*--------------------------------------------------------------*/
	/*----------------           Factory            ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Convenience factory for a single output file with no qual files.
	 * MPI settings are taken from Shared.USE_MPI and Shared.MPI_KEEP_ALL.
	 * @see #getStream(FileFormat, FileFormat, String, String, int, CharSequence, boolean, boolean, boolean)
	 */
	public static ConcurrentReadOutputStream getStream(FileFormat ff1, int rswBuffers, CharSequence header, boolean useSharedHeader){
		return getStream(ff1, null, null, null, rswBuffers, header, useSharedHeader, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
	}
	
	/**
	 * Convenience factory for one or two output files with no qual files.
	 * MPI settings are taken from Shared.USE_MPI and Shared.MPI_KEEP_ALL.
	 * @see #getStream(FileFormat, FileFormat, String, String, int, CharSequence, boolean, boolean, boolean)
	 */
	public static ConcurrentReadOutputStream getStream(FileFormat ff1, FileFormat ff2, int rswBuffers, CharSequence header, boolean useSharedHeader){
		return getStream(ff1, ff2, null, null, rswBuffers, header, useSharedHeader, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
	}
	
	/**
	 * Convenience factory accepting qual files.
	 * MPI settings are taken from Shared.USE_MPI and Shared.MPI_KEEP_ALL.
	 * @see #getStream(FileFormat, FileFormat, String, String, int, CharSequence, boolean, boolean, boolean)
	 */
	public static ConcurrentReadOutputStream getStream(FileFormat ff1, FileFormat ff2, String qf1, String qf2,
			int rswBuffers, CharSequence header, boolean useSharedHeader){
		return getStream(ff1, ff2, qf1, qf2, rswBuffers, header, useSharedHeader, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
	}
	
	/**
	 * Create a ConcurrentReadOutputStream (primary factory method).
	 * Without MPI this returns a ConcurrentGenericReadOutputStream.
	 * With MPI this returns a ConcurrentReadOutputStreamD, which wraps a
	 * ConcurrentGenericReadOutputStream on rank 0 and wraps null on all other ranks.
	 * @param ff1 Read 1 file (required)
	 * @param ff2 Read 2 file (optional)
	 * @param qf1 Qual file 1 (optional)
	 * @param qf2 Qual file 2 (optional)
	 * @param rswBuffers Maximum number of lists to buffer for each ReadStreamWriter
	 * @param header A header to write to each output file before anything else
	 * @param useSharedHeader Write the shared header to each output file (mainly for sam output)
	 * @param mpi True if MPI will be used
	 * @param keepAll In MPI mode, tells this stream to keep all reads instead of just a fraction.
	 * Note that this factory does not currently read this parameter.
	 * @return A new, unstarted stream; never null
	 * @throws UnsupportedOperationException If mpi is true and Shared.USE_CRISMPI is set,
	 * because the ConcurrentReadOutputStreamMPI code path is commented out
	 * (with assertions enabled, an AssertionError is thrown first)
	 */
	public static ConcurrentReadOutputStream getStream(FileFormat ff1, FileFormat ff2, String qf1, String qf2,
			int rswBuffers, CharSequence header, boolean useSharedHeader, final boolean mpi, final boolean keepAll){
		if(!mpi){
			return new ConcurrentGenericReadOutputStream(ff1, ff2, qf1, qf2, rswBuffers, header, useSharedHeader);
		}
		
		//Reject the unsupported mode before constructing any underlying stream.
		if(Shared.USE_CRISMPI){
			assert(false) : "To support MPI, uncomment this.";
//			crosD=new ConcurrentReadOutputStreamMPI(cros0, rank==0);
			//Assertions may be disabled; fail explicitly rather than returning null.
			throw new UnsupportedOperationException("Shared.USE_CRISMPI is set, but ConcurrentReadOutputStreamMPI "
					+ "support is commented out in ConcurrentReadOutputStream.getStream.");
		}
		
		final int rank=Shared.MPI_RANK;
		final boolean master=(rank==0);
		
		//Only rank 0 owns a real underlying stream; other ranks wrap null.
		final ConcurrentReadOutputStream cros0;
		if(master){
			cros0=new ConcurrentGenericReadOutputStream(ff1, ff2, qf1, qf2, rswBuffers, header, useSharedHeader);
		}else{
			cros0=null;
		}
		return new ConcurrentReadOutputStreamD(cros0, master);
	}
	
	/*--------------------------------------------------------------*/
	/*----------------        Initialization        ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Stores the output formats and derives the ordering flag.
	 * @param ff1_ Read 1 file; if null, the stream is treated as ordered
	 * @param ff2_ Read 2 file (may be null)
	 */
	ConcurrentReadOutputStream(FileFormat ff1_, FileFormat ff2_){
		ff1=ff1_;
		ff2=ff2_;
		ordered=(ff1==null ? true : ff1.ordered());
	}
	
	/** Must be called before writing to the stream */
	public abstract void start();
	
	/** @return The current value of the started flag */
	public final boolean started(){return started;}
	
	/*--------------------------------------------------------------*/
	/*----------------        Outer Methods         ----------------*/
	/*--------------------------------------------------------------*/
	
	/** 
	 * Enqueue this list to be written.
	 * @param list List of reads
	 * @param listnum A number, starting at 0.  In ordered mode, lists will only be written in numeric order, regardless of adding order.
	 */
	public abstract void add(ArrayList<Read> list, long listnum);
	
	/** Close this stream; exact semantics are defined by the implementation. */
	public abstract void close();
	
	/** Join this stream; presumably waits for writer threads to finish (see implementations). */
	public abstract void join();
	
	/** Reset the next expected list ID; exact semantics are defined by the implementation. */
	public abstract void resetNextListID();
	
	/** @return A file name for this stream, as defined by the implementation */
	public abstract String fname();
	
	/** Return true if this stream has detected an error */
	public abstract boolean errorState();

	/** @return True if this stream finished successfully, as defined by the implementation */
	public abstract boolean finishedSuccessfully();
	
	/*--------------------------------------------------------------*/
	/*----------------        Inner Methods         ----------------*/
	/*--------------------------------------------------------------*/
	
	/*--------------------------------------------------------------*/
	/*----------------           Getters            ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Sums basesWritten() over the writers returned by getRS1() and getRS2().
	 * A null writer contributes nothing.
	 * @return Total bases written by both writers
	 */
	public long basesWritten(){
		final ReadStreamWriter rsw1=getRS1();
		final ReadStreamWriter rsw2=getRS2();
		long sum=0;
		if(rsw1!=null){sum+=rsw1.basesWritten();}
		if(rsw2!=null){sum+=rsw2.basesWritten();}
		return sum;
	}
	
	/**
	 * Sums readsWritten() over the writers returned by getRS1() and getRS2().
	 * A null writer contributes nothing.
	 * @return Total reads written by both writers
	 */
	public long readsWritten(){
		final ReadStreamWriter rsw1=getRS1();
		final ReadStreamWriter rsw2=getRS2();
		long sum=0;
		if(rsw1!=null){sum+=rsw1.readsWritten();}
		if(rsw2!=null){sum+=rsw2.readsWritten();}
		return sum;
	}
	
	/** @return The first writer; the counters above treat null as absent */
	public abstract ReadStreamWriter getRS1();
	/** @return The second writer; the counters above treat null as absent */
	public abstract ReadStreamWriter getRS2();
	
	/*--------------------------------------------------------------*/
	/*----------------             Fields           ----------------*/
	/*--------------------------------------------------------------*/
	
	/** Output formats passed to the constructor; either may be null */
	public final FileFormat ff1, ff2;
	/** True if ff1 is null; otherwise the value of ff1.ordered() at construction */
	public final boolean ordered;
	
	/** Error flag; not written in this class (presumably maintained by subclasses) */
	boolean errorState=false;
	/** Success flag; not written in this class (presumably maintained by subclasses) */
	boolean finishedSuccessfully=false;
	/** Value returned by started(); not written in this class (presumably set by start() implementations) */
	boolean started=false;
	
	/*--------------------------------------------------------------*/
	/*----------------        Static Fields         ----------------*/
	/*--------------------------------------------------------------*/
	
	/** Verbosity flag; not read in this class */
	public static boolean verbose=false;
	
}