/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/
/*
* Run concurrent sends to different target processes. Stresses an
* implementation that permits concurrent sends to different targets.
*
* By turning on verbose output, some simple performance data will be output.
*
* Use nonblocking sends, and have a single thread complete all I/O.
*/
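
/*
 * Pattern: rank 0 creates one sender thread for each receiving rank, and
 * sender thread i posts an MPI_Isend to rank i for every message size.
 * With ownerWaits == 0 (the default) thread 1 completes all of the pending
 * sends with one MPI_Waitall; otherwise each thread waits on its own
 * request.
 */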
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
#include "mpitest.h"
#include "mpithreadtest.h"
/* Test parameters: the message count doubles from 1 up to MAX_CNT ints,
 * each count is sent MAX_LOOP times, and at most MAX_NTHREAD sender
 * threads (one per receiving rank) are created. */
#define MAX_CNT 660000
/*#define MAX_LOOP 200 */
#define MAX_LOOP 10
#define MAX_NTHREAD 128

/* If set, each sender thread waits for its own send; otherwise thread 1
 * completes all of the sends with a single MPI_Waitall */
static int ownerWaits = 0;
/* Number of sender threads; set by the main thread on rank 0 */
static int nthreads = -1;
MTEST_THREAD_RETURN_TYPE run_test_send(void *arg);
MTEST_THREAD_RETURN_TYPE run_test_send(void *arg)
{
    int cnt, j, *buf, wsize, tag;
    int thread_num = (int) (long) arg;
    double t;
    static MPI_Request r[MAX_NTHREAD];
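
    /* The request array is shared by all sender threads so that, when
     * ownerWaits is 0, thread 1 can complete every outstanding send with a
     * single MPI_Waitall */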
    /* Create the buf just once to avoid finding races in malloc instead
     * of the MPI library */
    buf = (int *) malloc(MAX_CNT * sizeof(int));
    MTEST_VG_MEM_INIT(buf, MAX_CNT * sizeof(int));
    MTestPrintfMsg(1, "buf address %p (size %d)\n", (void *) buf, (int) (MAX_CNT * sizeof(int)));
    MPI_Comm_size(MPI_COMM_WORLD, &wsize);
    if (wsize >= MAX_NTHREAD)
        wsize = MAX_NTHREAD;
    /* Sanity check: the main thread should have started one sender thread
     * per receiving rank */
    if (nthreads != wsize - 1)
        fprintf(stderr, "Panic wsize = %d nthreads = %d\n", wsize, nthreads);
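
    /* Each pass doubles the message count and advances the tag, so messages
     * from different passes cannot be matched out of order */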
    for (cnt = 1, tag = 1; cnt < MAX_CNT; cnt = 2 * cnt, tag++) {
        /* Wait for all senders to be ready */
        MTest_thread_barrier(nthreads);
        t = MPI_Wtime();
        for (j = 0; j < MAX_LOOP; j++) {
            MTest_thread_barrier(nthreads);
            MPI_Isend(buf, cnt, MPI_INT, thread_num, tag, MPI_COMM_WORLD, &r[thread_num - 1]);
            if (ownerWaits) {
                MPI_Wait(&r[thread_num - 1], MPI_STATUS_IGNORE);
            } else {
                /* Wait for all threads to start the sends */
                MTest_thread_barrier(nthreads);
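                /* Thread 1 completes the sends issued by all sender threads
                 * in this iteration */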
                if (thread_num == 1) {
                    MPI_Waitall(wsize - 1, r, MPI_STATUSES_IGNORE);
                }
            }
        }
        t = MPI_Wtime() - t;
        if (thread_num == 1)
            MTestPrintfMsg(1, "buf size %d: time %f\n", (int) (cnt * sizeof(int)), t / MAX_LOOP);
    }
    MTest_thread_barrier(nthreads);
    free(buf);
    return (MTEST_THREAD_RETURN_TYPE) NULL;
}
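
/*
 * Receiving side: every rank other than 0 posts blocking receives for the
 * same sequence of message sizes and tags that its matching sender thread
 * on rank 0 produces.
 */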
void run_test_recv(void);
void run_test_recv(void)
{
    int cnt, j, *buf, tag;
    MPI_Status status;
    double t;

    for (cnt = 1, tag = 1; cnt < MAX_CNT; cnt = 2 * cnt, tag++) {
        buf = (int *) malloc(cnt * sizeof(int));
        MTEST_VG_MEM_INIT(buf, cnt * sizeof(int));
        t = MPI_Wtime();
        for (j = 0; j < MAX_LOOP; j++)
            MPI_Recv(buf, cnt, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
        t = MPI_Wtime() - t;
        free(buf);
    }
}
int main(int argc, char **argv)
{
    int i, pmode, nprocs, rank;
    int errs = 0, err;

    MTest_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &pmode);
    if (pmode != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "Thread Multiple not supported by the MPI implementation\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (nprocs < 2) {
        fprintf(stderr, "Need at least two processes\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if (nprocs > MAX_NTHREAD)
        nprocs = MAX_NTHREAD;

    MPI_Barrier(MPI_COMM_WORLD);
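
    /* Rank 0 creates one sender thread per receiving rank; every other
     * rank runs the matching receive loop */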
    if (rank == 0) {
        err = MTest_thread_barrier_init();
        if (err) {
            fprintf(stderr, "Could not create thread barrier\n");
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        nthreads = nprocs - 1;
        for (i = 1; i < nprocs; i++)
            MTest_Start_thread(run_test_send, (void *) (long) i);
        MTest_Join_threads();
        MTest_thread_barrier_free();
    } else if (rank < MAX_NTHREAD) {
        run_test_recv();
    }

    MTest_Finalize(errs);
    return MTestReturnValue(errs);
}
}