File: parallel.cpp

package info (click to toggle)
hilive 2.0a-5
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 956 kB
  • sloc: cpp: 4,723; python: 241; xml: 188; sh: 35; makefile: 14
file content (221 lines) | stat: -rw-r--r-- 6,216 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
#include "parallel.h"

std::ostream& operator<<(std::ostream& os, const Task& t)
{
  std::string mate = t.seqEl.mate == 0 ? "b" : std::to_string(t.seqEl.mate);
  os << "Lane " << t.lane << " Tile " << t.tile << " Cycle " << mate << "." << t.cycle;
  return os;
}

// Add element to the task list
void TaskQueue::push(Task t) {
  std::lock_guard<std::mutex> lk(m);
  tasks.push(t);
}
  
// Get element from the task list. If TaskList is empty, return NO_TASK.
Task TaskQueue::pop() {
  std::lock_guard<std::mutex> lk(m);
  if (!tasks.empty()) {
    Task t = tasks.front();
    tasks.pop();
    return t;
  }
  else {
    return NO_TASK;
  }
}

// return the size of the queue
uint64_t TaskQueue::size() {
  std::lock_guard<std::mutex> lk(m);
  return tasks.size();
}

// create a vector with one lane number
std::vector<uint16_t> one_lane(uint16_t l) {
  return std::vector<uint16_t> (1,l);
}

// create a vector with one tile number
std::vector<uint16_t> one_tile(uint16_t t) {
  return std::vector<uint16_t> (1,t);
}

// check for BCL files and update item status
void Agenda::update_status () {

	// iterate over lanes
	for (uint16_t ln_id = 0; ln_id < lanes.size(); ++ln_id) {

		// iterate over all lanes
		for (uint16_t tl_id = 0; tl_id < tiles.size(); ++tl_id) {

			// get the first tile that is not in the FINISHED status
			uint16_t first_unfinished = 0;
			while ( (first_unfinished < items.size()) && (items[first_unfinished][ln_id][tl_id] == FINISHED)) {
				first_unfinished++;
			}

			// if there is one, check if there is a BCL file available
			if ((first_unfinished != items.size()) && (items[first_unfinished][ln_id][tl_id] == WAITING)) {
				std::string this_fname = get_bcl_fname(lanes[ln_id], tiles[tl_id], first_unfinished + 1);
				// only change the status if the file exists
				if ( file_exists(this_fname) ) {
					// TODO: probably find a way to check if the machine currently writes to that file
					items[first_unfinished][ln_id][tl_id] = BCL_AVAILABLE;
				}
			}
		}
	}
}

// generate a new task from the agenda
Task Agenda::get_task(){

	// iterate over cycles
	for (uint16_t cycle_id = 0; cycle_id < items.size(); ++cycle_id) {

		// iterate over all tiles
		for (uint16_t ln_id = 0; ln_id < items[cycle_id].size(); ++ln_id) {

			// check if there is a cycle with an unprocessed BCL file
			uint16_t unprocessed = 0;
			while ( (unprocessed < items[cycle_id][ln_id].size()) && (items[cycle_id][ln_id][unprocessed] != BCL_AVAILABLE)) {
				unprocessed++;
			}

			// generate a new task if there is an unprocessed BCL file
			if ( unprocessed != items[cycle_id][ln_id].size() ) {
				uint16_t cycle = cycle_id + 1;
				uint16_t read_no = 0;
				while ( cycle > globalAlignmentSettings.get_seq_by_id(read_no).length) {
					cycle -= globalAlignmentSettings.get_seq_by_id(read_no).length;
					read_no += 1;
				}
				Task t (lanes[ln_id], tiles[unprocessed], globalAlignmentSettings.get_seq_by_id(read_no), cycle);
				return t;
			}
		}
	}
	// return indicator that no new task could be created
	return NO_TASK;
}

// set a status
void Agenda::set_status(Task t, ItemStatus status) {
  // get the lane index
  uint64_t diff = std::find(lanes.begin(), lanes.end(), t.lane) - lanes.begin();
  if ( diff >= lanes.size() ) {
    throw std::out_of_range("Lane ID out of range.");
  }
  uint16_t ln_id = diff;
  
  // get the tile index
  diff = std::find(tiles.begin(), tiles.end(), t.tile) - tiles.begin();
  if ( diff >= tiles.size() ) {
    throw std::out_of_range("Tile ID out of range.");
  }
  uint16_t tl_id = diff;

  // get the cycle index
  if ( (t.cycle > rlen) || (t.cycle == 0) ) {
    throw std::out_of_range("Cycle out of range.");
  }
  uint16_t cl_id = getSeqCycle(t.cycle,t.seqEl.id) -1;

  items[cl_id][ln_id][tl_id] = status;
}

// get the status of a task
ItemStatus Agenda::get_status(Task t) {
  // get the lane index
  uint64_t diff = std::find(lanes.begin(), lanes.end(), t.lane) - lanes.begin();
  if ( diff >= lanes.size() ) {
    throw std::out_of_range("Lane ID out of range.");
  }
  uint16_t ln_id = diff;
  
  // get the tile index
  diff = std::find(tiles.begin(), tiles.end(), t.tile) - tiles.begin();
  if ( diff >= tiles.size() ) {
    throw std::out_of_range("Tile ID out of range.");
  }
  uint16_t tl_id = diff;

  // get the cycle index
  if ( (t.cycle > rlen) || (t.cycle == 0) ) {
    throw std::out_of_range("Cycle out of range.");
  }
  uint16_t cl_id = t.cycle -1;

  return items[cl_id][ln_id][tl_id];
}

// check if all items of the agenda were processed, if possible
bool Agenda::finished() {

	return finished( items.size() );

}

// check if all items of the agenda were processed, if possible
bool Agenda::finished( CountType cycle ) {

	if ( cycle > items.size() ) {
		return false;
	}

	for (uint16_t ln_id = 0; ln_id < lanes.size(); ++ln_id) {
		for (uint16_t tl_id = 0; tl_id < tiles.size(); ++tl_id) {
			for (uint16_t cl_id = 0; cl_id < cycle; ++cl_id) {
				ItemStatus s = items[cl_id][ln_id][tl_id];
				if ( s == FAILED ) {
					// the rest of the tile is "allowed" to be unprocessed --> skip
					break;
				}
				else if (s != FINISHED) {
					// otherwise any other status means that the agenda is not finished
					return false;
				}
			}
		}
	}
	return true;
}

bool Agenda::cycle_available( CountType cycle ) {

	if ( cycle == 0 || cycle > items.size() )
		return false;

	for (uint16_t ln_id = 0; ln_id < items[cycle-1].size(); ++ln_id) {
		for (uint16_t tl_id = 0; tl_id < items[cycle-1][ln_id].size(); ++tl_id) {
			if ( items[cycle-1][ln_id][tl_id] == WAITING )
				return false;
		}
	}

	return true;
}

// the total number of tasks on the agenda
uint32_t Agenda::task_count() {
  return lanes.size() * tiles.size() * rlen;
}

// the total number of finished tasks on the agenda
uint32_t Agenda::tasks_finished() {
	uint32_t num_finished = 0;
	// iterate over all items and count the finished tasks
	for (uint16_t cl_id = 0; cl_id < items.size(); ++cl_id) {
		for (uint16_t ln_id = 0; ln_id < items[cl_id].size(); ++ln_id) {
			for (uint16_t tl_id = 0; tl_id < items[cl_id][ln_id].size(); ++tl_id) {
				if (items[cl_id][ln_id][tl_id] == FINISHED) {
					++num_finished;
				}
			}
		}
	}
	return num_finished;
}