1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
|
// Copyright (C) 2005-2006 The Trustees of Indiana University.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// Authors: Douglas Gregor
// Andrew Lumsdaine
#ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
#define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif
#include <boost/graph/parallel/process_group.hpp>
#include <boost/type_traits/is_convertible.hpp>
#include <vector>
#include <boost/assert.hpp>
#include <boost/optional.hpp>
#include <queue>
namespace boost { namespace graph { namespace detail {
template<typename ProcessGroup>
void do_synchronize(ProcessGroup& pg)
{
using boost::parallel::synchronize;
synchronize(pg);
}
struct remote_set_queued {};
struct remote_set_immediate {};
template<typename ProcessGroup>
class remote_set_semantics
{
BOOST_STATIC_CONSTANT
(bool,
queued = (is_convertible<
typename ProcessGroup::communication_category,
boost::parallel::bsp_process_group_tag>::value));
public:
typedef typename mpl::if_c<queued,
remote_set_queued,
remote_set_immediate>::type type;
};
template<typename Derived, typename ProcessGroup, typename Value,
typename OwnerMap,
typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
class remote_update_set;
/**********************************************************************
* Remote updating set that queues messages until synchronization *
**********************************************************************/
template<typename Derived, typename ProcessGroup, typename Value,
typename OwnerMap>
class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
remote_set_queued>
{
typedef typename property_traits<OwnerMap>::key_type Key;
typedef std::vector<std::pair<Key, Value> > Updates;
typedef typename Updates::size_type updates_size_type;
typedef typename Updates::value_type updates_pair_type;
public:
private:
typedef typename ProcessGroup::process_id_type process_id_type;
enum message_kind {
/** Message containing the number of updates that will be sent in
* a msg_updates message that will immediately follow. This
* message will contain a single value of type
* updates_size_type.
*/
msg_num_updates,
/** Contains (key, value) pairs with all of the updates from a
* particular source. The number of updates is variable, but will
* be provided in a msg_num_updates message that immediately
* preceeds this message.
*
*/
msg_updates
};
struct handle_messages
{
explicit
handle_messages(remote_update_set* self, const ProcessGroup& pg)
: self(self), update_sizes(num_processes(pg), 0) { }
void operator()(process_id_type source, int tag)
{
switch(tag) {
case msg_num_updates:
{
// Receive the # of updates
updates_size_type num_updates;
receive(self->process_group, source, tag, num_updates);
update_sizes[source] = num_updates;
}
break;
case msg_updates:
{
updates_size_type num_updates = update_sizes[source];
BOOST_ASSERT(num_updates);
// Receive the actual updates
std::vector<updates_pair_type> updates(num_updates);
receive(self->process_group, source, msg_updates, &updates[0],
num_updates);
// Send updates to derived "receive_update" member
Derived* derived = static_cast<Derived*>(self);
for (updates_size_type u = 0; u < num_updates; ++u)
derived->receive_update(source, updates[u].first, updates[u].second);
update_sizes[source] = 0;
}
break;
};
}
private:
remote_update_set* self;
std::vector<updates_size_type> update_sizes;
};
friend struct handle_messages;
protected:
remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
: process_group(pg, handle_messages(this, pg)),
updates(num_processes(pg)), owner(owner) {
}
void update(const Key& key, const Value& value)
{
if (get(owner, key) == process_id(process_group)) {
Derived* derived = static_cast<Derived*>(this);
derived->receive_update(get(owner, key), key, value);
}
else {
updates[get(owner, key)].push_back(std::make_pair(key, value));
}
}
void collect() { }
void synchronize()
{
// Emit all updates and then remove them
process_id_type num_processes = updates.size();
for (process_id_type p = 0; p < num_processes; ++p) {
if (!updates[p].empty()) {
send(process_group, p, msg_num_updates, updates[p].size());
send(process_group, p, msg_updates,
&updates[p].front(), updates[p].size());
updates[p].clear();
}
}
do_synchronize(process_group);
}
ProcessGroup process_group;
private:
std::vector<Updates> updates;
OwnerMap owner;
};
/**********************************************************************
* Remote updating set that sends messages immediately *
**********************************************************************/
template<typename Derived, typename ProcessGroup, typename Value,
typename OwnerMap>
class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
remote_set_immediate>
{
typedef typename property_traits<OwnerMap>::key_type Key;
typedef std::pair<Key, Value> update_pair_type;
typedef typename std::vector<update_pair_type>::size_type updates_size_type;
public:
typedef typename ProcessGroup::process_id_type process_id_type;
private:
enum message_kind {
/** Contains a (key, value) pair that will be updated. */
msg_update
};
struct handle_messages
{
explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
: self(self)
{ update_sizes.resize(num_processes(pg), 0); }
void operator()(process_id_type source, int tag)
{
// Receive the # of updates
BOOST_ASSERT(tag == msg_update);
update_pair_type update;
receive(self->process_group, source, tag, update);
// Send update to derived "receive_update" member
Derived* derived = static_cast<Derived*>(self);
derived->receive_update(source, update.first, update.second);
}
private:
std::vector<updates_size_type> update_sizes;
remote_update_set* self;
};
friend struct handle_messages;
protected:
remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
: process_group(pg, handle_messages(this, pg)), owner(owner) { }
void update(const Key& key, const Value& value)
{
if (get(owner, key) == process_id(process_group)) {
Derived* derived = static_cast<Derived*>(this);
derived->receive_update(get(owner, key), key, value);
}
else
send(process_group, get(owner, key), msg_update,
update_pair_type(key, value));
}
void collect()
{
typedef std::pair<process_id_type, int> probe_type;
handle_messages handler(this, process_group);
while (optional<probe_type> stp = probe(process_group))
if (stp->second == msg_update) handler(stp->first, stp->second);
}
void synchronize()
{
do_synchronize(process_group);
}
ProcessGroup process_group;
OwnerMap owner;
};
} } } // end namespace boost::graph::detail
#endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
|