// Copyright (C) 2005-2006 Matthias Troyer
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// An example of a parallel Monte Carlo simulation using some nodes to produce
// data and others to aggregate the data
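//
// To run the example with, e.g., eight processes (the binary name below is
// just an illustration):
//   mpirun -np 8 ./parallel_monte_carlo
// At least two processes are required: the lower half of the ranks
// calculates samples, the upper half accumulates them.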
#include <iostream>
#include <boost/mpi.hpp>
#include <boost/random/parallel.hpp>
#include <boost/random.hpp>
#include <boost/foreach.hpp>
#include <boost/assert.hpp>
#include <vector>
#include <cstdlib>
namespace mpi = boost::mpi;
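// tags for the messages exchanged between the calculation and accumulation ranks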
enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};
void calculate_samples(int sample_length)
{
  int num_samples = 100;
  std::vector<double> sample(sample_length);

  // set up the communicator of all calculation ranks by splitting
  mpi::communicator world;
  mpi::communicator calculate_communicator = world.split(0);
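  // all processes that pass the same color (here 0) to split() end up in
  // the same new communicator; the accumulation ranks call split(1) and
  // therefore form a second, disjoint communicator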
  unsigned int num_calculate_ranks = calculate_communicator.size();

  // the master of the accumulation ranks is the first of them, hence
  // with a rank just one after the last calculation rank
  int master_accumulate_rank = num_calculate_ranks;

  // the master of the calculation ranks sends the skeleton of the sample
  // to the master of the accumulation ranks
  if (world.rank()==0)
    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));
  // next we extract the content of the sample vector, to be used in sending
  // the content later on
  mpi::content sample_content = mpi::get_content(sample);
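  // the skeleton describes the structure (here, the size) of the vector,
  // while the content is an MPI datatype pointing at the actual values;
  // transmitting just the content avoids re-serializing the structure
  // with every message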
  // now initialize the parallel random number generator
  boost::lcg64 engine(
      boost::random::stream_number = calculate_communicator.rank(),
      boost::random::total_streams = calculate_communicator.size()
    );

  boost::variate_generator<boost::lcg64&,boost::uniform_real<> >
    rng(engine,boost::uniform_real<>());
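  // the engine is bound by reference in the variate_generator, so every
  // draw through rng advances this rank's own stream of the generator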
  for (unsigned int i=0; i<num_samples/num_calculate_ranks+1; ++i) {
    // calculate the sample by filling the vector with random numbers.
    // note that std::generate will not work since it takes the generator
    // by value, and boost::ref cannot be used as a generator.
    // boost::ref should be fixed so that it can be used as a generator
    BOOST_FOREACH(double& x, sample)
      x = rng();
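    // with C++11, a lambda sidesteps that problem, since it can capture
    // the generator by reference:
    //   std::generate(sample.begin(), sample.end(), [&]{ return rng(); });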
    // send the sample to the accumulation ranks.
    // Ideally we want to do this as a broadcast with an inter-communicator
    // between the calculation and accumulation ranks. MPI-2 supports this,
    // but here we present an MPI-1 compatible solution.

    // send the content of the sample to the first (master) accumulation process
    world.send(master_accumulate_rank,sample_tag,sample_content);

    // gather some results from all calculation ranks
    double local_result = sample[0];
    std::vector<double> gathered_results(calculate_communicator.size());
    mpi::all_gather(calculate_communicator,local_result,gathered_results);
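    // (the gathered results are not used further; the all_gather only
    // demonstrates a collective operation confined to the calculation
    // communicator)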
  }

  // we are done: the master tells the accumulation ranks to quit
  if (world.rank()==0)
    world.send(master_accumulate_rank,quit_tag);
}
void accumulate_samples()
{
  std::vector<double> sample;

  // set up the communicator of all accumulation ranks by splitting
  mpi::communicator world;
  mpi::communicator accumulate_communicator = world.split(1);

  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;
  // the master receives the sample skeleton
  if (is_master_accumulate_rank)
    world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));

  // and broadcasts it to all accumulation ranks
  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);
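  // broadcasting the skeleton transfers the structure of the vector, so
  // after this call the sample vector has the right size on every
  // accumulation rank even though no values have been transmitted yet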
  // next we extract the content of the sample vector, to be used in receiving
  // the content later on
  mpi::content sample_content = mpi::get_content(sample);

  // accumulate until quit is called
  double sum=0.;
  while (true) {
    // the accumulation master checks whether we should quit
    if (world.iprobe(0,quit_tag)) {
      world.recv(0,quit_tag);
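      // the other accumulation ranks probe only the accumulation
      // communicator for the quit tag, so the master has to forward
      // the signal to them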
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }
    // the other accumulation ranks check whether we should quit
    if (accumulate_communicator.iprobe(0,quit_tag)) {
      accumulate_communicator.recv(0,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }
    // check whether the master accumulation rank has received a sample
    if (world.iprobe(mpi::any_source,sample_tag)) {
      BOOST_ASSERT(is_master_accumulate_rank);

      // receive the content
      world.recv(mpi::any_source,sample_tag,sample_content);
      // now we need to broadcast the sample to the other accumulation ranks.
      // the problem is that we do not have a non-blocking broadcast that we
      // could abort if we receive a quit message from the master. We thus
      // first tell all accumulation ranks to start a broadcast. If the
      // sample were small, we could just send it in this message, but here
      // we optimize the code for large samples, so that the overhead of
      // these small sends can be ignored, and we count on an optimized
      // broadcast implementation with O(log N) complexity
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,sample_broadcast_tag);

      // now broadcast the contents of the sample to all accumulate ranks
      mpi::broadcast(accumulate_communicator,sample_content,0);
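      // this broadcast is matched by the identical call in the branch
      // for the non-master accumulation ranks below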
      // and handle the sample by summing the appropriate value
      sum += sample[0];
    }
    // the other accumulation ranks wait for a message to start the broadcast
    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
      BOOST_ASSERT(!is_master_accumulate_rank);
      accumulate_communicator.recv(0,sample_broadcast_tag);

      // receive the broadcast of the sample contents
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[accumulate_communicator.rank()];
    }
  }
}
int main(int argc, char** argv)
{
  mpi::environment env(argc, argv);
  mpi::communicator world;

  // half of the processes generate, the others accumulate
  // the sample size is just the number of accumulation ranks
  if (world.rank() < world.size()/2)
    calculate_samples(world.size()-world.size()/2);
  else
    accumulate_samples();
  return 0;
}