File: DataWriter.h

package info (click to toggle)
snap-aligner 1.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 4,988 kB
  • sloc: cpp: 36,500; ansic: 5,239; python: 227; makefile: 85; sh: 28
file content (283 lines) | stat: -rw-r--r-- 8,714 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/*++

Module Name:

    DataWriter.h

Abstract:

    Headers for the DataWriter & related classes for the SNAP sequencer

Authors:

    Ravi Pandya, Feb 2013

Environment:

    User mode service.

Revision History:

--*/

#pragma once

#include "Compat.h"
#include "Read.h"
#include "ParallelTask.h"
#include "Genome.h"

// #define VALIDATE_WRITE 1

class DataWriterSupplier;
struct AlignerOptions;

//
// DataWriter: abstract per-thread writer that puts data into a single destination.
// A writer holds a Filter pointer (which may be NULL) and forwards inHeader() to it.
//
class DataWriter
{
public:

    // How a filter treats the data that passes through it.
    enum FilterType
    {
        ReadFilter,         // looks at the data but leaves it unmodified
        ModifyFilter,       // changes the data in place
        CopyFilter,         // copies the data into a new buffer of the same size
        TransformFilter,    // copies the data into a new buffer whose size may differ
        ResizeFilter,       // rewrites the data in the same buffer; the size may differ
        DupMarkFilter       // NOTE(review): not documented here; presumably duplicate marking -- confirm
    };

    //
    // Filter: there is a single filter instance per thread.
    // Original note: a filter points to its FilterSupplier for data common to all threads
    // (this base class itself declares no such pointer).
    //
    class Filter
    {
    public:
        Filter(FilterType i_filterType)
            : filterType(i_filterType)
        {
        }

        const FilterType filterType;

        virtual ~Filter()
        {
        }

        // Tells the filter whether a header (flag set) or individual reads are being written.
        // The base implementation does nothing.
        virtual void inHeader(bool flag)
        {
        }

        // Invoked when a chunk of data (i.e. a single read) has been written into the file.
        virtual void onAdvance(
            DataWriter*     writer,
            size_t          batchOffset,
            char*           data,
            GenomeDistance  bytes,
            GenomeLocation  location) = 0;

        // Invoked when a batch has been completed, after the writer advanced to the next one;
        // use getBatch(-1, ...) to reach the batch that was just completed.
        // A TransformFilter returns the number of bytes of transformed data it placed in the
        // current buffer, so the writer needs to advance again. A TransformFilter should call
        // getBatch(0) first to ensure the current buffer has been written before writing into it.
        virtual size_t onNextBatch(
            DataWriter* writer,
            size_t      offset,
            size_t      bytes,
            bool        lastBatch = false,
            bool*       needMoreBuffer = NULL,
            size_t*     fromBufferUsed = NULL) = 0;
    };

    //
    // FilterSupplier: factory that hands out the per-thread filters.
    //
    class FilterSupplier
    {
    public:
        FilterSupplier(FilterType i_filterType)
            : filterType(i_filterType)
        {
        }

        const FilterType filterType;

        virtual ~FilterSupplier()
        {
        }

        // Combines this supplier with another one; defined outside this header.
        FilterSupplier* compose(FilterSupplier* other);

        // Returns a filter for the calling thread.
        virtual Filter* getFilter() = 0;

        // Whole-file completion hooks: onClosing runs before the file is closed,
        // onClosed runs after it has been closed.
        virtual void onClosing(DataWriterSupplier* supplier) = 0;
        virtual void onClosed(DataWriterSupplier* supplier) = 0;
    };

    DataWriter(Filter* i_filter)
        : filter(i_filter)
    {
    }

    virtual ~DataWriter()
    {
    }

    // Passes the header/reads indication on to the filter, when one is attached.
    void inHeader(bool flag)
    {
        if (filter == NULL) {
            return;
        }
        filter->inHeader(flag);
    }

    // Retrieves the space that remains in the current buffer for writing.
    virtual bool getBuffer(char** o_buffer, size_t* o_size) = 0;

    // Moves forward within the current buffer, shrinking the available space.
    // Should be called for each read, together with its location.
    virtual void advance(_int64 bytes, GenomeLocation location = 0) = 0;

    // Retrieves a complete data buffer of a batch: relative == 0 is the current one,
    // relative == -1 the previous one, and so on.
    // A negative value yields old data that was written; otherwise the call waits for the
    // write to complete so the buffer can be written into.
    // o_offset receives the physical offset (e.g. compressed); o_logicalOffset receives the
    // data offset (e.g. uncompressed).
    virtual bool getBatch(
        int     relative,
        char**  o_buffer,
        size_t* o_size = NULL,
        size_t* o_used = NULL,
        size_t* o_offset = NULL,
        size_t* o_logicalUsed = 0,
        size_t* o_logicalOffset = NULL) = 0;

    // Moves on to the next buffer.
    virtual bool nextBatch(bool lastBatch = false) = 0;

    // Signals that this thread is complete.
    virtual void close() = 0;

    // Timers, in nanoseconds.
    static volatile _int64 FilterTime;
    static volatile _int64 WaitTime;

protected:
    Filter* filter;     // may be NULL (see inHeader)
};

class FileFormat;
class Genome;
class GzipWriterFilterSupplier;
class FileEncoder;

//
// DataWriterSupplier: hands out DataWriter objects for multiple threads.
//
class DataWriterSupplier
{
public:
    virtual ~DataWriterSupplier()
    {
    }

    // Returns a writer.
    virtual DataWriter* getWriter() = 0;

    // To be called once all threads are done and all filters have been destroyed.
    virtual void close() = 0;

    // Factory for a supplier writing to the named file; filterSupplier and encoder are optional.
    // NOTE(review): the meaning of count (default 4) is not visible in this header -- confirm.
    static DataWriterSupplier* create(const char* filename, size_t bufferSize,
                                      bool emitInternalScore, char *internalScoreTag,
                                      DataWriter::FilterSupplier* filterSupplier = NULL,
                                      FileEncoder* encoder = NULL, int count = 4);

    // Factory for a supplier producing sorted output; takes a temporary file name and
    // the name of the sorted file. encoder is optional.
    static DataWriterSupplier* sorted(const FileFormat* format, const Genome* genome,
                                      const char* tempFileName, size_t tempBufferMemory,
                                      int numThreads, const char* sortedFileName,
                                      DataWriter::FilterSupplier* sortedFilterSupplier,
                                      size_t maxBufferSize,
                                      bool emitInternalScore, char *internalScoreTag,
                                      FileEncoder* encoder = NULL);

    // Produces the path name for the intermediate file used when sorting.
    static char *generateSortIntermediateFilePathName(AlignerOptions *options);

    // Filter supplier for marking duplicates in SAM output.
    static DataWriter::FilterSupplier* samMarkDuplicates(const Genome* genome);

    // Filter supplier for gzip output.
    // NOTE(review): the original comment said "defaults follow BAM output spec", but this
    // declaration has no default arguments -- confirm whether that note is stale.
    static GzipWriterFilterSupplier* gzip(
        bool    bamFormat,
        size_t  chunkSize,
        int     numThreads,
        bool    bindToProcessors,
        bool    multiThreaded);

    // Filter supplier for marking duplicates in BAM output.
    static DataWriter::FilterSupplier* bamMarkDuplicates(const Genome* genome);

    // Filter supplier for writing a BAM index to indexFileName.
    static DataWriter::FilterSupplier* bamIndex(
        const char*                 indexFileName,
        const Genome*               genome,
        GzipWriterFilterSupplier*   gzipSupplier);
};

class AsyncDataWriter;

//
// FileEncoder: encodes batches of an AsyncDataWriter with the help of a ParallelCoworker.
// The destructor stops and deletes the coworker, so an instance must never be copied;
// copy construction and copy assignment are therefore disabled (see the private section).
//
class FileEncoder
{
public:
    FileEncoder(int numThreads, bool bindToProcessors, ParallelWorkerManager* i_supplier);

    // Stops and frees the coworker if there is one. The encoder must not be running.
    // NOTE(review): lock is not released here -- confirm it is cleaned up elsewhere.
    ~FileEncoder()
    {
        if (coworker != NULL) {
            _ASSERT(! encoderRunning);
            coworker->stop();
            delete coworker;
        }
    }

    // Factory for a gzip encoder built around the given filter supplier.
    static FileEncoder* gzip(GzipWriterFilterSupplier* filterSupplier, int numThreads, bool bindToProcessor, size_t chunkSize = 65536, bool bam = true);

    // post-construction initialization
    void initialize(AsyncDataWriter* i_writer);

    // called by writer when there is data to encode; threadsafe
    void inputReady();

    void close();

    //
    // The following are called by the supplier to get/set information about the current batch.
    //

    void setupEncode(int relative);

    void getEncodeBatch(char** o_batch, size_t* o_batchSize, size_t* o_batchUsed);

    void getOffsets(size_t* o_logicalOffset, size_t* o_physicalOffset);

    void setEncodedBatchSize(size_t newSize);

private:
    // Copying is disabled: the destructor deletes coworker, so a copied FileEncoder would
    // stop and delete the same ParallelCoworker twice. These are declared but intentionally
    // never defined, so any accidental copy fails at compile time (or at link time for friends).
    FileEncoder(const FileEncoder&);
    FileEncoder& operator=(const FileEncoder&);

    // static callback for encoder; threadsafe
    static void outputReadyCallback(void *p);

    // called by encoder when a block of data has been encoded; threadsafe
    void outputReady();

    // scans writer and kicks off encoder if there is something ready; must hold lock
    void checkForInput();

    AsyncDataWriter* writer;
    ParallelCoworker* coworker;     // stopped and deleted by the destructor; may be NULL
    ExclusiveLock* lock;            // must be held when calling checkForInput()
    bool encoderRunning;            // asserted to be false when the destructor runs
    int encoderBatch;               // NOTE(review): presumably the batch currently being encoded -- confirm

    friend class AsyncDataWriter;
};

//
// StdoutAsyncFile: AsyncFile subclass for stdout. Only one can exist per run of SNAP
// (see anyCreated). Pending writes are kept on an ordered queue of WriteElements.
//
class StdoutAsyncFile : public AsyncFile
{
public:
    StdoutAsyncFile();
    virtual ~StdoutAsyncFile();

    bool close();

    static StdoutAsyncFile *open(const char *filename, bool write);

    AsyncFile::Writer* getWriter();
    AsyncFile::Reader* getReader();

    void beginWrite(void *buffer, size_t length, size_t offset, size_t *o_bytesWritten);
    void waitForCompletion(size_t offset);

private:
    ExclusiveLock lock;

    // One queued write request; elements are doubly linked through next/prev.
    struct WriteElement {
        void *buffer;
        size_t length;
        size_t offset;
        size_t *o_bytesWritten;

        WriteElement *next;
        WriteElement *prev;

        void enqueue(WriteElement *previous);
        void dequeue();
    };

    // The queue is empty when the head element's next pointer refers back to the head itself.
    bool isQueueEmpty()
    {
        return writeElementQueue[0].next == &writeElementQueue[0];
    }

    size_t highestOffsetCompleted;

    //
    // Head of the write queue. The queue is kept in order and the writer never leaves gaps:
    // if blocks 10 and 12 are queued, the writer writes 10, leaves 12 on the queue, and waits
    // for 11 to be added and written before it processes 12.
    //
    WriteElement writeElementQueue[1];

    // Set when a writer puts a block on the queue; cleared when the consumer has seen it.
    EventObject unexaminedElementsOnQueue;

    // Set when any element is completed by the consumer; reset when a waiter starts.
    EventObject elementsCompleted;

    SingleWaiterObject consumerThreadDone;

    bool closing;

    static void ConsumerThreadMain(void *param);
    void runConsumer();

    // There's no way to multiplex stdout, so you only get one of these per run of SNAP.
    static bool anyCreated;
};