File: plink2_bgzf.h

package info (click to toggle)
plink2 2.00~a3.5-220809%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 8,272 kB
  • sloc: cpp: 142,987; ansic: 546; makefile: 479; sh: 79; python: 37
file content (300 lines) | stat: -rw-r--r-- 12,182 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
#ifndef __PLINK2_BGZF_H__
#define __PLINK2_BGZF_H__

// This library is part of PLINK 2.00, copyright (C) 2005-2022 Shaun Purcell,
// Christopher Chang.
//
// This library is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by the
// Free Software Foundation, either version 3 of the License, or (at your
// option) any later version.
//
// This library is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
// for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this library.  If not, see <http://www.gnu.org/licenses/>.


// Binary BGZF readers and writers, based on libdeflate.
//
// htslib dependency has been dropped due to impedance mismatches with plink2
// in several areas:
// - input streams (in particular, being forced to open the file once to detect
//   file type, close it, and then reopen it as a BGZF* breaks pipe file
//   descriptors; and while PLINK 2.0 avoids guaranteeing that pipe file
//   descriptors work in any given scenario, there's still a big practical
//   difference between "usually works, with a few exceptions where two-pass
//   processing is simpler" and "always fails")
// - Windows multithreading
// - error handling
// (Note that htslib has great BGZF compression/decompression performance, as
// long as v1.8+ is used with libdeflate.)

#include "plink2_string.h"
#include "plink2_thread.h"
#include <libdeflate.h>

#ifdef __cplusplus
namespace plink2 {
#endif

HEADER_INLINE int32_t IsBgzfHeader(const void* buf) {
  const uint32_t magic4 = *S_CAST(const uint32_t*, buf);
  return ((magic4 & 0x4ffffff) == 0x4088b1f) && memequal_k(&(S_CAST(const unsigned char*, buf)[10]), "\6\0BC\2", 6);
}

typedef struct BgzfRawDecompressStreamStruct {
  unsigned char* in;
  struct libdeflate_decompressor* ldc;
  uint32_t in_size;
  uint32_t in_pos;
} BgzfRawDecompressStream;

// (tested a few different values for this, 1 MiB appears to work well on the
// systems we care most about)
CONSTI32(kDecompressChunkSize, 1048576);
static_assert(!(kDecompressChunkSize % kCacheline), "kDecompressChunkSize must be a multiple of kCacheline.");
static_assert(kDecompressChunkSize >= kMaxMediumLine, "kDecompressChunkSize too small.");

CONSTI32(kMaxBgzfDecompressThreads, 5);
CONSTI32(kMaxBgzfCompressedBlockSize, 65536);
static_assert(kMaxBgzfDecompressThreads * 3 * kMaxBgzfCompressedBlockSize < kDecompressChunkSize, "kMaxBgzfDecompressThreads too large relative to kDecompressChunkSize.");
CONSTI32(kMaxBgzfDecompressedBlockSize, 65536);
static_assert((kMaxBgzfDecompressedBlockSize % kCacheline) == 0, "kMaxBgzfDecompressedBlockSize is assumed to be a cacheline multiple.");

// Communication with reader thread (i.e. worker thread 0).
typedef struct BgzfMtReadCommWithRStruct {
  // Reader -> consumer.  Loaded-but-not-decompressed byte interval.
  uint32_t remaining_start;
  uint32_t remaining_end;
  uint32_t remaining_end_is_eof;
#if __cplusplus >= 201103L
  // Can't just use PglErr, since BgzfRawMtDecompressStream is part of a union,
  // hence BgzfMtReadBody must have a trivial default constructor.
  PglErr::ec reterr;
#else
  int32_t reterr;
#endif
  const char* errmsg;

  // Consumer -> reader.  Currently-being-decompressed byte interval.
  uint32_t locked_start;
  uint32_t locked_end;
} BgzfMtReadCommWithR;

// Communication with decompressor threads.
typedef struct BgzfMtReadCommWithDStruct {
  // Decompressors -> consumer.
  unsigned char* overflow;
  // Consumer is responsible for tracking
  // overflow_start/overflow_end/target_written.

  // This can be set to 1 by any decompressor thread, but no other changes are
  // possible, so we don't need an extra write-lock (or expand this into a
  // kMaxBgzfDecompressThreads-length array).  It takes precedence over
  // reader-thread errors, since it refers to earlier data.
  uint32_t invalid_bgzf;

  // Consumer -> decompressors.
  uint32_t target_capacity;
  unsigned char* target;
  uint32_t in_offsets[kMaxBgzfDecompressThreads + 1];
  uint32_t out_offsets[kMaxBgzfDecompressThreads];
} BgzfMtReadCommWithD;

typedef struct BgzfMtReadBodyStruct {
  // Thread 0 performs read-ahead into trs.in.
  // Threads 1..(thread_ct-1) perform decompression from trs.in directly to
  // target when there's sufficient space, and to overflow[] when there isn't.
  // overflow actually points 64KiB after the start of the associated buffer;
  // this simplifies handling of half-overflowing blocks.
  struct libdeflate_decompressor* ldcs[kMaxBgzfDecompressThreads];

  // Borrowed from consumer, not closed by CleanupBgzfRawMtStream().
  FILE* ff;

  unsigned char* in;

  BgzfMtReadCommWithR* cwr[2];
  BgzfMtReadCommWithD* cwd[2];

  uint32_t initial_compressed_byte_ct;
} BgzfMtReadBody;

typedef struct BgzfRawMtDecompressStreamStruct {
  BgzfMtReadBody body;
  ThreadGroup tg;  // stores thread_ct

  // Between thread-group-joins, the consumer can safely copy from
  // body.cwd[consumer_parity]->overflow, while the decompressor threads
  // continue writing to body.cwd[1 - consumer_parity].
  // overflow_start[consumer_parity]..overflow_end[consumer_parity] indicate
  // which body.cwd[consumer_parity]->overflow bytes remain to be copied, while
  // overflow_start[1 - consumer_parity]..overflow_end[1 - consumer_parity]
  // indicate the initial body.cwd[1 - consumer_parity]->overflow bounds after
  // the next thread-group-join.
  uint32_t overflow_start[2];
  uint32_t overflow_end[2];
  uint32_t consumer_parity;
  uint32_t eof;
} BgzfRawMtDecompressStream;

extern const char kShortErrInvalidBgzf[];

void PreinitBgzfRawMtStream(BgzfRawMtDecompressStream* bgzfp);

// Two modes:
// - Regular: ff must point 16 bytes into the file, and header[] must contain
//   the first 16 bytes.  bgzf_st_ptr must be nullptr.
// - Move-construction: ff is assumed to point to &(bgzf_st_ptr->in[in_size]).
//   header must be nullptr.
// decompress_thread_ct must be positive.  It is automatically reduced to
// kMaxBgzfDecompressThreads if necessary.
PglErr BgzfRawMtStreamInit(const char* header, uint32_t decompress_thread_ct, FILE* ff, BgzfRawDecompressStream* bgzf_st_ptr, BgzfRawMtDecompressStream* bgzfp, const char** errmsgp);

PglErr BgzfRawMtStreamRead(unsigned char* dst_end, BgzfRawMtDecompressStream* bgzfp, unsigned char** dst_iterp, const char** errmsgp);

PglErr BgzfRawMtStreamRetarget(const char* header, BgzfRawMtDecompressStream* bgzfp, FILE* next_ff, const char** errmsgp);

HEADER_INLINE PglErr BgzfRawMtStreamRewind(BgzfRawMtDecompressStream* bgzfp, const char** errmsgp) {
  return BgzfRawMtStreamRetarget(nullptr, bgzfp, nullptr, errmsgp);
}

void CleanupBgzfRawMtStream(BgzfRawMtDecompressStream* bgzfp);


// Compression strategy:
// - We have N compression-job memory slots, where N is the smallest power of 2
//   >= 4 * compressor_thread_ct.  (This could be adjusted and/or separately
//   configurable, any value >= 2 * compressor_thread_ct is reasonable.)
// - For the most common plink2 use case (VCF export), compression is over 90%
//   of the compute cost.  Thus, a good implementation must be optimized around
//   minimizing compressor-thread waits, in a setting where one core's workload
//   is split between production and compression and the other cores are
//   compressing all the time.
//   I first implemented the trivial load-balancing strategy (compressor thread
//   k processes blocks k, n+k, 2n+k, ..., where n is the number of compressor
//   threads), but that proved to be slightly slower than htslib's bgzf writer
//   in my testing; it didn't do a good enough job of managing the
//   split-between-production-and-compression core.
//   So I've switched to a more flexible thread pool using __atomic_fetch_add()
//   to distribute compression jobs.
// - There's a dedicated writer thread which flushes the compression results.
// I will look into adding direct support for this parallelization pattern to
// plink2_thread; the benefit is small (~5%), but applies to a lot of
// workloads.  It may also be worthwhile to implement some form of
// work-stealing.
//
// I tried making this less granular (each job contains two blocks instead of
// one) to see if there was still meaningful room for improvement re: reducing
// thread-synchronization overhead; that didn't seem to make any better use of
// the extra memory than keeping the basic granularity level and doubling the
// number of slots.

CONSTI32(kMaxBgzfCompressThreads, 15);
CONSTI32(kBgzfInputBlockSize, 0xff00);  // htslib BGZF_BLOCK_SIZE

typedef struct BgzfCompressStreamMainStruct BgzfCompressStreamMain;

typedef struct BgzfCompressCommWithWStruct {
  // Compressor -> writer.  One per block slot.
  unsigned char cbuf[kMaxBgzfCompressedBlockSize];
  uint32_t nbytes;  // UINT32_MAX = open, otherwise filled
  uint32_t eof;
#ifdef _WIN32
  HANDLE cbuf_filled_event;
  HANDLE cbuf_open_event;
#else
  pthread_mutex_t cbuf_mutex;
  pthread_cond_t cbuf_condvar;
#endif
} BgzfCompressCommWithW;

typedef struct BgzfCompressCommWithPStruct {
  // Producer -> compressor.  One per block slot.
  char ucbuf[kBgzfInputBlockSize];
#ifdef _WIN32
  HANDLE ucbuf_filled_event;
  HANDLE ucbuf_open_event;
#else
  pthread_mutex_t ucbuf_mutex;
  pthread_cond_t ucbuf_condvar;
#endif
  uint32_t nbytes;  // UINT32_MAX = open, otherwise filled
} BgzfCompressCommWithP;

typedef struct BgzfCompressorContextStruct {
  BgzfCompressStreamMain* parent;
  struct libdeflate_compressor* lc;
  // don't need tidx any more...
} BgzfCompressorContext;

struct BgzfCompressStreamMainStruct {
  NONCOPYABLE(BgzfCompressStreamMainStruct);
  FILE* ff;
  pthread_t* threads;  // n+1 elements, last element is writer
  BgzfCompressCommWithP** cwps;  // N elements
  BgzfCompressCommWithW** cwws;  // N elements

  BgzfCompressorContext* compressor_args;  // n elements

  // Atomically read/updated, guaranteed to be on its own cacheline.
  uintptr_t* next_job_idxp;

  int32_t write_errno;

  uint16_t slot_ct;
  uint16_t compressor_thread_ct;  // 0 if no compression
  uint16_t partial_slot_idx;  // this ucbuf must be open
  uint16_t partial_nbytes;  // always < kBgzfInputBlockSize

  // If nonzero, initialization didn't complete, and this value contains enough
  // (platform-specific) information to clean up properly.
  // Not an enum since we perform a bit of arithmetic on it.
  uint16_t unfinished_init_state;
};

typedef struct BgzfCompressStreamStruct {
  NONCOPYABLE(BgzfCompressStreamStruct);
#ifdef __cplusplus
  BgzfCompressStreamMain& GET_PRIVATE_m() { return m; }
  BgzfCompressStreamMain const& GET_PRIVATE_m() const { return m; }
 private:
#endif
  BgzfCompressStreamMain m;
} BgzfCompressStream;

void PreinitBgzfCompressStream(BgzfCompressStream* cstream_ptr);

CONSTI32(kBgzfDefaultClvl, 6);

// - Compression level 0 = plaintext (not a BGZF file at all), so application
//   code no longer has to explicitly branch on bgzf/non-bgzf output.
// - thread_ct only applies when clvl != 0.  When that's true, it's internally
//   clipped to [2, kMaxBgzfCompressThreads].
// - errno is set on open-fail.
PglErr InitBgzfCompressStreamEx(const char* out_fname, uint32_t do_append, uint32_t clvl, uint32_t thread_ct, BgzfCompressStream* cstream_ptr);

HEADER_INLINE PglErr InitBgzfCompressStream(const char* out_fname, uint32_t thread_ct, BgzfCompressStream* cstream_ptr) {
  return InitBgzfCompressStreamEx(out_fname, 0, kBgzfDefaultClvl, thread_ct, cstream_ptr);
}

// errno is set on write-fail.
BoolErr BgzfWrite(const char* buf, uintptr_t len, BgzfCompressStream* cstream_ptr);

BoolErr BgzfFlushTry(uint32_t capacity_needed_to_defer_flush, BgzfCompressStream* cstream_ptr);

HEADER_INLINE BoolErr BgzfFlush(BgzfCompressStream* cstream_ptr) {
  return BgzfFlushTry(kBgzfInputBlockSize, cstream_ptr);
}

BoolErr CleanupBgzfCompressStream(BgzfCompressStream* cstream_ptr, PglErr* reterrp);

#ifdef __cplusplus
}  // namespace plink2
#endif

#endif  // __PLINK2_BGZF_H__