File: ReadSupplierQueue.h

package info (click to toggle)
snap-aligner 1.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 4,988 kB
  • sloc: cpp: 36,500; ansic: 5,239; python: 227; makefile: 85; sh: 28
file content (221 lines) | stat: -rw-r--r-- 6,897 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/*++

Module Name:

    ReadSupplierQueue.h

Abstract:

    Headers for parallel queue of reads

Authors:

    Bill Bolosky, November, 2012

Environment:

    User mode service.

Revision History:


--*/

#pragma once
#include "Read.h"
#include "Compat.h"
#include "VariableSizeVector.h"
#include "VariableSizeMap.h"

// Makes std::pair usable without qualification in every file that includes this header.
// NOTE(review): nothing in this header itself refers to pair; presumably it is kept for
// the benefit of includers -- confirm before removing.
using std::pair;

// Forward declarations; both classes are defined further down in this header.
class ReadSupplierFromQueue;
class PairedReadSupplierFromQueue;

// A VariableSizeVector of DataBatch values; used below as the type of
// ReadQueueElement::batches.
typedef VariableSizeVector<DataBatch> BatchVector;

struct ReadQueueElement {
    ReadQueueElement()
        : next(NULL), prev(NULL)
    {
        reads = (Read*) BigAlloc(MaxReadsPerElement * sizeof(Read));
    }

    ~ReadQueueElement()
    {
        BigDealloc(reads);
        reads = NULL;
    }

    // note this should be about read buffer size for input reads
#ifdef LONG_READS
    static const int    MaxReadsPerElement = 400; 
#else
    static const int    MaxReadsPerElement = 5000; 
#endif
    ReadQueueElement    *next;
    ReadQueueElement    *prev;
    int                 totalReads;
    Read*               reads;
    BatchVector         batches;

    void addToTail(ReadQueueElement *queueHead) {
        next = queueHead;
        prev = queueHead->prev;
        prev->next = this;
        next->prev = this;
    }

    void removeFromQueue() {
        prev->next = next;
        next->prev = prev;
        prev = next = NULL;
    }
};
    
//
// Queue that sits between one or more readers and the suppliers that hand reads to consumer
// threads.  It derives from both ReadSupplierGenerator and PairedReadSupplierGenerator, so the
// same object can generate single-ended or paired suppliers.  The member functions are defined
// outside this header.
//
class ReadSupplierQueue: public ReadSupplierGenerator, public PairedReadSupplierGenerator {
public:
    //
    // This queue can handle several different kinds of inputs and outputs.  It will do either single
    // ended or paired reads.  In both cases, it can accept multiple independent readers (typically
    // one per (pair of) input file(s)).  For paired reads that come from pairs of input files (think
    // FASTQ) it will run them independently and then combine the results as they're extracted.  For
    // paired reads that come from single files (SAM/BAM/CRAM, etc.) it still uses two queues internally,
    // but they're both written by a single PairedReadReader.
    //

    //
    // The version for single ended reads.  This is useful for formats that can't be divided by the
    // RangeSplitter, like BAM (though that's theoretically possible, so maybe..)  It takes a set 
    // of readers (presumably for different files), each of which runs independently and in parallel.
    // NOTE(review): as declared here, this constructor takes a single ReadReader, not a set.
    // 
    ReadSupplierQueue(ReadReader *i_reader);

    //
    // The version for paired reads for which each end comes from a different Reader (and presumably
    // file, think FASTQ).  This is mostly useful for cases where the RangeSplitter can't handle
    // the files, probably because they are FASTQ files with unequal length reads.
    //
    ReadSupplierQueue(ReadReader *i_firstHalfReader, ReadReader *i_secondHalfReader);

    //
    // The version for paired reads that come from a single file but for which RangeSplitter won't
    // work (BAM, CRAM, compressed FASTQ, maybe SRA).
    //
    ReadSupplierQueue(PairedReadReader *pairedReader);
    
    virtual ~ReadSupplierQueue();

    // NOTE(review): presumably launches the reader thread(s); what a false return means is
    // decided in the .cpp -- confirm there.
    bool startReaders();
    // NOTE(review): presumably blocks until all reads have been consumed (see allReadsConsumed
    // below) -- confirm in the .cpp.
    void waitUntilFinished();
    // Supplier factories.  NOTE(review): presumably these return ReadSupplierFromQueue /
    // PairedReadSupplierFromQueue objects bound to this queue -- confirm in the .cpp.
    ReadSupplier *generateNewReadSupplier();
    PairedReadSupplier *generateNewPairedReadSupplier();
    ReaderContext* getContext();

    ReadQueueElement *getElement();     // Called from the supplier threads
    bool getElements(ReadQueueElement **element1, ReadQueueElement **element2);   // Called from supplier threads
    // Called by a supplier when it has finished with an element it obtained from the queue.
    void doneWithElement(ReadQueueElement *element);
    // Called by a supplier when it is done; see nSuppliersRunning below.
    void supplierFinished();

    // Batch hold/release entry points; the suppliers defined later in this header forward
    // their own holdBatch/releaseBatch calls to these.
    void holdBatch(DataBatch batch);
    bool releaseBatch(DataBatch batch);

    // Returns (max(numThreads, 2) + 1) * BatchesPerElement.  __max is not defined in this
    // header; it is expected to come from one of the included headers.
    static int BufferCount(int numThreads)
    { return (__max(numThreads,2) + 1) * BatchesPerElement; }

private:

    // Multiplier used by BufferCount().
    static const int BatchesPerElement = 4;

    // NOTE(review): presumably the initialization shared by the three constructors -- confirm.
    void commonInit();

    ReadReader          *singleReader[2];   // Only [0] is filled in for single ended reads
    PairedReadReader    *pairedReader;      // This is filled in iff there are no single readers

    ReadQueueElement    readyQueue[2];      // Queue [1] is used only when there are two single end readers

    BatchTracker        tracker;            // track batches used in queues, use refcount per element (not per read)

    EventObject         throttle[2];        // Two throttles, one for each of the readers.  At least one must be open at all times.
    int balance;                            // The size of readyQueue[0] - the size of readyQueue[1].  This is used to throttle.
    static const int MaxImbalance = 5;      // Engage the throttle when |balance| > MaxImbalance

    volatile unsigned   elementSize;        // reads per element, used to ensure paired single readers use same size that is ~ buffer size
 
    // Counts of active readers and suppliers, and a flag for "all reads queued"; per the lock
    // comment below, all three are protected by `lock`.
    int                 nReadersRunning;
    int                 nSuppliersRunning;
    volatile bool       allReadsQueued;

    ReadQueueElement* getEmptyElement(); // must hold the lock to call this

    bool areAnyReadsReady(); // must hold the lock to call this.

    //
    // Empty buffers waiting for the readers.
    //
    ReadQueueElement    emptyQueue[1];
  
    //
    // Just one lock for all of the shared objects (the queues and Waiter objects, and counts of
    // readers and suppliers running, as well as allReadsQueued).
    //
    ExclusiveLock       lock;
    EventObject         readsReady;
    EventObject         emptyBuffersAvailable;

    EventObject         allReadsConsumed;

    // Argument bundle for a reader thread: the owning queue plus a flag telling the thread
    // whether it is the second reader.
    struct ReaderThreadParams {
        ReadSupplierQueue       *queue;
        bool                     isSecondReader;
    };

    // Static thread entry point taking an untyped argument.  NOTE(review): presumably the
    // argument is a ReaderThreadParams* and the call is forwarded to ReaderThread() -- confirm.
    static void ReaderThreadMain(void *);
    void ReaderThread(ReaderThreadParams *params);
};

//
// A read supplier that takes its data from a ReadSupplierQueue.  Batch hold/release requests
// are forwarded unchanged to the queue; getNextRead() and the constructor are defined outside
// this header.
//
class ReadSupplierFromQueue: public ReadSupplier {
public:
    // i_queue is the queue this supplier draws from.  NOTE(review): presumably stored in
    // `queue` without taking ownership -- confirm in the .cpp.
    ReadSupplierFromQueue(ReadSupplierQueue *i_queue);
    ~ReadSupplierFromQueue() {}

    // Returns the next read.  NOTE(review): presumably NULL once the queue is exhausted --
    // confirm in the .cpp.
    Read *getNextRead();
    
    // Forwards the hold request to the underlying queue.
    virtual void holdBatch(DataBatch batch)
    { queue->holdBatch(batch); }

    // Forwards the release request to the underlying queue and returns its result.
    virtual bool releaseBatch(DataBatch batch)
    { return queue->releaseBatch(batch); }

private:
    bool                done;
    ReadSupplierQueue   *queue;             // queue that all batch calls are forwarded to
    bool                outOfReads;
    ReadQueueElement    *currentElement;    // presumably the element currently being drained -- confirm
    int                 nextReadIndex;      // presumably the index of the next read within currentElement -- confirm
};

//
// A paired read supplier that takes its data from a ReadSupplierQueue.  Batch hold/release
// requests are forwarded unchanged to the queue; the constructor, destructor and
// getNextReadPair() are defined outside this header.
//
class PairedReadSupplierFromQueue: public PairedReadSupplier {
public:
    // i_queue is the queue this supplier draws from.  NOTE(review): i_twoFiles presumably
    // says whether the two ends come from separate input files (and hence separate queue
    // elements) -- confirm in the .cpp.
    PairedReadSupplierFromQueue(ReadSupplierQueue *i_queue, bool i_twoFiles);
    ~PairedReadSupplierFromQueue();

    // Hands back the next pair through the two out-parameters.  NOTE(review): presumably
    // returns false when no more pairs are available -- confirm in the .cpp.
    bool getNextReadPair(Read **read0, Read **read1);

    // Forwards the hold request to the underlying queue.
    virtual void holdBatch(DataBatch batch)
    { queue->holdBatch(batch); }

    // Forwards the release request to the underlying queue and returns its result.
    virtual bool releaseBatch(DataBatch batch)
    { return queue->releaseBatch(batch); }

private:
    ReadSupplierQueue   *queue;                 // queue that all batch calls are forwarded to
    bool                done;
    bool                twoFiles;
    ReadQueueElement    *currentElement;        // presumably the element supplying the first end -- confirm
    ReadQueueElement    *currentSecondElement;  // presumably the element supplying the second end when twoFiles is set -- confirm
    int                 nextReadIndex;          // presumably the index of the next read within the current element(s) -- confirm
};