1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
|
/*
* Ray -- Parallel genome assemblies for parallel DNA sequencing
* Copyright (C) 2013 Sébastien Boisvert
*
* http://DeNovoAssembler.SourceForge.Net/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You have received a copy of the GNU General Public License
* along with this program (gpl-3.0.txt).
* see <http://www.gnu.org/licenses/>
*/
// This code is developed with defensive programming (assertions).
// -Sébastien Boisvert
// #define CONFIG_DEBUG_GOSSIP_ASSET_MANAGER "Yes !!!"
#include "GossipAssetManager.h"
#include <code/Mock/common_functions.h>
#include <iostream>
using namespace std;
/**
 * Store a copy of a gossip, index it by its string form and classify it.
 *
 * The handle of the gossip is its position in m_gossips. With CONFIG_ASSERT
 * enabled, adding a gossip whose key is already indexed aborts.
 *
 * \param gossip the gossip to register
 */
void GossipAssetManager::addGossip(GraphSearchResult & gossip) {
    // the textual form of the gossip is its lookup key
    const string gossipKey = gossip.toString();

#ifdef CONFIG_ASSERT
    assert(m_gossipIndex.count(gossipKey) == 0);
#endif

    // keep a copy of the asset; its handle is the slot it now occupies
    m_gossips.push_back(gossip);
    const int handle = m_gossips.size() - 1;
    m_gossipIndex[gossipKey] = handle;

    // put the gossip in a cluster and schedule its deliveries
    classifyGossip(gossip);
}
/**
 * Put an already indexed gossip into a cluster and schedule the deliveries
 * that result from it.
 *
 * The two path handles of the gossip (getPathHandles()[0] and [1]) are looked
 * up in m_pathToClusterTable. Depending on which of them are already known,
 * the gossip starts a new cluster, joins an existing cluster, or bridges two
 * distinct clusters. In the last case the cluster of path2 is emptied into
 * the cluster of path1.
 *
 * Afterwards the ranks of all paths of all gossips in the final cluster are
 * collected as destinations. Without a merge only the new gossip is scheduled
 * for these destinations; after a merge every gossip of the final cluster is.
 *
 * \param gossip the gossip to classify; addGossip() indexes it before calling here
 */
void GossipAssetManager::classifyGossip(GraphSearchResult & gossip) {
string key = gossip.toString();
// NOTE(review): if m_gossipIndex is a std::map, operator[] default-inserts an
// entry for an unknown key -- presumably callers always index the gossip
// first (addGossip does); confirm for other callers.
int index = m_gossipIndex[key];
// becomes true only when two distinct clusters are merged (case 1.5 below)
bool mergedTwoClusters = false;
//------------------------------------------------------------//
// give a better name to the index for the rest of this method.
int & newGossipIndex = index;
PathHandle & path1 = gossip.getPathHandles()[0];
PathHandle & path2 = gossip.getPathHandles()[1];
//------------------------------------------------------------//
// step 1: add the gossip to an existing cluster or to a new one.
//
// case 1.1 -- neither path1 nor path2 is known: create a new cluster
// that contains only this gossip and index both paths to it
if(m_pathToClusterTable.count(path1) == 0 && m_pathToClusterTable.count(path2) == 0) {
set<int> newCluster;
newCluster.insert(index);
m_gossipClusters.push_back(newCluster);
int clusterIndex = m_gossipClusters.size() - 1;
m_pathToClusterTable[path1] = clusterIndex;
m_pathToClusterTable[path2] = clusterIndex;
// case 1.2 -- path1 is known but path2 is not
} else if(m_pathToClusterTable.count(path1) > 0 && m_pathToClusterTable.count(path2) == 0) {
// add the gossip to the cluster of path1 and
// index path2 to that same cluster
int clusterIndex = m_pathToClusterTable[path1];
set<int> & clusterContent = m_gossipClusters[clusterIndex];
clusterContent.insert(newGossipIndex);
m_pathToClusterTable[path2] = clusterIndex;
// case 1.3 -- path2 is known but path1 is not (mirror of case 1.2)
} else if(m_pathToClusterTable.count(path1) == 0 && m_pathToClusterTable.count(path2) > 0) {
int clusterIndex = m_pathToClusterTable[path2];
set<int> & clusterContent = m_gossipClusters[clusterIndex];
clusterContent.insert(newGossipIndex);
m_pathToClusterTable[path1] = clusterIndex;
// case 1.4 -- both paths are known and already belong to the same cluster
} else if(m_pathToClusterTable.count(path1) > 0
&& m_pathToClusterTable.count(path2) > 0
&& m_pathToClusterTable[path1] == m_pathToClusterTable[path2]) {
int clusterIndexForPath1 = m_pathToClusterTable[path1];
// the branch condition already guarantees that both paths map to the
// same cluster; the assertion below only double-checks it
#ifdef CONFIG_ASSERT
int clusterIndexForPath2 = m_pathToClusterTable[path2];
assert(clusterIndexForPath1 == clusterIndexForPath2);
#endif // CONFIG_ASSERT
int clusterIndex = clusterIndexForPath1;
set<int> & clusterContent = m_gossipClusters[clusterIndex];
clusterContent.insert(newGossipIndex);
// no update is necessary for the m_pathToClusterTable index
// case 1.5 -- both paths are known but belong to different clusters:
// this gossip bridges two existing clusters, which are merged
} else if(m_pathToClusterTable.count(path1) > 0
&& m_pathToClusterTable.count(path2) > 0) {
int clusterIndexForPath1 = m_pathToClusterTable[path1];
int clusterIndexForPath2 = m_pathToClusterTable[path2];
#ifdef CONFIG_ASSERT
assert(clusterIndexForPath1 != clusterIndexForPath2);
#endif // CONFIG_ASSERT
// TODO optimization: flip group1 and group2 if group2 is smaller than
// group1
mergedTwoClusters = true;
// move every gossip of the cluster of path2 into the cluster of path1
set<int> & clusterContentForPath1 = m_gossipClusters[clusterIndexForPath1];
set<int> & clusterContentForPath2 = m_gossipClusters[clusterIndexForPath2];
for(set<int>::iterator i = clusterContentForPath2.begin() ;
i != clusterContentForPath2.end() ; ++i) {
int otherGossipIndex = *i;
GraphSearchResult & otherGossip = m_gossips[otherGossipIndex];
#ifdef CONFIG_ASSERT
// a gossip must not be in both clusters; dump both clusters before the
// assertion fires so that the failure can be diagnosed
if(clusterContentForPath1.count(otherGossipIndex) > 0) {
cout << "Error: otherGossipIndex is in clusterContentForPath1 and clusterContentForPath2 ! ";
cout << " gossip: ";
cout << otherGossipIndex << " ";
otherGossip.print();
cout << " path1 " << path1;
cout << " path2 " << path2;
cout << " clusterIndexForPath1 " << clusterIndexForPath1;
cout << " clusterIndexForPath2 " << clusterIndexForPath2;
cout << " clusterContentForPath1 ";
for(set<int>::iterator j = clusterContentForPath1.begin();
j != clusterContentForPath1.end() ; ++j) {
cout << " " << *j;
}
cout << " clusterContentForPath2 ";
for(set<int>::iterator j = clusterContentForPath2.begin();
j != clusterContentForPath2.end() ; ++j) {
cout << " " << *j;
}
cout << endl;
}
assert(clusterContentForPath1.count(otherGossipIndex) == 0);
#endif
PathHandle & otherPath1 = otherGossip.getPathHandles()[0];
PathHandle & otherPath2 = otherGossip.getPathHandles()[1];
// some assertions before changing things.
#ifdef CONFIG_ASSERT
assert(m_pathToClusterTable.count(otherPath1) > 0);
assert(m_pathToClusterTable.count(otherPath2) > 0);
// The 2 assertions below are invalid and therefore disabled:
// otherPath1 (or otherPath2) can appear in more than 1 gossip, so
// it may already have been re-indexed by a previous iteration.
//assert(m_pathToClusterTable[otherPath1] == clusterIndexForPath1);
//assert(m_pathToClusterTable[otherPath2] == clusterIndexForPath2);
#endif // CONFIG_ASSERT
clusterContentForPath1.insert(otherGossipIndex);
// re-index both paths of the moved gossip to the surviving cluster
m_pathToClusterTable[otherPath1] = clusterIndexForPath1;
m_pathToClusterTable[otherPath2] = clusterIndexForPath1;
// some assertions *after* changing things.
#ifdef CONFIG_ASSERT
assert(m_pathToClusterTable.count(otherPath1) > 0);
assert(m_pathToClusterTable.count(otherPath2) > 0);
assert(m_pathToClusterTable[otherPath1] == clusterIndexForPath1);
assert(m_pathToClusterTable[otherPath2] == clusterIndexForPath1);
#endif // CONFIG_ASSERT
}
// at this point, the cluster for path1 contains everything that was in the old cluster
// for path1 and everything that was in the old cluster for path2.
// The old cluster of path2 keeps its slot in m_gossipClusters but is emptied.
clusterContentForPath2.clear();
clusterContentForPath1.insert(newGossipIndex);
// we don't need to update m_pathToClusterTable because path1 and path2 are already
// indexed.
}
//------------------------------------------------------------//
// step 2: mark the gossip for future transportation
// get the cluster that contains gossip
#ifdef CONFIG_ASSERT
assert(m_pathToClusterTable.count(path1) > 0);
assert(m_pathToClusterTable.count(path2) > 0);
#endif
int finalCluster1 = m_pathToClusterTable[path1];
#ifdef CONFIG_ASSERT
int finalCluster2 = m_pathToClusterTable[path2];
assert(finalCluster1 == finalCluster2);
#endif
set<int> & finalClusterContent = m_gossipClusters[finalCluster1];
#ifdef CONFIG_ASSERT
assert(finalClusterContent.count(newGossipIndex) > 0);
#endif
// gather all destinations for this cluster: the ranks obtained from both
// paths of every gossip in the cluster
set<Rank> destinations;
for(set<int>::iterator i = finalClusterContent.begin() ;
i != finalClusterContent.end() ; ++i) {
int theGossipIndex = *i;
GraphSearchResult & entry = m_gossips[theGossipIndex];
Rank rank1 = getRankFromPathUniqueId(entry.getPathHandles()[0]);
Rank rank2 = getRankFromPathUniqueId(entry.getPathHandles()[1]);
destinations.insert(rank1);
destinations.insert(rank2);
}
// here, we have a list of destinations to which we must send the gossip
// if mergedTwoClusters is false, we only need to send gossip to the destinations
//
// if mergedTwoClusters is true, however, we need to send every gossip in the cluster to
// every destination because a merge event occurred.
// dummySet holds only the new gossip; it is the default set to schedule.
set<int> dummySet;
dummySet.insert(newGossipIndex);
set<int> * gossipsToScheduleForTransportation = & dummySet;
if(mergedTwoClusters)
gossipsToScheduleForTransportation = & finalClusterContent;
// counts the calls to scheduleTransportation that returned true;
// only reported in the debug output below
int scheduledOperations = 0;
for(set<int>::iterator gossipIndexIterator = gossipsToScheduleForTransportation->begin();
gossipIndexIterator != gossipsToScheduleForTransportation->end();
++gossipIndexIterator) {
int gossipIndexToSchedule = *gossipIndexIterator;
for(set<Rank>::iterator destinationIterator = destinations.begin() ;
destinationIterator != destinations.end();
++destinationIterator) {
Rank destination = * destinationIterator;
if(scheduleTransportation(gossipIndexToSchedule, destination))
scheduledOperations ++;
}
}
#ifdef CONFIG_DEBUG_GOSSIP_ASSET_MANAGER
cout << "DEBUG GossipAssetManager::classifyGossip classification of gossip ";
cout << newGossipIndex << " triggered " << scheduledOperations << " scheduling";
cout << " operations" << endl;
#endif
}
/**
 * Schedule the delivery of a known gossip to a destination.
 *
 * The gossip is resolved to its index through its string form and the work
 * is delegated to scheduleTransportation().
 *
 * \param gossip the gossip to deliver; it must have been added beforehand
 * \param destination the rank that must receive the gossip
 * \return the result of scheduleTransportation(), or false when the gossip
 * is unknown (builds without CONFIG_ASSERT; assertion builds abort instead)
 */
bool GossipAssetManager::scheduleGossipTransportation(GraphSearchResult & gossip, Rank & destination) {
    string key = gossip.toString();

#ifdef CONFIG_ASSERT
    assert(m_gossipIndex.count(key) > 0);
#endif

    // Do not use operator[] on an unknown key: it would silently create an
    // index entry for a gossip that was never added and schedule the wrong
    // asset.
    if(m_gossipIndex.count(key) == 0)
        return false;

    int index = m_gossipIndex[key];

    return scheduleTransportation(index, destination);
}
/**
 * Record that a gossip must be sent to a destination.
 *
 * \param gossipIndex the index of the gossip in m_gossips
 * \param destination the rank that must receive the gossip
 * \return false if the destination is already registered as an owner of the
 * gossip in m_remoteGossipOwners, true otherwise
 */
bool GossipAssetManager::scheduleTransportation(int gossipIndex, Rank destination) {
    // nothing to do when the gossip is already on this destination
    const bool hasRegisteredOwners = m_remoteGossipOwners.count(gossipIndex) != 0;

    if(hasRegisteredOwners) {
        const bool alreadyDelivered = m_remoteGossipOwners[gossipIndex].count(destination) != 0;

        if(alreadyDelivered)
            return false;
    }

    // remember the pending delivery
    m_futureRemoteGossipOwners[gossipIndex].insert(destination);

#ifdef CONFIG_DEBUG_GOSSIP_ASSET_MANAGER
    cout << "DEBUG /GossipAssetManager::scheduleTransportation scheduled gossip " << gossipIndex;
    cout << " for immediate delivery to endpoint " << destination << endl;
#endif

    return true;
}
/**
 * Tell whether at least one delivery is pending.
 *
 * \return true if m_futureRemoteGossipOwners contains at least one entry
 */
bool GossipAssetManager::hasGossipToShare() const {
    return !m_futureRemoteGossipOwners.empty();
}
/**
 * Give the caller the first pending delivery.
 *
 * The pending delivery is not removed here; registerRemoteGossip() removes it.
 *
 * \param gossip receives a copy of the first gossip that has pending destinations
 * \param destination receives the first pending destination of that gossip
 */
void GossipAssetManager::getGossipToShare(GraphSearchResult & gossip, Rank & destination) {
#ifdef CONFIG_ASSERT
    assert(hasGossipToShare());
#endif

    // the first entry of the schedule: gossip index -> pending destinations
    map<int, set<Rank> >::iterator firstPending = m_futureRemoteGossipOwners.begin();

#ifdef CONFIG_ASSERT
    assert(firstPending->second.size() > 0);
#endif

    // hand the objects over to the caller
    gossip = m_gossips[firstPending->first];
    destination = *(firstPending->second.begin());
}
/**
 * Give access to the stored gossips.
 *
 * \return a mutable reference to the internal list of gossips
 */
vector<GraphSearchResult> & GossipAssetManager::getGossips() {
    return this->m_gossips;
}
bool GossipAssetManager::hasGossip(const GraphSearchResult & gossip) const {
string key = gossip.toString();
return m_gossipIndex.count(key) > 0;
}
/**
 * Record that a destination owns a copy of a gossip.
 *
 * Any pending delivery of this gossip to this destination is removed from
 * the schedule, and the destination is added to the owners of the gossip.
 *
 * \param gossip the gossip that was delivered; it must have been added beforehand
 * \param destination the rank that now holds the gossip
 *
 * An unknown gossip is ignored in builds without CONFIG_ASSERT (assertion
 * builds abort instead).
 */
void GossipAssetManager::registerRemoteGossip(GraphSearchResult & gossip, Rank & destination) {
    string key = gossip.toString();

#ifdef CONFIG_ASSERT
    assert(m_gossipIndex.count(key) > 0);
#endif

    // Do not use operator[] on an unknown key: it would silently create an
    // index entry for a gossip that was never added and register the wrong
    // asset.
    if(m_gossipIndex.count(key) == 0)
        return;

    int index = m_gossipIndex[key];

    // Remove any scheduled operation for this gossip and this destination.
    // It is deliberately not required that such an operation exists: remote
    // copies may be registered even when nothing was scheduled.
    map<int, set<Rank> >::iterator pending = m_futureRemoteGossipOwners.find(index);

    if(pending != m_futureRemoteGossipOwners.end()) {
        // drop the whole entry once its last destination is gone so that
        // hasGossipToShare() does not report an empty schedule entry
        if(pending->second.erase(destination) > 0 && pending->second.empty())
            m_futureRemoteGossipOwners.erase(pending);
    }

    // It is also deliberately not asserted that the destination is a new
    // owner: registering the same remote copy again is allowed and the set
    // makes it a no-op.
    m_remoteGossipOwners[index].insert(destination);

    // #ifdef (not #if): the macro may be defined to a string literal, which
    // is not a valid #if expression.
#ifdef CONFIG_DEBUG_GOSSIP_ASSET_MANAGER
    cout << "DEBUG GossipAssetManager::registerRemoteGossip gossip " << index << " was successfully";
    cout << " delivered to endpoint " << destination << endl;
#endif
}
|