File: rinside_mpi_sample4.cpp

package info (click to toggle)
r-cran-rinside 0.2.19-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 668 kB
  • sloc: cpp: 3,310; ansic: 117; xml: 57; ruby: 34; makefile: 2
file content (141 lines) | stat: -rw-r--r-- 3,331 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// -*- mode: C++; c-indent-level: 4; c-basic-offset: 4;  tab-width: 8; -*-
//
// Simple MPI example: using RInside with a master/worker model
//
// MPI C API version of file contributed by Nicholas Pezolano and Martin Morgan 
//
// Copyright (C) 2010 - 2013  Dirk Eddelbuettel
// Copyright (C)        2013  Nicholas Pezolano
// Copyright (C)        2013  Martin Morgan
//
// GPL'ed 

#include <mpi.h>
#include <RInside.h>
#include <string>
#include <vector>
#include <iostream>

#define WORKTAG 1
#define DIETAG 2

/* Local functions */
static void master(void);
static void slave(RInside &R);
static int get_next_work_item(int &work, const int size_work, std::vector<int> &data);
static void do_work(int work,int &result,RInside &R);
static void initalize(RInside &R);

int itr = 0;

int main(int argc, char **argv){
    MPI_Init(&argc, &argv);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Every MPI process embeds its own R interpreter instance.
    RInside R(argc, argv);

    if (rank == 0) {
	master();               // rank 0 coordinates the work distribution
    } else {
	initalize(R);           // workers load the required R package first
	slave(R);
    }

    MPI_Finalize();
    return 0;
}

// One-time per-worker setup: quietly load the 'random' package into this
// process's embedded R instance.
static void initalize(RInside &R){
    R.parseEvalQ("suppressMessages(library(random));");
}

// Rank-0 coordinator: hands one work item at a time to each worker,
// collects the integer results, and prints their sum. Workers are shut
// down with a DIETAG message once all work is consumed.
static void master(void){
    int ntasks, rank;
    std::vector<int> data;
    int work = 0;
    int result;
    int sum = 0;            // bug fix: was uninitialized before being accumulated (UB)
    MPI_Status status;

    // create some test "data" to pass around
    for (int i = 0; i < 10; i++) {
	data.push_back(i);
    }

    const int size_work = (int)data.size();

    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);

    // Seed every worker with one item. Track how many results are owed so
    // the collection loop below is correct even when tasks < workers.
    int outstanding = 0;    // bug fix: original sent an indeterminate 'work'
                            // (and over-counted results) when tasks < workers
    for (rank = 1; rank < ntasks; ++rank) {
	if (get_next_work_item(work, size_work, data) != 0) {
	    break;          // no more work to hand out
	}
	MPI_Send(&work, 1, MPI_INT, rank, WORKTAG, MPI_COMM_WORLD);
	++outstanding;
    }

    // Steady state: for each remaining item, collect one result and send
    // the new item back to whichever worker just finished.
    while (get_next_work_item(work, size_work, data) == 0) {
	MPI_Recv(&result, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
	sum += result;
	MPI_Send(&work, 1, MPI_INT, status.MPI_SOURCE, WORKTAG, MPI_COMM_WORLD);
    }

    // Drain the results still in flight.
    while (outstanding-- > 0) {
	MPI_Recv(&result, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
	sum += result;
    }

    // Tell every worker to exit its receive loop.
    for (rank = 1; rank < ntasks; ++rank) {
	MPI_Send(0, 0, MPI_INT, rank, DIETAG, MPI_COMM_WORLD);
    }

    std::cout << "sum of all iterations = " << sum << std::endl;
}

// Worker loop: receive a task from rank 0, evaluate it in the embedded R
// instance, and send the integer result back. A DIETAG message from the
// master is the shutdown signal.
static void slave(RInside &R) {
    int task = 0;
    int answer = 0;
    MPI_Status status;

    for (;;) {
	MPI_Recv(&task, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

	if (status.MPI_TAG == DIETAG)
	    return;             // master says we are done

	do_work(task, answer, R);
	MPI_Send(&answer, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
    }
}


// Pop the next work item from 'data' into 'work', advancing the global
// cursor 'itr'. Returns 0 when an item was produced, -1 when exhausted.
static int get_next_work_item(int &work, const int size_work, std::vector<int> &data) {
    if (itr >= size_work)
	return -1;              // all work consumed

    work = data[itr++];
    std::cout << "iteration = " << itr << std::endl;
    return 0;
}

// Evaluate one unit of work inside the embedded R interpreter and return a
// scalar result through the out-parameter 'result'.
//
// NOTE(review): the C++ value of 'work' is assigned into R below, but the R
// command immediately overwrites 'work' with a random draw, so the input
// value never influences the result -- presumably intentional for this
// demo; confirm if real workloads should actually consume 'work'.
static void do_work(int work,int &result,RInside &R){

    //create a random number on every slave iteration
    R["work"] = work;
    std::string Rcmd = "work <- sample(1:10, 1)";
    Rcpp::NumericVector M = R.parseEval(Rcmd);
    
    // first (and only) element of the evaluated R vector
    result = M(0);
}