File: NetworkSequenceCollection.h

package info (click to toggle)
abyss 2.3.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,284 kB
  • sloc: cpp: 78,182; ansic: 6,512; makefile: 2,252; perl: 672; sh: 509; haskell: 412; python: 4
file content (285 lines) | stat: -rw-r--r-- 8,139 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
#ifndef NETWORKSEQUENCECOLLECTION_H
#define NETWORKSEQUENCECOLLECTION_H 1

#include "CommLayer.h"
#include "MessageBuffer.h"
#include "SequenceCollection.h"
#include "Assembly/BranchGroup.h"
#include "Common/Timer.h"
#include "DataLayer/FastaWriter.h"
#include <ostream>
#include <set>
#include <utility>
#include "Common/InsOrderedMap.h"

namespace NSC
{
	typedef InsOrderedMap<std::string, int> dbMap;
	dbMap moveFromAaStatMap();
}

enum NetworkAssemblyState
{
	NAS_LOADING, // loading sequences
	NAS_LOAD_COMPLETE, // loading is complete
	NAS_GEN_ADJ, // generating the sequence data
	NAS_ADJ_COMPLETE, // adjacency generation is complete
	NAS_ERODE, // erode the branch ends one sequence at a time
	NAS_ERODE_WAITING,
	NAS_ERODE_COMPLETE,
	NAS_TRIM, // trimming the data
	NAS_REMOVE_MARKED, // remove marked sequences
	NAS_COVERAGE, // remove low-coverage contigs
	NAS_COVERAGE_COMPLETE,
	NAS_DISCOVER_BUBBLES, // discover read errors/SNPs
	NAS_POPBUBBLE, // remove read errors/SNPs
	NAS_MARK_AMBIGUOUS, // mark ambiguous branches
	NAS_SPLIT_AMBIGUOUS, // split ambiguous branches
	NAS_CLEAR_FLAGS, // clear the flags
	NAS_ASSEMBLE, // assembling the data
	NAS_ASSEMBLE_COMPLETE, // assembling is complete
	NAS_WAITING, // non-control process is waiting
	NAS_DONE // finished, clean up and exit
};

typedef std::map<uint64_t, BranchGroup> BranchGroupMap;

/** A distributed map of vertices to vertex properties. */
class NetworkSequenceCollection
{
	public:
		typedef SequenceDataHash::key_type V;

		typedef SequenceDataHash::key_type key_type;
		typedef SequenceDataHash::mapped_type mapped_type;
		typedef SequenceDataHash::value_type value_type;
		typedef SequenceDataHash::iterator iterator;
		typedef SequenceDataHash::const_iterator const_iterator;

		typedef mapped_type::Symbol Symbol;
		typedef mapped_type::SymbolSet SymbolSet;
		typedef mapped_type::SymbolSetPair SymbolSetPair;

		// Used by boost::vertex_bundle_type
		typedef mapped_type vertex_bundled;

		NetworkSequenceCollection()
			: m_state(NAS_WAITING), m_trimStep(0),
			m_numPopped(0), m_numAssembled(0) { }

		size_t performNetworkTrim();

		size_t performNetworkDiscoverBubbles();
		size_t performNetworkPopBubbles(std::ostream& out);

		size_t controlErode();
		size_t controlTrimRound(unsigned trimLen);
		void controlTrim(unsigned&, unsigned start = 1);
		size_t controlRemoveMarked();
		void controlCoverage();
		size_t controlDiscoverBubbles();
		size_t controlPopBubbles(std::ostream& out);
		size_t controlMarkAmbiguous();
		size_t controlSplitAmbiguous();
		size_t controlSplit();

		// Perform a network assembly
		std::pair<size_t, size_t> performNetworkAssembly(
				FastaWriter* fileWriter = NULL);

		void add(const V& seq, unsigned coverage = 1);
		void remove(const V& seq);
		void setFlag(const V& seq, SeqFlag flag);

		/** Mark the specified sequence in both directions. */
		void mark(const V& seq)
		{
			setFlag(seq, SeqFlag(SF_MARK_SENSE | SF_MARK_ANTISENSE));
		}

		/** Mark the specified sequence. */
		void mark(const V& seq, extDirection sense)
		{
			setFlag(seq, sense == SENSE
					? SF_MARK_SENSE : SF_MARK_ANTISENSE);
		}

		/** Return true if this container is empty. */
		bool empty() const { return m_data.empty(); }

		void printLoad() const { m_data.printLoad(); }

		void removeExtension(const V& seq, extDirection dir,
				SymbolSet ext);

		/** Remove the specified edge of this vertex. */
		void removeExtension(const V& seq, extDirection dir, Symbol base)
		{
			removeExtension(seq, dir, SymbolSet(base));
		}

		bool setBaseExtension(const V& seq, extDirection dir, Symbol base);

		// Receive and dispatch packets.
		size_t pumpNetwork();
		size_t pumpFlushReduce();

		void completeOperation();

		// run the assembly
		void run();

		// run the assembly from the controller's point of view
		void runControl();

		// test if the checkpoint has been reached
		bool checkpointReached() const;
		bool checkpointReached(unsigned numRequired) const;

		void handle(int senderID, const SeqAddMessage& message);
		void handle(int senderID, const SeqRemoveMessage& message);
		void handle(int senderID, const SetBaseMessage& message);
		void handle(int senderID, const SetFlagMessage& message);
		void handle(int senderID, const RemoveExtensionMessage& m);
		void handle(int senderID, const SeqDataRequest& message);
		void handle(int senderID, const SeqDataResponse& message);

		/** The observer callback function. */
		typedef void (*SeqObserver)(SequenceCollectionHash* c,
				const value_type& seq);

		// Observer pattern, not implemented.
		void attach(SeqObserver) { }
		void detach(SeqObserver) { }

		/** Load this collection from disk. */
		void load(const char *path)
		{
			m_data.load(path);
		}

		/** Indicate that this is a colour-space collection. */
		void setColourSpace(bool flag)
		{
			m_data.setColourSpace(flag);
			m_comm.broadcast(flag);
		}

		iterator begin() { return m_data.begin(); }
		const_iterator begin() const { return m_data.begin(); }
		iterator end() { return m_data.end(); }
		const_iterator end() const { return m_data.end(); }

	private:
		// Observer pattern
		void notify(const V& seq);

		void loadSequences();

		std::pair<size_t, size_t> processBranchesAssembly(
				FastaWriter* fileWriter, unsigned currContigID);
		size_t processBranchesTrim();
		bool processBranchesDiscoverBubbles();

		void generateExtensionRequest(
				uint64_t groupID, uint64_t branchID, const V& seq);
		void generateExtensionRequests(uint64_t groupID,
				BranchGroup::const_iterator first,
				BranchGroup::const_iterator last);
		void processSequenceExtension(
				uint64_t groupID, uint64_t branchID, const V& seq,
				const SymbolSetPair& extRec, int multiplicity);
		void processLinearSequenceExtension(
				uint64_t groupID, uint64_t branchID, const V& seq,
				const SymbolSetPair& extRec, int multiplicity,
				unsigned maxLength);
		void processSequenceExtensionPop(
				uint64_t groupID, uint64_t branchID, const V& seq,
				const SymbolSetPair& extRec, int multiplicity,
				unsigned maxLength);

		void assembleContig(
				FastaWriter* fileWriter,
				BranchRecord& branch, unsigned id);

		// Check if a branch is redundant with a previously output
		// branch.
		bool isBranchRedundant(const BranchRecord& branch);

		void parseControlMessage(int source);

		bool isLocal(const V& seq) const;
		int computeNodeID(const V& seq) const;

		void EndState();

		// Set the state of the network assembly
		void SetState(NetworkAssemblyState newState);

		SequenceCollectionHash m_data;

		// The communications layer implements the functions over the
		// network.
		MessageBuffer m_comm;

		// The number of nodes in the network
		unsigned m_numDataNodes;

		// the state of the assembly
		NetworkAssemblyState m_state;

		// The number of processes that have sent a checkpoint reached
		// message, this is used by the control process to determine
		// the state flow.
		unsigned m_numReachedCheckpoint;

		/** The sum of the values returned by the slave nodes in their
		 * checkpoint messages.
		 */
		size_t m_checkpointSum;

		// the number of bases of adjacency set
		size_t m_numBasesAdjSet;

		// the current length to trim on (comes from the control node)
		unsigned m_trimStep;

		/** The number of low-coverage contigs removed. */
		size_t m_lowCoverageContigs;

		/** The number of low-coverage k-mer removed. */
		size_t m_lowCoverageKmer;

		/** The number of bubbles popped so far. */
		size_t m_numPopped;

		// the number of sequences assembled so far
		size_t m_numAssembled;

		// The current branches that are active
		BranchGroupMap m_activeBranchGroups;

		/** Bubbles, which are branch groups that have joined. */
		BranchGroupMap m_bubbles;

		// List of IDs of finished groups, used for sanity checking
		// during bubble popping.
		std::set<uint64_t> m_finishedGroups;

		static const size_t MAX_ACTIVE = 50;
		static const size_t LOW_ACTIVE = 10;
};

// Graph

namespace boost {

template <>
struct graph_traits<NetworkSequenceCollection>
	: graph_traits<SequenceCollectionHash>
{
}; // graph_traits<NetworkSequenceCollection>

} // namespace boost

#endif