1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
// Copyright (C) 2004-2006 The Trustees of Indiana University.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// Authors: Douglas Gregor
// Andrew Lumsdaine
#include <boost/optional.hpp>
#include <cassert>
#include <boost/graph/parallel/algorithm.hpp>
#include <boost/graph/parallel/process_group.hpp>
#include <functional>
#include <algorithm>
#include <boost/graph/parallel/simple_trigger.hpp>
#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif
namespace boost { namespace graph { namespace distributed {
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
const Buffer& buffer, bool polling)
: process_group(process_group, attach_distributed_object()),
owner(owner),
buffer(buffer),
polling(polling)
{
if (!polling)
outgoing_buffers.reset(
new outgoing_buffers_t(num_processes(process_group)));
setup_triggers();
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
const Buffer& buffer, const UnaryPredicate& pred,
bool polling)
: process_group(process_group, attach_distributed_object()),
owner(owner),
buffer(buffer),
pred(pred),
polling(polling)
{
if (!polling)
outgoing_buffers.reset(
new outgoing_buffers_t(num_processes(process_group)));
setup_triggers();
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
const UnaryPredicate& pred, bool polling)
: process_group(process_group, attach_distributed_object()),
owner(owner),
pred(pred),
polling(polling)
{
if (!polling)
outgoing_buffers.reset(
new outgoing_buffers_t(num_processes(process_group)));
setup_triggers();
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void
BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
{
typename ProcessGroup::process_id_type dest = get(owner, x);
if (outgoing_buffers)
outgoing_buffers->at(dest).push_back(x);
else if (dest == process_id(process_group))
buffer.push(x);
else
send(process_group, get(owner, x), msg_push, x);
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
bool
BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
{
/* Processes will stay here until the buffer is nonempty or
synchronization with the other processes indicates that all local
buffers are empty (and no messages are in transit).
*/
while (buffer.empty() && !do_synchronize()) ;
return buffer.empty();
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
{
empty();
return buffer.size();
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
{
using boost::graph::parallel::simple_trigger;
simple_trigger(process_group, msg_push, this,
&distributed_queue::handle_push);
simple_trigger(process_group, msg_multipush, this,
&distributed_queue::handle_multipush);
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void
BOOST_DISTRIBUTED_QUEUE_TYPE::
handle_push(int /*source*/, int /*tag*/, const value_type& value,
trigger_receive_context)
{
if (pred(value)) buffer.push(value);
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void
BOOST_DISTRIBUTED_QUEUE_TYPE::
handle_multipush(int /*source*/, int /*tag*/,
const std::vector<value_type>& values,
trigger_receive_context)
{
for (std::size_t i = 0; i < values.size(); ++i)
if (pred(values[i])) buffer.push(values[i]);
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
bool
BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
{
#ifdef PBGL_ACCOUNTING
++num_synchronizations;
#endif
using boost::parallel::all_reduce;
using std::swap;
typedef typename ProcessGroup::process_id_type process_id_type;
if (outgoing_buffers) {
// Transfer all of the push requests
process_id_type id = process_id(process_group);
process_id_type np = num_processes(process_group);
for (process_id_type dest = 0; dest < np; ++dest) {
outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
std::size_t size = outgoing.size();
if (size != 0) {
if (dest != id) {
send(process_group, dest, msg_multipush, outgoing);
} else {
for (std::size_t i = 0; i < size; ++i)
buffer.push(outgoing[i]);
}
outgoing.clear();
}
}
}
synchronize(process_group);
unsigned local_size = buffer.size();
unsigned global_size =
all_reduce(process_group, local_size, std::plus<unsigned>());
return global_size == 0;
}
} } } // end namespace boost::graph::distributed
|