File: DataReader.h

package info (click to toggle)
snap-aligner 2.0.3%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,652 kB
  • sloc: cpp: 41,051; ansic: 5,239; python: 227; makefile: 85; sh: 28
file content (217 lines) | stat: -rw-r--r-- 6,778 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/*++

Module Name:

    DataReader.h

Abstract:

    Headers for the DataReader & related classes for the SNAP sequencer

Authors:

    Ravi Pandya, Jan 2013

Environment:

    User mode service.

Revision History:

--*/

#pragma once

#include "Compat.h"
#include "VariableSizeMap.h"
//
// This defines a family of composable classes for efficiently reading data with flow control.
//
// DataReader
//      Reads data from one or more files either sequentially, in ranges, or memory-mapped.
//      A DataReader should be accessed by only one thread at a time,
//      except for release() which may be called from any thread.
//      Divides data into sequential batches each of which is identified by a file ID and batch ID.
//      Data in a batch will remain stable until it is released by the consumer.
//      Consumers should release batches as soon as possible to make buffers free for read-ahead.
//      Batches may include extra data for higher layers that also remains stable.
//      Extra data size is defined as a factor of the underlying data size, and/or a fixed number of bytes.
//
// DataSupplier
//      A factory for DataReaders, which may be called from multiple threads.
//

struct DataBatch
{
    _uint32   fileID;
    _uint32   batchID;

    inline DataBatch() : fileID(0), batchID(0) {}

    inline DataBatch(_uint32 i_batchID, _uint32 i_fileID = 0) : fileID(i_fileID), batchID(i_batchID) {}

    inline DataBatch(const DataBatch& o) : fileID(o.fileID), batchID(o.batchID) {}
    
    static bool comparator(const DataBatch& a, const DataBatch& b)
    { return a.fileID < b.fileID || (a.fileID == b.fileID && a.batchID < b.batchID); }

    inline bool operator<=(const DataBatch& b) const
    { return fileID < b.fileID || (fileID == b.fileID && batchID <= b.batchID); }
    
    inline bool operator<(const DataBatch& b) const
    { return fileID < b.fileID || (fileID == b.fileID && batchID < b.batchID); }
    
    inline bool operator==(const DataBatch& b) const
    { return fileID == b.fileID && batchID == b.batchID; }
    
    inline bool operator!=(const DataBatch& b) const
    { return batchID != b.batchID || fileID != b.fileID; }

    inline DataBatch Min(const DataBatch& b) const
    { return *this <= b ? *this : b; }

    inline bool isZero() const
    { return fileID == 0 && batchID == 0; }

    // convert to _int64 for use as a hashtable key

    typedef _int64 Key;

    inline Key asKey()
    { return (((_int64) fileID) << 32) + (_int64) batchID; }

    inline DataBatch(Key key) : fileID((_uint32) (key >> 32)), batchID((_uint32) key) {}
};

// read data from a file or other source
// should all be called from a single thread, except for hold/releaseBatch which are thread-safe
class DataReader
{
public:
    DataReader() {}

    virtual ~DataReader() {}
    
    // initialize to use a specific filename
    virtual bool init(const char* fileName) = 0;

    // read bytes from the beginning of the file for the header
    virtual char* readHeader(_int64* io_headerSize) = 0;

    // seek to a particular range in the file.  Set amountOfFileToProcess to go to EOF.
    virtual void reinit(_int64 startingOffset, _int64 amountOfFileToProcess) = 0;

    // get all remaining data in current batch
    // return false if no more data in current batch
    // startBytes is data "owned" by this block in which reads may start
    // validBytes may also include overflow bytes to handle records spanning batches
    // if you advance() past startBytes, nextBatch() will start offset at that point
    virtual bool getData(char** o_buffer, _int64* o_validBytes, _int64* o_startBytes = NULL) = 0;

    // advance through data in current batch, reducing results from next getData call
    virtual void advance(_int64 bytes) = 0;

    // advance to next batch
    // by default automatically releases previous batch
    virtual void nextBatch() = 0;

    // whether current batch is last in file
    virtual bool isEOF() = 0;

    // get current batch identifier
    virtual DataBatch getBatch() = 0;
    
    // hold buffers associated with this batch for reuse, increments refcount
    // NOTE: this may be called from another thread,
    // so anything it touches must be thread-safe!
    virtual void holdBatch(DataBatch batch) = 0;

    // release buffers associated with this batch for reuse
    // decrements refcount, returns true if last release
    // NOTE: this may be called from another thread,
    // so anything it touches must be thread-safe!
    virtual bool releaseBatch(DataBatch batch) = 0;

    // get current offset into file
    virtual _int64 getFileOffset() = 0;

    // get pointer to extra data area for current batch
    // todo: allow this to grow dynamically while keeping stable pointers to previous data
    virtual void getExtra(char** o_extra, _int64* o_length) = 0;

    // get filename for debugging / error printing
    virtual const char* getFilename() = 0;

    // timing for performance tuning (in nanos)
    static volatile _int64 ReadWaitTime;
    static volatile _int64 ReleaseWaitTime;

    //
    // debugging
    //
    virtual void dumpState() {} // Override in a subclass if needed
};

class DataSupplier
{
public:
    DataSupplier() {}
    
    virtual ~DataSupplier() {}

    virtual DataReader* getDataReader(int bufferCount, _int64 overflowBytes, double extraFactor, size_t bufferSpace) = 0;

    //
    // creating specific factories
    //

    // 
    static DataSupplier* GzipBam(DataSupplier* inner);
    static DataSupplier* Gzip(DataSupplier* inner);
    static DataSupplier* StdioSupplier();

    // memmap works on both platforms (but better on Linux)
    static DataSupplier* MemMap;

#ifdef _MSC_VER
    // overlapped is only on Windows
    static DataSupplier* WindowsOverlapped;
#endif

    static DataSupplier* AsyncFile;

    // default raw data supplier for platform
    static DataSupplier* Default;
    static DataSupplier* GzipDefault;
    static DataSupplier* GzipBamDefault;

    static DataSupplier* GzipStdio;
    static DataSupplier* Stdio;
    static DataSupplier* GzipBamStdio;

    // hack: must be set to communicate thread count into suppliers
    static int ThreadCount;

    // hack: global for additional expansion factor
    static double ExpansionFactor;
};

// manages lifetime tracking for batches of reads
class BatchTracker
{
public:
    BatchTracker(int i_capacity);

    // reference was added from a batch, increment reference count
    // return true if first hold
    bool holdBatch(DataBatch batch);

    // reference was removed from a batch
    // returns true if the batch has no more references
    bool releaseBatch(DataBatch batch);

private:
    typedef VariableSizeMap<DataBatch::Key,unsigned> BatchMap;
    BatchMap pending;
};