1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
package stream;
import java.util.ArrayList;
import fileIO.FileFormat;
import shared.Shared;
/**
* Abstract superclass for ConcurrentReadOutputStream implementations.
* These manage ReadStreamWriters, which write reads to a file in their own thread.
* ConcurrentReadOutputStreams allow paired reads output to twin files to be treated as a single stream.
* @author Brian Bushnell
* @date Jan 26, 2015
*
*/
public abstract class ConcurrentReadOutputStream {
/*--------------------------------------------------------------*/
/*---------------- Factory ----------------*/
/*--------------------------------------------------------------*/
/** @See primary method */
public static ConcurrentReadOutputStream getStream(FileFormat ff1, int rswBuffers, CharSequence header, boolean useSharedHeader){
return getStream(ff1, null, null, null, rswBuffers, header, useSharedHeader, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
}
/** @See primary method */
public static ConcurrentReadOutputStream getStream(FileFormat ff1, FileFormat ff2, int rswBuffers, CharSequence header, boolean useSharedHeader){
return getStream(ff1, ff2, null, null, rswBuffers, header, useSharedHeader, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
}
/** @See primary method */
public static ConcurrentReadOutputStream getStream(FileFormat ff1, FileFormat ff2, String qf1, String qf2,
int rswBuffers, CharSequence header, boolean useSharedHeader){
return getStream(ff1, ff2, qf1, qf2, rswBuffers, header, useSharedHeader, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
}
/**
* Create a ConcurrentReadOutputStream.
* @param ff1 Read 1 file (required)
* @param ff2 Read 2 file (optional)
* @param qf1 Qual file 1 (optional)
* @param qf2 Qual file 2 (optional)
* @param rswBuffers Maximum number of lists to buffer for each ReadStreamWriter
* @param header A header to write to each output file before anything else
* @param useSharedHeader Write the shared header to each output file (mainly for sam output)
* @param mpi True if MPI will be used
* @param keepAll In MPI mode, tells this stream to keep all reads instead of just a fraction
* @return
*/
public static ConcurrentReadOutputStream getStream(FileFormat ff1, FileFormat ff2, String qf1, String qf2,
int rswBuffers, CharSequence header, boolean useSharedHeader, final boolean mpi, final boolean keepAll){
if(mpi){
final int rank=Shared.MPI_RANK;
final ConcurrentReadOutputStream cros0;
if(rank==0){
cros0=new ConcurrentGenericReadOutputStream(ff1, ff2, qf1, qf2, rswBuffers, header, useSharedHeader);
}else{
cros0=null;
}
final ConcurrentReadOutputStream crosD;
if(Shared.USE_CRISMPI){
assert(false) : "To support MPI, uncomment this.";
crosD=null;
// crosD=new ConcurrentReadOutputStreamMPI(cros0, rank==0);
}else{
crosD=new ConcurrentReadOutputStreamD(cros0, rank==0);
}
return crosD;
}else{
return new ConcurrentGenericReadOutputStream(ff1, ff2, qf1, qf2, rswBuffers, header, useSharedHeader);
}
}
/*--------------------------------------------------------------*/
/*---------------- Initialization ----------------*/
/*--------------------------------------------------------------*/
ConcurrentReadOutputStream(FileFormat ff1_, FileFormat ff2_){
ff1=ff1_;
ff2=ff2_;
ordered=(ff1==null ? true : ff1.ordered());
}
/** Must be called before writing to the stream */
public abstract void start();
public final boolean started(){return started;}
/*--------------------------------------------------------------*/
/*---------------- Outer Methods ----------------*/
/*--------------------------------------------------------------*/
/**
* Enqueue this list to be written.
* @param list List of reads
* @param listnum A number, starting at 0. In ordered mode, lists will only be written in numeric order, regardless of adding order.
*/
public abstract void add(ArrayList<Read> list, long listnum);
public abstract void close();
public abstract void join();
public abstract void resetNextListID();
public abstract String fname();
/** Return true if this stream has detected an error */
public abstract boolean errorState();
public abstract boolean finishedSuccessfully();
/*--------------------------------------------------------------*/
/*---------------- Inner Methods ----------------*/
/*--------------------------------------------------------------*/
/*--------------------------------------------------------------*/
/*---------------- Getters ----------------*/
/*--------------------------------------------------------------*/
public long basesWritten(){
long x=0;
ReadStreamWriter rsw1=getRS1();
ReadStreamWriter rsw2=getRS2();
if(rsw1!=null){x+=rsw1.basesWritten();}
if(rsw2!=null){x+=rsw2.basesWritten();}
return x;
}
public long readsWritten(){
long x=0;
ReadStreamWriter rsw1=getRS1();
ReadStreamWriter rsw2=getRS2();
if(rsw1!=null){x+=rsw1.readsWritten();}
if(rsw2!=null){x+=rsw2.readsWritten();}
return x;
}
public abstract ReadStreamWriter getRS1();
public abstract ReadStreamWriter getRS2();
/*--------------------------------------------------------------*/
/*---------------- Fields ----------------*/
/*--------------------------------------------------------------*/
public final FileFormat ff1, ff2;
public final boolean ordered;
boolean errorState=false;
boolean finishedSuccessfully=false;
boolean started=false;
/*--------------------------------------------------------------*/
/*---------------- Static Fields ----------------*/
/*--------------------------------------------------------------*/
public static boolean verbose=false;
}
|