1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
|
/*++
Module Name:
DataWriter.h
Abstract:
Declarations of DataWriter and related classes (filters, suppliers, encoders) used by the SNAP aligner to write output
Authors:
Ravi Pandya, Feb 2013
Environment:
User mode service.
Revision History:
--*/
#pragma once
#include "Compat.h"
#include "Read.h"
#include "ParallelTask.h"
#include "Genome.h"
// #define VALIDATE_WRITE 1
class DataWriterSupplier;
struct AlignerOptions;
// Per-thread writer for data into a single destination.
//
// Concrete subclasses manage the underlying buffers. This base class only holds an
// optional Filter, which it notifies of header/read transitions (see inHeader).
// The base-class destructor does not delete the filter.
class DataWriter
{
public:
    // How a Filter treats the data it is handed.
    enum FilterType
    {
        ReadFilter,      // reads data but does not modify it
        ModifyFilter,    // modifies data in place
        CopyFilter,      // copies data into new buffer, same size
        TransformFilter, // copies data into new buffer, possibly different size
        ResizeFilter,    // rewrites data in same buffer, possibly different size
        DupMarkFilter    // NOTE(review): not documented in this header; presumably the kind used by
                         // the duplicate-marking filter suppliers -- confirm against the implementation
    };

    // Single filter instance per thread.
    // Points to its FilterSupplier for common data.
    class Filter
    {
    public:
        explicit Filter(FilterType i_filterType) : filterType(i_filterType) {}

        // Fixed at construction; tells the writer how this filter treats data.
        const FilterType filterType;

        virtual ~Filter() {}

        // Called to set whether we're writing a header vs. individual reads.
        // Default: do nothing.
        virtual void inHeader(bool /* flag */) {}

        // Called when a chunk of data (i.e. a single read) has been written into the file.
        virtual void onAdvance(DataWriter* writer, size_t batchOffset, char* data, GenomeDistance bytes, GenomeLocation location) = 0;

        // Called when a batch has been completed, after advancing to the next one,
        // so use getBatch(-1, ...) to get the batch that was just completed.
        // TransformFilters return the number of bytes of transformed data in the current
        // buffer, so we need to advance again. TransformFilters should call getBatch(0)
        // to ensure the current buffer has been written before they write into it.
        // Caution: default arguments on a virtual function come from the static type at
        // the call site, so overrides should repeat exactly these defaults.
        virtual size_t onNextBatch(DataWriter* writer, size_t offset, size_t bytes, bool lastBatch = false, bool* needMoreBuffer = NULL, size_t* fromBufferUsed = NULL) = 0;
    };

    // Factory for per-thread filters.
    class FilterSupplier
    {
    public:
        explicit FilterSupplier(FilterType i_filterType) : filterType(i_filterType) {}

        // Fixed at construction; the kind of filter this supplier produces.
        const FilterType filterType;

        virtual ~FilterSupplier() {}

        // Combines this supplier with `other`; semantics are defined out of line.
        FilterSupplier* compose(FilterSupplier* other);

        // Returns a filter instance for use by one thread.
        virtual Filter* getFilter() = 0;

        // Called when the entire file is done: onClosing before the file is closed,
        // onClosed after.
        virtual void onClosing(DataWriterSupplier* supplier) = 0;
        virtual void onClosed(DataWriterSupplier* supplier) = 0;
    };

    // i_filter may be NULL (inHeader checks for this). It is not deleted by this base class.
    explicit DataWriter(Filter* i_filter) : filter(i_filter) {}

    virtual ~DataWriter() {}

    // Forwards the header/read state to the filter, if there is one.
    void inHeader(bool flag)
    {
        if (filter != NULL) {
            filter->inHeader(flag);
        }
    }

    // Get remaining space in current buffer for writing.
    virtual bool getBuffer(char** o_buffer, size_t* o_size) = 0;

    // Advance within current buffer, reducing available space.
    // Should be called on each read, with the read's location.
    virtual void advance(_int64 bytes, GenomeLocation location = 0) = 0;

    // Get complete data buffer in batch: relative == 0 is current, relative == -1 is
    // previous, etc. If negative, gets old data already written. Otherwise, waits for
    // the write to complete so you can write into it.
    // o_offset gets the physical offset (e.g. compressed); o_logicalUsed/o_logicalOffset
    // get data sizes/offsets (e.g. uncompressed). All outputs after o_buffer default to NULL
    // (presumably meaning "not requested" -- confirm in implementations).
    virtual bool getBatch(int relative, char** o_buffer, size_t* o_size = NULL, size_t* o_used = NULL, size_t* o_offset = NULL, size_t* o_logicalUsed = NULL, size_t* o_logicalOffset = NULL) = 0;

    // Advance to next buffer.
    virtual bool nextBatch(bool lastBatch = false) = 0;

    // This thread is complete.
    virtual void close() = 0;

    // Nanosecond timers (defined out of line).
    // NOTE(review): `volatile` does not make updates atomic; check how these are incremented.
    static volatile _int64 FilterTime;
    static volatile _int64 WaitTime;

protected:
    // Optional per-thread filter; may be NULL; not owned by this base class.
    Filter* filter;
};
class FileFormat;
class Genome;
class GzipWriterFilterSupplier;
class FileEncoder;
// Abstract source of DataWriters: hands out one writer per thread, all feeding
// the same destination. Concrete suppliers are obtained from the static factories below.
class DataWriterSupplier
{
public:
    virtual ~DataWriterSupplier() {}

    // Returns a writer for the calling thread.
    virtual DataWriter* getWriter() = 0;

    // Finishes the output. Must be called only once every thread is done
    // and every filter has been destroyed.
    virtual void close() = 0;

    // ---- Supplier factories ----

    // Supplier writing to `filename` with buffers of `bufferSize` bytes.
    // filterSupplier, encoder and count are optional (NULL, NULL, 4);
    // their exact use is defined by the implementation.
    static DataWriterSupplier* create(
        const char* filename,
        size_t bufferSize,
        bool emitInternalScore,
        char *internalScoreTag,
        DataWriter::FilterSupplier* filterSupplier = NULL,
        FileEncoder* encoder = NULL,
        int count = 4);

    // Supplier that produces sorted output in `sortedFileName`, presumably staging data
    // through `tempFileName` with up to `tempBufferMemory` bytes of buffering -- confirm.
    static DataWriterSupplier* sorted(
        const FileFormat* format,
        const Genome* genome,
        const char* tempFileName,
        size_t tempBufferMemory,
        int numThreads,
        const char* sortedFileName,
        DataWriter::FilterSupplier* sortedFilterSupplier,
        size_t maxBufferSize,
        bool emitInternalScore,
        char *internalScoreTag,
        FileEncoder* encoder = NULL);

    // Builds the path name of the intermediate file used for sorting, from the aligner options.
    static char *generateSortIntermediateFilePathName(AlignerOptions *options);

    // ---- Filter-supplier factories ----

    static DataWriter::FilterSupplier* samMarkDuplicates(const Genome* genome);

    // NOTE(review): previously commented "defaults follow BAM output spec", but no default
    // arguments are declared here; presumably refers to values chosen by callers or the
    // implementation -- confirm.
    static GzipWriterFilterSupplier* gzip(bool bamFormat, size_t chunkSize, int numThreads, bool bindToProcessors, bool multiThreaded);

    static DataWriter::FilterSupplier* bamMarkDuplicates(const Genome* genome);

    static DataWriter::FilterSupplier* bamIndex(const char* indexFileName, const Genome* genome, GzipWriterFilterSupplier* gzipSupplier);
};
class AsyncDataWriter;
// Encodes batches of data produced by an AsyncDataWriter (e.g. gzip/BAM compression,
// see the gzip factory). Encoding work is dispatched to a ParallelCoworker owned by this object.
class FileEncoder
{
public:
    FileEncoder(int numThreads, bool bindToProcessors, ParallelWorkerManager* i_supplier);

    // Stops and deletes the coworker, if one exists. The encoder must not still be running.
    // NOTE(review): `lock` is not released here; confirm whether it is owned and freed elsewhere.
    ~FileEncoder()
    {
        if (coworker != NULL) {
            _ASSERT(! encoderRunning);
            coworker->stop();
            delete coworker;
        }
    }

    static FileEncoder* gzip(GzipWriterFilterSupplier* filterSupplier, int numThreads, bool bindToProcessor, size_t chunkSize = 65536, bool bam = true);

    // Post-construction initialization.
    void initialize(AsyncDataWriter* i_writer);

    // Called by the writer when there is data to encode; threadsafe.
    void inputReady();

    void close();

    // Called by the supplier to get/set information about the current batch.
    void setupEncode(int relative);
    void getEncodeBatch(char** o_batch, size_t* o_batchSize, size_t* o_batchUsed);
    void getOffsets(size_t* o_logicalOffset, size_t* o_physicalOffset);
    void setEncodedBatchSize(size_t newSize);

private:
    // Not copyable: the destructor deletes `coworker`, so a memberwise copy would stop and
    // delete it twice. Declared but intentionally never defined (pre-C++11 idiom).
    FileEncoder(const FileEncoder&);
    FileEncoder& operator=(const FileEncoder&);

    // Static callback for the encoder; threadsafe.
    static void outputReadyCallback(void *p);

    // Called by the encoder when a block of data has been encoded; threadsafe.
    void outputReady();

    // Scans the writer and kicks off the encoder if there is something ready; must hold lock.
    void checkForInput();

    AsyncDataWriter* writer;
    ParallelCoworker* coworker;     // owned: stopped and deleted in the destructor
    ExclusiveLock* lock;
    bool encoderRunning;
    int encoderBatch;

    friend class AsyncDataWriter;
};
// AsyncFile that writes to standard output. Writes may be submitted out of order (by offset);
// a consumer thread emits them in offset order without gaps (see the queue comment below).
class StdoutAsyncFile : public AsyncFile
{
public:
    StdoutAsyncFile();
    virtual ~StdoutAsyncFile();

    // AsyncFile interface.
    bool close();
    static StdoutAsyncFile *open(const char *filename, bool write);
    AsyncFile::Writer* getWriter();
    AsyncFile::Reader* getReader();

    // Queues `length` bytes at `buffer` for output at `offset`. Presumably *o_bytesWritten
    // is filled in once the block is written -- confirm in the implementation.
    void beginWrite(void *buffer, size_t length, size_t offset, size_t *o_bytesWritten);

    // Presumably blocks until output through `offset` has been completed -- confirm.
    void waitForCompletion(size_t offset);

private:
    // Not copyable: writeElementQueue is a self-referential sentinel, and the object holds a
    // lock, events and a consumer-thread waiter that must not be duplicated.
    // Declared but intentionally never defined (pre-C++11 idiom).
    StdoutAsyncFile(const StdoutAsyncFile&);
    StdoutAsyncFile& operator=(const StdoutAsyncFile&);

    ExclusiveLock lock;

    // One pending write, linked into the doubly-linked write queue.
    struct WriteElement {
        void *buffer;
        size_t length;
        size_t offset;
        size_t *o_bytesWritten;
        WriteElement *next;
        WriteElement *prev;

        void enqueue(WriteElement *previous);
        void dequeue();
    };

    // The queue head writeElementQueue[0] is a sentinel; the queue is empty when the
    // sentinel's `next` points back at itself.
    bool isQueueEmpty() const {
        return writeElementQueue->next == writeElementQueue;
    }

    size_t highestOffsetCompleted;

    //
    // The queue is kept in order, and the writer writes without gaps, so if you put on blocks 10 and 12, the writer will write
    // 10, and then leave 12 on the queue and wait for 11 to be added and written before processing 12.
    //
    WriteElement writeElementQueue[1];

    EventObject unexaminedElementsOnQueue; // This gets set when a writer puts a block on the queue, and cleared when the consumer has seen it.
    EventObject elementsCompleted;         // This gets set when any element is completed by the consumer, and reset when a waiter starts
    SingleWaiterObject consumerThreadDone;
    bool closing;

    // Consumer thread entry point (param is presumably the StdoutAsyncFile) and its body.
    static void ConsumerThreadMain(void *param);
    void runConsumer();

    static bool anyCreated; // Because there's no way to multiplex stdout, you only get one per run of SNAP
};
|