1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
#ifndef PARALLEL_H
#define PARALLEL_H
#include "headers.h"
#include "definitions.h"
#include "tools.h"
#include "kindex.h"
//------ Threading tools --------------------------------------------//
/**
 * Description of one unit of work: everything a thread needs to process a BCL file.
 * @author Martin Lindner
 */
struct Task {

    /** Lane this task belongs to. */
    uint16_t lane;

    /** Tile this task belongs to. */
    uint16_t tile;

    /** Properties of the read (barcode vs. sequence; length; mate). */
    SequenceElement seqEl;

    /** Cycle within the particular read (in general NOT the sequencing cycle!). Must be <=seqEl.length. */
    uint16_t cycle;

    /**
     * Constructor for a NULL task (lane 255, tile 0, NULLSEQ, cycle 0).
     * @author Tobias Loka
     */
    Task() : Task(255, 0, NULLSEQ, 0) {}

    /**
     * Constructor for a valid task.
     * @param ln The lane number.
     * @param tl The tile number.
     * @param seq The seqEl element of the current read (length, type (barcode vs. sequence), mate number ...).
     * @param cl The cycle of the current read (in general NOT the sequencing cycle!). Must be <=seqEl.length.
     * @author Martin Lindner
     */
    Task(uint16_t ln, uint16_t tl, SequenceElement seq, uint16_t cl)
        : lane(ln), tile(tl), seqEl(seq), cycle(cl) {}

    /**
     * Constructor for a task without seqEl information; seqEl is set to NULLSEQ.
     * @param ln The lane number.
     * @param tl The tile number.
     * @param cl The cycle of the current read.
     */
    Task(uint16_t ln, uint16_t tl, uint16_t cl) : Task(ln, tl, NULLSEQ, cl) {}

    /**
     * Overload of the << operator. Defines the stream (e.g. cout) form of a task.
     * Only declared here; the definition lives outside this header section.
     * @author Martin Lindner
     */
    friend std::ostream& operator<<(std::ostream& os, const Task& t);
};
/**
 * Equality of two tasks.
 * @param l Left-hand task.
 * @param r Right-hand task.
 * @return true, if lane, tile, cycle and seqEl of both tasks are equal.
 * @author Martin Lindner
 */
inline bool operator==(const Task& l, const Task& r){
    // Compare the plain numeric fields first; seqEl is only compared when they all match.
    const bool same_position = (r.lane == l.lane) && (r.tile == l.tile) && (r.cycle == l.cycle);
    return same_position && (r.seqEl == l.seqEl);
}
/**
 * Inequality of two tasks; the exact negation of operator==.
 * @param l Left-hand task.
 * @param r Right-hand task.
 * @return true, if at least one compared field of the tasks differs.
 * @author Martin Lindner
 */
inline bool operator!=(const Task& l, const Task& r){
    const bool equal = (l == r);
    return !equal;
}
/**
 * Ordering of two tasks. Tasks are compared by cycle first, then by lane,
 * then by tile, and finally by the mate of their seqEl. No other field of
 * seqEl takes part in the ordering.
 * @param l Left-hand task.
 * @param r Right-hand task.
 * @return true, if l is ordered before r.
 */
inline bool operator<(const Task& l, const Task& r){
    // The first key that differs decides the result.
    if ( l.cycle != r.cycle )
        return l.cycle < r.cycle;

    if ( l.lane != r.lane )
        return l.lane < r.lane;

    if ( l.tile != r.tile )
        return l.tile < r.tile;

    // Cycle, lane and tile are identical: fall back to the mate number.
    return l.seqEl.mate < r.seqEl.mate;
}
/**
 * The NULL task: lane 255, tile 0, NULLSEQ as seqEl, cycle 0.
 * @author Martin Lindner
 */
const Task NO_TASK = Task(255, 0, NULLSEQ, 0);
/**
 * Task queue data structure. Intended to manage a list of Task objects in a
 * thread safe way: it bundles a std::queue with a std::mutex.
 * Only the declarations are visible here; how push/pop/size actually use the
 * mutex is defined elsewhere.
 */
class TaskQueue {
/** The internal queue holding the pending tasks. */
std::queue<Task> tasks;
/** Mutex meant to ensure that only one thread accesses the queue at once. */
std::mutex m;
public:
/**
 * Add an element to the task list.
 * @param t The task to add (taken by value).
 */
void push(Task t);
/**
 * Get an element from the task list.
 * NOTE(review): the behaviour on an empty queue is not visible here —
 * presumably NO_TASK is returned; confirm against the implementation.
 * @return A task taken from the list.
 */
Task pop();
/**
 * @return The size of the queue.
 */
uint64_t size();
};
/** Status of a single agenda item, stored as an 8-bit code. */
using ItemStatus = uint8_t;

/** Status code 0: item is waiting. */
const ItemStatus WAITING       = 0;
/** Status code 1: the BCL file of the item is available. */
const ItemStatus BCL_AVAILABLE = 1;
/** Status code 2: item is running. */
const ItemStatus RUNNING       = 2;
/** Status code 3: item is finished. */
const ItemStatus FINISHED      = 3;
/** Status code 4: item is marked for a retry. */
const ItemStatus RETRY         = 4;
/** Status code 5: item failed. */
const ItemStatus FAILED        = 5;
/** Error status: the largest value representable by ItemStatus. */
const ItemStatus ERROR         = std::numeric_limits<ItemStatus>::max();
// Agenda: monitors the sequencing process and manages the alignment
// - Monitors BCL files
// - generates new tasks
// - receive finished/fail signals
class Agenda {
// list of items on the agenda. items[cycle][lane][tile]
std::vector< std::vector< std::vector<ItemStatus> > > items;
// dataset information
uint16_t rlen;
std::vector<uint16_t> lanes;
std::vector<uint16_t> tiles;
public:
// initialize agenda with read length only (all lanes, all tiles)
Agenda (uint16_t rl) : Agenda (rl, all_lanes()) {};
// initialize agenda with read length and lanes (all tiles)
Agenda (uint16_t rl, std::vector<uint16_t> ln) : Agenda (rl, ln, all_tiles()) {};
// initialize agenda with read length, lanes and tiles
Agenda (uint16_t rl, std::vector<uint16_t> ln, std::vector<uint16_t> tl) : Agenda (rl, ln, tl, 1) {};
Agenda (uint16_t rl, std::vector<uint16_t> ln, std::vector<uint16_t> tl, CountType start_cycle) : rlen(rl), lanes(ln), tiles(tl) {
// set up the agenda
items.clear();
for ( CountType cycle_id = 0; cycle_id < rlen; ++cycle_id ) {
ItemStatus status = cycle_id < start_cycle - 1 ? FINISHED : WAITING;
std::vector<ItemStatus> cycle_status (tiles.size(), status);
std::vector<std::vector<ItemStatus> > lane_status;
for (uint16_t ln_id = 0; ln_id < lanes.size(); ln_id++) {
lane_status.push_back(cycle_status);
}
items.push_back(lane_status);
}
}
// check for BCL files and update item status
void update_status();
// generate a new task from the agenda
Task get_task();
// set the status of a task
void set_status(Task t, ItemStatus status);
// get the status of a task
ItemStatus get_status(Task t);
// check if all items of the agenda were processed, if possible
bool finished();
bool finished( CountType cycle );
bool cycle_available( CountType cycle );
// the total number of tasks on the agenda
uint32_t task_count();
// the total number of finished tasks on the agenda
uint32_t tasks_finished();
// generate a complete TaskQueue with tasks to generate SAM files
// SAM files can only be generated for tiles where all cycles are completed
std::vector<Task> get_SAM_tasks();
};
/**
 * Create a vector with all lane numbers.
 * Which lanes count as "all" is decided by the implementation, which is not part of this header.
 * @return Vector of lane numbers.
 */
std::vector<uint16_t> all_lanes();
/**
 * Create a vector with one lane number.
 * @param l The lane number.
 * @return Vector of lane numbers (presumably containing only l — confirm against the implementation).
 */
std::vector<uint16_t> one_lane(uint16_t l);
/**
 * Create a vector with all tile numbers.
 * Which tiles count as "all" is decided by the implementation, which is not part of this header.
 * @return Vector of tile numbers.
 */
std::vector<uint16_t> all_tiles();
/**
 * Create a vector with one tile number.
 * @param t The tile number.
 * @return Vector of tile numbers (presumably containing only t — confirm against the implementation).
 */
std::vector<uint16_t> one_tile(uint16_t t);
#endif /* PARALLEL_H */
|