// Copyright (C) 2005-2006 The Trustees of Indiana University.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// Authors: Douglas Gregor
// Andrew Lumsdaine
//
// Implements redistribution of vertices for a distributed adjacency
// list. This file should not be included by users. It will be
// included by the distributed adjacency list header.
//
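//
// Usage sketch (illustrative, not part of this header): each process fills
// a distributed property map that assigns every local vertex to its new
// owning process, then all processes collectively call the redistribute()
// member defined below. The names `g`, `to_processor`, and `new_owner` are
// assumptions of this sketch, not names defined here:
//
//   BGL_FORALL_VERTICES(v, g, Graph)
//     put(to_processor, v, new_owner(v));
//   g.redistribute(to_processor);
//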
#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif
#include <boost/pending/container_traits.hpp>
namespace boost { namespace detail { namespace parallel {
/* This structure contains a (vertex or edge) descriptor that is being
moved from one processor to another. It contains the properties for
that descriptor (if any).
*/
template<typename Descriptor, typename DescriptorProperty>
struct redistributed_descriptor : maybe_store_property<DescriptorProperty>
{
typedef maybe_store_property<DescriptorProperty> inherited;
redistributed_descriptor() { }
redistributed_descriptor(const Descriptor& v, const DescriptorProperty& p)
: inherited(p), descriptor(v) { }
Descriptor descriptor;
private:
friend class boost::serialization::access;
template<typename Archiver>
void serialize(Archiver& ar, unsigned int /*version*/)
{
ar & boost::serialization::base_object<inherited>(*this)
& unsafe_serialize(descriptor);
}
};
/* Predicate that returns true if the target has migrated. */
template<typename VertexProcessorMap, typename Graph>
struct target_migrated_t
{
typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
typedef typename graph_traits<Graph>::edge_descriptor Edge;
target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
: vertex_to_processor(vertex_to_processor), g(g) { }
bool operator()(Edge e) const
{
typedef global_descriptor<Vertex> DVertex;
processor_id_type owner = get(edge_target_processor_id, g, e);
return get(vertex_to_processor, DVertex(owner, target(e, g))) != owner;
}
private:
VertexProcessorMap vertex_to_processor;
const Graph& g;
};
template<typename VertexProcessorMap, typename Graph>
inline target_migrated_t<VertexProcessorMap, Graph>
target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
{ return target_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }
/* Predicate that returns true if the source of an in-edge has migrated. */
template<typename VertexProcessorMap, typename Graph>
struct source_migrated_t
{
typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
typedef typename graph_traits<Graph>::edge_descriptor Edge;
source_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
: vertex_to_processor(vertex_to_processor), g(g) { }
bool operator()(stored_in_edge<Edge> e) const
{
typedef global_descriptor<Vertex> DVertex;
return get(vertex_to_processor, DVertex(e.source_processor, source(e.e, g)))
!= e.source_processor;
}
private:
VertexProcessorMap vertex_to_processor;
const Graph& g;
};
template<typename VertexProcessorMap, typename Graph>
inline source_migrated_t<VertexProcessorMap, Graph>
source_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
{ return source_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }
/* Predicate that returns true if the source or target has migrated. */
template<typename VertexProcessorMap, typename Graph>
struct source_or_target_migrated_t
{
typedef typename graph_traits<Graph>::edge_descriptor Edge;
source_or_target_migrated_t(VertexProcessorMap vertex_to_processor,
const Graph& g)
: vertex_to_processor(vertex_to_processor), g(g) { }
bool operator()(Edge e) const
{
return get(vertex_to_processor, source(e, g)) != source(e, g).owner
|| get(vertex_to_processor, target(e, g)) != target(e, g).owner;
}
private:
VertexProcessorMap vertex_to_processor;
const Graph& g;
};
template<typename VertexProcessorMap, typename Graph>
inline source_or_target_migrated_t<VertexProcessorMap, Graph>
source_or_target_migrated(VertexProcessorMap vertex_to_processor,
const Graph& g)
{
typedef source_or_target_migrated_t<VertexProcessorMap, Graph> result_type;
return result_type(vertex_to_processor, g);
}
} } // end of namespace detail::parallel
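// Request the new owner of every in-neighbor of v so that
// remove_migrated_in_edges() below can tell which stored in-edges come from
// vertices that are moving away (only bidirectional graphs store in-edges).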
template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
template<typename VertexProcessorMap>
void
PBGL_DISTRIB_ADJLIST_TYPE
::request_in_neighbors(vertex_descriptor v,
VertexProcessorMap vertex_to_processor,
bidirectionalS)
{
BGL_FORALL_INEDGES_T(v, e, *this, graph_type)
request(vertex_to_processor, source(e, *this));
}
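// Remove every stored in-edge of v whose source vertex is migrating to
// another process (bidirectional graphs only).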
template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
template<typename VertexProcessorMap>
void
PBGL_DISTRIB_ADJLIST_TYPE
::remove_migrated_in_edges(vertex_descriptor v,
VertexProcessorMap vertex_to_processor,
bidirectionalS)
{
graph_detail::erase_if(get(vertex_in_edges, base())[v.local],
source_migrated(vertex_to_processor, base()));
}
template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
template<typename VertexProcessorMap>
void
PBGL_DISTRIB_ADJLIST_TYPE
::redistribute(VertexProcessorMap vertex_to_processor)
{
using boost::parallel::inplace_all_to_all;
// When we have stable descriptors, we only move those descriptors
// that actually need to be moved. Otherwise, we essentially have to
// regenerate the entire graph.
const bool has_stable_descriptors =
is_same<typename config_type::vertex_list_selector, listS>::value
|| is_same<typename config_type::vertex_list_selector, setS>::value
|| is_same<typename config_type::vertex_list_selector, multisetS>::value;
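// (vecS vertex storage does not appear in this list: erasing a vertex from
// a vector invalidates the descriptors of every vertex that follows it.)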
typedef detail::parallel::redistributed_descriptor<vertex_descriptor,
vertex_property_type>
redistributed_vertex;
typedef detail::parallel::redistributed_descriptor<edge_descriptor,
edge_property_type>
redistributed_edge;
vertex_iterator vi, vi_end;
edge_iterator ei, ei_end;
process_group_type pg = process_group();
// Initial synchronization makes sure that we have all of our ducks
// in a row. We don't want any outstanding add/remove messages
// coming in mid-redistribution!
synchronize(process_group_);
// We cannot cope with eviction of ghost cells
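// because the locations we request for neighboring vertices must stay
// cached locally until we read them after synchronizing the map.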
vertex_to_processor.set_max_ghost_cells(0);
process_id_type p = num_processes(pg);
// Send vertices and edges to the processor where they will
// actually reside. This requires O(|V| + |E|) communication
std::vector<std::vector<redistributed_vertex> > redistributed_vertices(p);
std::vector<std::vector<redistributed_edge> > redistributed_edges(p);
// Build the sets of relocated vertices for each process and then do
// an all-to-all transfer.
for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; ++vi) {
if (!has_stable_descriptors
|| get(vertex_to_processor, *vi) != vi->owner) {
redistributed_vertices[get(vertex_to_processor, *vi)]
.push_back(redistributed_vertex(*vi, get(vertex_all_t(), base(),
vi->local)));
}
// When our descriptors are stable, we need to know where each
// adjacent vertex is moving in order to determine which edges will
// be removed.
if (has_stable_descriptors) {
BGL_FORALL_OUTEDGES_T(*vi, e, *this, graph_type)
request(vertex_to_processor, target(e, *this));
request_in_neighbors(*vi, vertex_to_processor, directed_selector());
}
}
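// Exchange the relocated vertices: afterwards, redistributed_vertices[src]
// holds the vertices (with properties) that process src sent to us.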
inplace_all_to_all(pg, redistributed_vertices);
// If we have stable descriptors, we need to know where our neighbor
// vertices are moving.
if (has_stable_descriptors)
synchronize(vertex_to_processor);
// Build the sets of relocated edges for each process and then do
// an all-to-all transfer.
for (boost::tie(ei, ei_end) = edges(*this); ei != ei_end; ++ei) {
vertex_descriptor src = source(*ei, *this);
vertex_descriptor tgt = target(*ei, *this);
if (!has_stable_descriptors
|| get(vertex_to_processor, src) != src.owner
|| get(vertex_to_processor, tgt) != tgt.owner)
redistributed_edges[get(vertex_to_processor, source(*ei, *this))]
.push_back(redistributed_edge(*ei, split_edge_property(get(edge_all_t(), base(),
ei->local))));
}
inplace_all_to_all(pg, redistributed_edges);
// A mapping from old vertex descriptors to new vertex
// descriptors. This is an STL map partly because I'm too lazy to
// build a real property map (which is hard in the general case), but
// also because it won't try to look in the graph itself: the keys
// are all vertex descriptors that have been invalidated.
std::map<vertex_descriptor, vertex_descriptor> old_to_new_vertex_map;
if (has_stable_descriptors) {
// Clear out all vertices and edges that will have moved. There
// are several stages to this.
// First, eliminate all outgoing edges from the (local) vertices
// that have been moved or whose targets have been moved.
BGL_FORALL_VERTICES_T(v, *this, graph_type) {
if (get(vertex_to_processor, v) != v.owner) {
clear_out_edges(v.local, base());
clear_in_edges_local(v, directed_selector());
} else {
remove_out_edge_if(v.local,
target_migrated(vertex_to_processor, base()),
base());
remove_migrated_in_edges(v, vertex_to_processor, directed_selector());
}
}
// Next, eliminate locally-stored edges that have migrated (for
// undirected graphs).
graph_detail::erase_if(local_edges_,
source_or_target_migrated(vertex_to_processor, *this));
// Eliminate vertices that have migrated
for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; /* in loop */) {
if (get(vertex_to_processor, *vi) != vi->owner)
remove_vertex((*vi++).local, base());
else {
// Add the identity relation for vertices that have not migrated
old_to_new_vertex_map[*vi] = *vi;
++vi;
}
}
} else {
// Clear out the local graph: the entire graph is in transit
clear();
}
// Add the new vertices to the graph. When we do so, update the old
// -> new vertex mapping both locally and for the owner of the "old"
// vertex.
{
typedef std::pair<vertex_descriptor, vertex_descriptor> mapping_pair;
std::vector<std::vector<mapping_pair> > mappings(p);
for (process_id_type src = 0; src < p; ++src) {
for (typename std::vector<redistributed_vertex>::iterator vi =
redistributed_vertices[src].begin();
vi != redistributed_vertices[src].end(); ++vi) {
vertex_descriptor new_vertex =
add_vertex(vi->get_property(), *this);
old_to_new_vertex_map[vi->descriptor] = new_vertex;
mappings[vi->descriptor.owner].push_back(mapping_pair(vi->descriptor,
new_vertex));
}
redistributed_vertices[src].clear();
}
inplace_all_to_all(pg, mappings);
// Add the mappings we were sent into the old->new map.
for (process_id_type src = 0; src < p; ++src)
old_to_new_vertex_map.insert(mappings[src].begin(), mappings[src].end());
}
// Get old->new vertex mappings for all of the vertices we need to
// know about.
// TBD: An optimization here might involve sending the
// request-response pairs without an explicit request step (for
// bidirectional and undirected graphs). However, it may not matter
// all that much given the cost of redistribution.
{
std::vector<std::vector<vertex_descriptor> > vertex_map_requests(p);
std::vector<std::vector<vertex_descriptor> > vertex_map_responses(p);
// We need to know about all of the vertices incident on edges
// that have been relocated to this processor. Tell each processor
// what each other processor needs to know.
for (process_id_type src = 0; src < p; ++src)
for (typename std::vector<redistributed_edge>::iterator ei =
redistributed_edges[src].begin();
ei != redistributed_edges[src].end(); ++ei) {
vertex_descriptor need_vertex = target(ei->descriptor, *this);
if (old_to_new_vertex_map.find(need_vertex)
== old_to_new_vertex_map.end())
{
old_to_new_vertex_map[need_vertex] = need_vertex;
vertex_map_requests[need_vertex.owner].push_back(need_vertex);
}
}
inplace_all_to_all(pg,
vertex_map_requests,
vertex_map_responses);
// Process the requests made for vertices we own. Then perform yet
// another all-to-all swap. This one matches the requests we've
// made to the responses we were given.
for (process_id_type src = 0; src < p; ++src)
for (typename std::vector<vertex_descriptor>::iterator vi =
vertex_map_responses[src].begin();
vi != vertex_map_responses[src].end(); ++vi)
*vi = old_to_new_vertex_map[*vi];
inplace_all_to_all(pg, vertex_map_responses);
// Matching the requests to the responses, update the old->new
// vertex map for all of the vertices we will need to know.
for (process_id_type src = 0; src < p; ++src) {
typedef typename std::vector<vertex_descriptor>::size_type size_type;
for (size_type i = 0; i < vertex_map_requests[src].size(); ++i) {
old_to_new_vertex_map[vertex_map_requests[src][i]] =
vertex_map_responses[src][i];
}
}
}
// Add edges to the graph by mapping the source and target.
for (process_id_type src = 0; src < p; ++src) {
for (typename std::vector<redistributed_edge>::iterator ei =
redistributed_edges[src].begin();
ei != redistributed_edges[src].end(); ++ei) {
add_edge(old_to_new_vertex_map[source(ei->descriptor, *this)],
old_to_new_vertex_map[target(ei->descriptor, *this)],
ei->get_property(),
*this);
}
redistributed_edges[src].clear();
}
// Be sure that edge-addition messages are received now, completing
// the graph.
synchronize(process_group_);
this->distribution().clear();
detail::parallel::maybe_initialize_vertex_indices(vertices(base()),
get(vertex_index, base()));
}
} // end namespace boost