File: multisend3.c

Package: mpich 4.0.2-3 (Debian bookworm)

/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

/*
 * Run concurrent sends to different target processes.  This stresses an
 * implementation that permits concurrent sends to different targets to
 * proceed in parallel.
 *
 * If verbose output is enabled, simple performance data is printed.
 *
 * Use nonblocking sends, and have a single thread complete all I/O.
 */
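
/*
 * Structure of the test: rank 0 creates one sending thread per receiving
 * rank, and thread i issues nonblocking sends to rank i.  Depending on
 * ownerWaits, either each thread completes its own request or thread 1
 * completes all outstanding requests with MPI_Waitall.  Ranks 1..N-1 run a
 * plain MPI_Recv loop with a matching count/tag schedule.
 *
 * A typical invocation (assumed; the launcher and test driver may differ
 * in a given build) is:
 *     mpiexec -n 4 ./multisend3
 */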
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
#include "mpitest.h"
#include "mpithreadtest.h"

/* This is the main test routine */
#define MAX_CNT 660000
/*#define MAX_LOOP 200 */
#define MAX_LOOP 10
#define MAX_NTHREAD 128

static int ownerWaits = 0;      /* if set, each thread completes its own send request */
static int nthreads = -1;       /* number of sending threads (world size - 1, capped) */

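/* Sender body, run by each thread on rank 0: the thread repeatedly issues
 * MPI_Isend calls to the rank equal to its thread number, doubling the
 * message size (and advancing the tag) for each outer iteration. */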
MTEST_THREAD_RETURN_TYPE run_test_send(void *arg);
MTEST_THREAD_RETURN_TYPE run_test_send(void *arg)
{
    int cnt, j, *buf, wsize, tag;
    int thread_num = (int) (long) arg;
    double t;
    static MPI_Request r[MAX_NTHREAD];

    /* Create the buf just once to avoid finding races in malloc instead
     * of the MPI library */
    buf = (int *) malloc(MAX_CNT * sizeof(int));
    MTEST_VG_MEM_INIT(buf, MAX_CNT * sizeof(int));
    MTestPrintfMsg(1, "buf address %p (size %d)\n", buf, MAX_CNT * sizeof(int));
    MPI_Comm_size(MPI_COMM_WORLD, &wsize);
    if (wsize >= MAX_NTHREAD)
        wsize = MAX_NTHREAD;
    /* Sanity check */
    if (nthreads != wsize - 1)
        fprintf(stderr, "Panic wsize = %d nthreads = %d\n", wsize, nthreads);

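    /* The message size doubles each iteration and the tag advances with it,
     * so sends and receives of different sizes cannot be mismatched. */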
    for (cnt = 1, tag = 1; cnt < MAX_CNT; cnt = 2 * cnt, tag++) {
        /* Wait for all senders to be ready */
        MTest_thread_barrier(nthreads);

        t = MPI_Wtime();
        for (j = 0; j < MAX_LOOP; j++) {
            MTest_thread_barrier(nthreads);
            MPI_Isend(buf, cnt, MPI_INT, thread_num, tag, MPI_COMM_WORLD, &r[thread_num - 1]);
            if (ownerWaits) {
                MPI_Wait(&r[thread_num - 1], MPI_STATUS_IGNORE);
            } else {
                /* Wait for all threads to start the sends */
                MTest_thread_barrier(nthreads);
                if (thread_num == 1) {
                    MPI_Waitall(wsize - 1, r, MPI_STATUSES_IGNORE);
                }
            }
        }
        t = MPI_Wtime() - t;
        if (thread_num == 1)
            MTestPrintfMsg(1, "buf size %d: time %f\n", cnt * sizeof(int), t / MAX_LOOP);
    }
    MTest_thread_barrier(nthreads);
    free(buf);
    return (MTEST_THREAD_RETURN_TYPE) NULL;
}

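/* Receiver body, run directly by each non-zero rank: it posts blocking
 * receives from rank 0 using the same count/tag schedule as the sending
 * threads.  The elapsed time is measured but not reported. */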
void run_test_recv(void);
void run_test_recv(void)
{
    int cnt, j, *buf, tag;
    MPI_Status status;
    double t;

    for (cnt = 1, tag = 1; cnt < MAX_CNT; cnt = 2 * cnt, tag++) {
        buf = (int *) malloc(cnt * sizeof(int));
        MTEST_VG_MEM_INIT(buf, cnt * sizeof(int));
        t = MPI_Wtime();
        for (j = 0; j < MAX_LOOP; j++)
            MPI_Recv(buf, cnt, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
        t = MPI_Wtime() - t;
        free(buf);
    }
}

int main(int argc, char **argv)
{
    int i, pmode, nprocs, rank;
    int errs = 0, err;

    MTest_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &pmode);
    if (pmode != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "Thread Multiple not supported by the MPI implementation\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (nprocs < 2) {
        fprintf(stderr, "Need at least two processes\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if (nprocs > MAX_NTHREAD)
        nprocs = MAX_NTHREAD;

    MPI_Barrier(MPI_COMM_WORLD);
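    /* Rank 0 drives the test with one sending thread per receiving rank;
     * every other participating rank runs the receive loop directly. */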
    if (rank == 0) {
        err = MTest_thread_barrier_init();
        if (err) {
            fprintf(stderr, "Could not create thread barrier\n");
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        nthreads = nprocs - 1;
        for (i = 1; i < nprocs; i++)
            MTest_Start_thread(run_test_send, (void *) (long) i);

        MTest_Join_threads();
        MTest_thread_barrier_free();
    } else if (rank < MAX_NTHREAD) {
        run_test_recv();
    }

    MTest_Finalize(errs);

    return MTestReturnValue(errs);
}