File: MessageBuffer.cpp

package info (click to toggle)
abyss 2.3.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,284 kB
  • sloc: cpp: 78,182; ansic: 6,512; makefile: 2,252; perl: 672; sh: 509; haskell: 412; python: 4
file content (151 lines) | stat: -rw-r--r-- 3,830 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include "MessageBuffer.h"
#include "Common/Options.h"
#include <cassert>
#include <iostream>
#include <vector>

using namespace std;

// Create opt::numProc outgoing message queues and reserve room for
// MAX_MESSAGES messages in each of them.
MessageBuffer::MessageBuffer()
	: m_msgQueues(opt::numProc)
{
	const size_t queueCount = m_msgQueues.size();
	for (size_t q = 0; q != queueCount; ++q) {
		m_msgQueues[q].reserve(MAX_MESSAGES);
	}
}

void MessageBuffer::sendSeqAddMessage(int nodeID, const V& seq)
{
	queueMessage(nodeID, new SeqAddMessage(seq), SM_BUFFERED);
}

void MessageBuffer::sendSeqRemoveMessage(int nodeID, const V& seq)
{
	queueMessage(nodeID, new SeqRemoveMessage(seq), SM_BUFFERED);
}

// Send a set flag message
void MessageBuffer::sendSetFlagMessage(int nodeID,
		const V& seq, SeqFlag flag)
{
	queueMessage(nodeID, new SetFlagMessage(seq, flag), SM_BUFFERED);
}

// Send a remove extension message
void MessageBuffer::sendRemoveExtension(int nodeID,
		const V& seq, extDirection dir, SymbolSet ext)
{
	queueMessage(nodeID, new RemoveExtensionMessage(seq, dir, ext),
			SM_BUFFERED);
}

// Send a sequence data request
void MessageBuffer::sendSeqDataRequest(int nodeID,
		IDType group, IDType id, const V& seq)
{
	queueMessage(nodeID,
			new SeqDataRequest(seq, group, id), SM_IMMEDIATE);
}

// Send a sequence data response
void MessageBuffer::sendSeqDataResponse(int nodeID,
		IDType group, IDType id, const V& seq,
		SymbolSetPair extRec, int multiplicity)
{
	queueMessage(nodeID,
			new SeqDataResponse(seq, group, id, extRec, multiplicity),
			SM_IMMEDIATE);
}

// Send a set base message
void MessageBuffer::sendSetBaseExtension(int nodeID,
		const V& seq, extDirection dir, Symbol base)
{
	queueMessage(nodeID,
			new SetBaseMessage(seq, dir, base), SM_BUFFERED);
}

// Append message to the outgoing queue for nodeID, then let
// checkQueueForSend decide, based on mode, whether that queue is sent.
// The queue stores the pointer; clearQueue deletes the message after
// the queue has been sent.
void MessageBuffer::queueMessage(
		int nodeID, Message* message, SendMode mode)
{
	// Trace every queued message at verbosity level 9 and above.
	const bool traceMessages = opt::verbose >= 9;
	if (traceMessages) {
		std::cout << opt::rank << " to " << nodeID << ": "
			<< *message;
	}

	m_msgQueues[nodeID].push_back(message);
	checkQueueForSend(nodeID, mode);
}

// Send the messages queued for nodeID as a single packet if the queue
// is due for sending.
// The queue is due when it is non-empty and either holds exactly
// MAX_MESSAGES messages or mode is SM_IMMEDIATE. All queued messages
// are serialized back to back into one buffer, handed to
// sendBufferedMessage, and then deleted by clearQueue; the transmit
// counters are updated afterwards.
void MessageBuffer::checkQueueForSend(int nodeID, SendMode mode)
{
	const size_t numMsgs = m_msgQueues[nodeID].size();

	// An empty queue is never sent, whatever the mode.
	if (numMsgs == 0)
		return;
	// A buffered queue waits until it is full.
	if (numMsgs != MAX_MESSAGES && mode != SM_IMMEDIATE)
		return;

	// Calculate the total size of the packet.
	size_t totalSize = 0;
	for (size_t i = 0; i < numMsgs; i++)
		totalSize += m_msgQueues[nodeID][i]->getNetworkSize();

	// The vector owns the packet storage and releases it on every exit
	// path, including an exception thrown while serializing or sending.
	// It always holds at least one byte so that &storage[0] is valid.
	std::vector<char> storage(totalSize > 0 ? totalSize : 1);
	char* const buffer = &storage[0];

	// Copy the messages into the buffer, one after the other.
	size_t offset = 0;
	for (size_t i = 0; i < numMsgs; i++)
		offset += m_msgQueues[nodeID][i]->serialize(buffer + offset);

	// Every message must have written exactly the size it reported.
	assert(offset == totalSize);
	sendBufferedMessage(nodeID, buffer, totalSize);

	clearQueue(nodeID);

	m_txPackets++;
	m_txMessages += numMsgs;
	m_txBytes += totalSize;
}

// Delete every message queued for nodeID and leave that queue empty.
void MessageBuffer::clearQueue(int nodeID)
{
	const size_t queued = m_msgQueues[nodeID].size();
	size_t slot = 0;
	while (slot < queued) {
		// Free the message and null the slot before moving on.
		delete m_msgQueues[nodeID][slot];
		m_msgQueues[nodeID][slot] = 0;
		++slot;
	}
	m_msgQueues[nodeID].clear();
}

// Flush the message buffer: every queue that still holds messages is
// sent immediately, regardless of how many messages it contains.
void MessageBuffer::flush()
{
	const size_t queueCount = m_msgQueues.size();
	for (size_t dest = 0; dest != queueCount; ++dest) {
		// SM_IMMEDIATE forces any pending messages out.
		checkQueueForSend(static_cast<int>(dest), SM_IMMEDIATE);
	}
}

// Check if all the queues are empty
bool MessageBuffer::transmitBufferEmpty() const
{
	bool isEmpty = true;
	for (MessageQueues::const_iterator it = m_msgQueues.begin();
			it != m_msgQueues.end(); ++it) {
		if (!it->empty()) {
			cerr
				<< opt::rank << ": error: tx buffer should be empty: "
				<< it->size() << " messages from "
				<< opt::rank << " to " << it - m_msgQueues.begin()
				<< '\n';
			for (MsgBuffer::const_iterator j = it->begin();
					j != it->end(); ++j)
				cerr << **j << '\n';
			isEmpty = false;
		}
	}
	return isEmpty;
}