1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
#ifndef __FASTX_PARSER__
#define __FASTX_PARSER__
#include "fcntl.h"
#include "unistd.h"
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <vector>
extern "C" {
#include "kseq.h"
}
#include "concurrentqueue.h"
#ifndef __FASTX_PARSER_PRECXX14_MAKE_UNIQUE__
#define __FASTX_PARSER_PRECXX14_MAKE_UNIQUE__
#if __cplusplus >= 201402L
#include <memory>
using std::make_unique;
#else
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
// Stand-in for std::make_unique, used on the branch of the surrounding
// #if that is taken when __cplusplus is below 201402L.
//
// _Unique_if exposes exactly one member alias per "shape" of T, so that for
// any given T only one of the make_unique overloads below has a valid
// return type and takes part in overload resolution.

// Primary template: T is a plain (non-array) object type.
template <typename T> struct _Unique_if {
  using _Single_object = std::unique_ptr<T>;
};

// T is an array of unknown bound, e.g. int[].
template <typename T> struct _Unique_if<T[]> {
  using _Unknown_bound = std::unique_ptr<T[]>;
};

// T is an array of known bound, e.g. int[8]; only usable with the deleted
// overload at the bottom.
template <typename T, std::size_t N> struct _Unique_if<T[N]> {
  using _Known_bound = void;
};

// Single object: perfectly forwards `args` to T's constructor and wraps the
// new object in a unique_ptr.
template <typename T, typename... Args>
auto make_unique(Args&&... args) -> typename _Unique_if<T>::_Single_object {
  return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

// Array of unknown bound: allocates `n` value-initialized elements.
template <typename T>
auto make_unique(std::size_t n) -> typename _Unique_if<T>::_Unknown_bound {
  using Element = typename std::remove_extent<T>::type;
  return std::unique_ptr<T>(new Element[n]());
}

// Array of known bound: explicitly rejected at compile time.
template <typename T, typename... Args>
auto make_unique(Args&&...) -> typename _Unique_if<T>::_Known_bound = delete;
#endif // C++11
#endif //__FASTX_PARSER_PRECXX14_MAKE_UNIQUE__
namespace fastx_parser {
// One record: a sequence string and a name string.
//
// No special member functions are declared on purpose. A user-provided
// destructor (even an empty one) suppresses the implicitly generated move
// constructor and move assignment, which would make every "move" of a
// ReadSeq fall back to deep-copying both strings.
struct ReadSeq {
  std::string seq;   // sequence text of the record
  std::string name;  // name of the record
};
// Two ReadSeq records held together by value.
// NOTE(review): presumably one record from each of the two input file
// lists accepted by the parser -- confirm against the parser implementation.
struct ReadPair {
  ReadSeq first;   // first record of the pair
  ReadSeq second;  // second record of the pair
};
// Fixed-capacity batch of T.
//
// The backing vector is sized once, to `want` elements, in the constructor
// and is never resized by this class. A separate count (`have_`) records how
// many leading elements are currently treated as valid; size(), end() and
// range-for iteration honour that count, operator[] does not.
template <typename T> class ReadChunk {
public:
  // Creates `want` value-initialized elements and marks all of them as
  // present (size() == want()) until have() is called.
  ReadChunk(size_t want) : group_(want), want_(want), have_(want) {}

  // Sets the number of valid leading elements. The value is clamped to the
  // capacity, because a larger count would make end() point past the
  // backing vector.
  inline void have(size_t num) { have_ = (num < want_) ? num : want_; }

  // Number of valid elements (as last set via have()).
  inline size_t size() const { return have_; }

  // Capacity requested at construction.
  inline size_t want() const { return want_; }

  // Unchecked element access; `i` must be < want().
  T& operator[](size_t i) { return group_[i]; }
  const T& operator[](size_t i) const { return group_[i]; }

  // Iteration covers only the first size() elements.
  typename std::vector<T>::iterator begin() { return group_.begin(); }
  typename std::vector<T>::iterator end() { return group_.begin() + have_; }
  typename std::vector<T>::const_iterator begin() const {
    return group_.begin();
  }
  typename std::vector<T>::const_iterator end() const {
    return group_.begin() + have_;
  }

private:
  std::vector<T> group_;
  size_t want_;
  size_t have_;
};
// Handle that pairs one (possibly absent) ReadChunk<T> with the moodycamel
// producer and consumer tokens supplied at construction.
//
// A freshly constructed ReadGroup owns no chunk (empty() is true). have(),
// size(), want(), operator[], begin() and end() all dereference the chunk
// and must only be called while empty() is false.
template <typename T> class ReadGroup {
public:
  // Moves both tokens into this object.
  ReadGroup(moodycamel::ProducerToken&& pt, moodycamel::ConsumerToken&& ct)
      : pt_(std::move(pt)), ct_(std::move(ct)) {}

  moodycamel::ConsumerToken& consumerToken() { return ct_; }
  moodycamel::ProducerToken& producerToken() { return pt_; }

  // get a reference to the chunk this ReadGroup owns
  std::unique_ptr<ReadChunk<T>>& chunkPtr() { return chunk_; }

  // get a *moveable* reference to the chunk this ReadGroup owns; ownership
  // only leaves this object if the caller actually moves from the result.
  std::unique_ptr<ReadChunk<T>>&& takeChunkPtr() { return std::move(chunk_); }

  // The following simply forward to the owned chunk.
  inline void have(size_t num) { chunk_->have(num); }
  inline size_t size() const { return chunk_->size(); }
  inline size_t want() const { return chunk_->want(); }
  T& operator[](size_t i) { return (*chunk_)[i]; }
  typename std::vector<T>::iterator begin() { return chunk_->begin(); }
  typename std::vector<T>::iterator end() { return chunk_->end(); }

  // Leaves this group without a chunk. reset() is used rather than
  // release(): if the chunk was already moved out this is a no-op, and if
  // it is still owned here it is destroyed instead of being leaked.
  void setChunkEmpty() { chunk_.reset(); }

  // True when no chunk is currently owned.
  bool empty() const { return chunk_.get() == nullptr; }

private:
  std::unique_ptr<ReadChunk<T>> chunk_{nullptr};
  moodycamel::ProducerToken pt_;
  moodycamel::ConsumerToken ct_;
};
// Interface of the parser. Only declarations are given here; the member
// functions are defined elsewhere, so the notes below describe signatures
// and stored state, not verified behaviour.
template <typename T> class FastxParser {
public:
  // Single file-list form. `numParsers` defaults to 1, `chunkSize` to 1000.
  FastxParser(std::vector<std::string> files, uint32_t numConsumers,
              uint32_t numParsers = 1, uint32_t chunkSize = 1000);

  // Two file-list form, with the same defaults.
  // NOTE(review): presumably files[i] and files2[i] form a pair -- confirm
  // against the implementation.
  FastxParser(std::vector<std::string> files, std::vector<std::string> files2,
              uint32_t numConsumers, uint32_t numParsers = 1,
              uint32_t chunkSize = 1000);

  ~FastxParser();

  // NOTE(review): the meaning of the bool results of start(), stop() and
  // refill() is not visible in this header -- see the definitions.
  bool start();
  bool stop();

  ReadGroup<T> getReadGroup();
  bool refill(ReadGroup<T>& rg);
  void finishedWithGroup(ReadGroup<T>& s);

private:
  moodycamel::ProducerToken getProducerToken_();
  moodycamel::ConsumerToken getConsumerToken_();

  // Input file names (first and, where used, second list).
  std::vector<std::string> inputStreams_;
  std::vector<std::string> inputStreams2_;

  uint32_t numParsers_;
  std::atomic<uint32_t> numParsing_;

  // NOTE: std::future<int> would be preferable to raw threads here, but
  // that approach did not appear to work and the reason is unclear; see
  // (https://twitter.com/nomad421/status/917748383321817088)
  std::vector<std::unique_ptr<std::thread>> parsingThreads_;

  // Result of each parsing thread: the return value of that thread's last
  // call to kseq_read(). A value < -1 signifies some sort of error.
  std::vector<int> threadResults_;

  size_t blockSize_;

  // Two queues of chunk pointers.
  // NOTE(review): presumably filled chunks and recycled empty chunks
  // respectively -- confirm against the implementation.
  moodycamel::ConcurrentQueue<std::unique_ptr<ReadChunk<T>>> readQueue_;
  moodycamel::ConcurrentQueue<std::unique_ptr<ReadChunk<T>>> seqContainerQueue_;

  // holds the indices of files (file-pairs) to be processed
  moodycamel::ConcurrentQueue<uint32_t> workQueue_;

  std::vector<std::unique_ptr<moodycamel::ProducerToken>> produceReads_;
  std::vector<std::unique_ptr<moodycamel::ConsumerToken>> consumeContainers_;

  bool isActive_{false};
};
}
#endif // __FASTX_PARSER__
|