File: probe_unexp.c

package info (click to toggle)
mpich 4.3.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 101,184 kB
  • sloc: ansic: 1,040,629; cpp: 82,270; javascript: 40,763; perl: 27,933; python: 16,041; sh: 14,676; xml: 14,418; f90: 12,916; makefile: 9,270; fortran: 8,046; java: 4,635; asm: 324; ruby: 103; awk: 27; lisp: 19; php: 8; sed: 4
file content (151 lines) | stat: -rw-r--r-- 6,512 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include <stdio.h>
#include "mpi.h"
#include "mpitest.h"

/* Base-2 logarithm of the largest message size exercised: main() tests
 * message sizes 1 << 0 through 1 << MAX_BUF_SIZE_LG bytes. */
#define MAX_BUF_SIZE_LG 22
/* Number of sync/probe/receive rounds main() performs for each message size;
 * also used as the stride when deriving a distinct tag per round. */
#define NUM_MSGS_PER_BUF_SIZE 5
/* Payload buffer sized for the largest message.  Rank 0 sends from it and the
 * other ranks receive into it; its contents are never inspected. */
char buf[1 << MAX_BUF_SIZE_LG];

/*
 * This program verifies that MPI_Probe() is operating properly in the face of
 * unexpected messages arriving after MPI_Probe() has
 * been called.  This program may hang if MPI_Probe() does not return when the
 * message finally arrives (see req #375).
 */

/*
 * Test driver.
 *
 * For every communicator handed out by MTestGetIntracommGeneral() (presumably
 * intracommunicators with at least two processes -- confirm against
 * mpitest.h), and for every message size 1 << 0 .. 1 << MAX_BUF_SIZE_LG,
 * NUM_MSGS_PER_BUF_SIZE rounds of the following exchange are run:
 *
 *   - every rank other than 0 sends a zero-byte synchronization message to
 *     rank 0 and then immediately blocks in MPI_Probe(ANY_SOURCE, ANY_TAG);
 *   - rank 0 receives one synchronization message per peer and answers the
 *     sender with a msg_size-byte message carrying the same tag;
 *   - the peer checks source, tag and byte count reported by MPI_Probe(),
 *     then receives the message and checks the same fields again.
 *
 * Each round uses its own tag so that a message from the wrong round is
 * detected.  All failures are counted in errs; only the first 10 are printed
 * (the `errs++ < 10` term is evaluated only when the preceding check failed).
 *
 * Returns the value produced by MTestReturnValue(errs).
 */
int main(int argc, char **argv)
{
    MPI_Comm comm;
    int p_size;
    int p_rank;
    int msg_size_lg;
    int errs = 0;
    int mpi_errno;

    MTest_Init(&argc, &argv);

    while (MTestGetIntracommGeneral(&comm, 2, 1)) {
        if (comm == MPI_COMM_NULL) {
            continue;
        }

        MPI_Comm_size(comm, &p_size);
        MPI_Comm_rank(comm, &p_rank);
        /* To improve reporting of problems about operations, we
         * change the error handler to errors return */
        MPI_Comm_set_errhandler(comm, MPI_ERRORS_RETURN);

        for (msg_size_lg = 0; msg_size_lg <= MAX_BUF_SIZE_LG; msg_size_lg++) {
            const int msg_size = 1 << msg_size_lg;
            int msg_cnt;

            MTestPrintfMsg(2, "testing messages of size %d\n", msg_size);
            for (msg_cnt = 0; msg_cnt < NUM_MSGS_PER_BUF_SIZE; msg_cnt++) {
                MPI_Status status;
                /* Unique tag per (size, round) pair */
                const int tag = msg_size_lg * NUM_MSGS_PER_BUF_SIZE + msg_cnt;

                MTestPrintfMsg(2, "Message count %d\n", msg_cnt);
                if (p_rank == 0) {
                    int p;

                    /* One sync/reply exchange per peer; peers are served in
                     * whatever order their sync messages arrive */
                    for (p = 1; p < p_size; p++) {
                        /* Wait for synchronization message */
                        mpi_errno = MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, tag, comm, &status);
                        if (mpi_errno != MPI_SUCCESS && errs++ < 10) {
                            MTestPrintError(mpi_errno);
                        }

                        if (status.MPI_TAG != tag && errs++ < 10) {
                            /* expected is the tag we asked for, actual is
                             * the tag reported in the status */
                            printf
                                ("ERROR: unexpected message tag from MPI_Recv(): lp=0, rp=%d, expected=%d, actual=%d, count=%d\n",
                                 status.MPI_SOURCE, tag, status.MPI_TAG, msg_cnt);
                        }
#if defined(VERBOSE)
                        {
                            printf("sending message: p=%d s=%d c=%d\n",
                                   status.MPI_SOURCE, msg_size, msg_cnt);
                        }
#endif

                        /* Send unexpected message which hopefully MPI_Probe() is
                         * already waiting for at the remote process */
                        mpi_errno = MPI_Send(buf, msg_size, MPI_BYTE,
                                             status.MPI_SOURCE, status.MPI_TAG, comm);
                        if (mpi_errno != MPI_SUCCESS && errs++ < 10) {
                            MTestPrintError(mpi_errno);
                        }
                    }
                } else {
                    int incoming_msg_size;

                    /* Send synchronization message */
                    mpi_errno = MPI_Send(NULL, 0, MPI_BYTE, 0, tag, comm);
                    if (mpi_errno != MPI_SUCCESS && errs++ < 10) {
                        MTestPrintError(mpi_errno);
                    }

                    /* Perform probe, hopefully before the main process can
                     * send its reply */
                    mpi_errno = MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
                    if (mpi_errno != MPI_SUCCESS && errs++ < 10) {
                        MTestPrintError(mpi_errno);
                    }
                    mpi_errno = MPI_Get_count(&status, MPI_BYTE, &incoming_msg_size);
                    if (mpi_errno != MPI_SUCCESS && errs++ < 10) {
                        MTestPrintError(mpi_errno);
                    }
                    /* Validate the envelope reported by the probe */
                    if (status.MPI_SOURCE != 0 && errs++ < 10) {
                        printf
                            ("ERROR: unexpected message source from MPI_Probe(): p=%d, expected=0, actual=%d, count=%d\n",
                             p_rank, status.MPI_SOURCE, msg_cnt);
                    }
                    if (status.MPI_TAG != tag && errs++ < 10) {
                        printf
                            ("ERROR: unexpected message tag from MPI_Probe(): p=%d, expected=%d, actual=%d, count=%d\n",
                             p_rank, tag, status.MPI_TAG, msg_cnt);
                    }
                    if (incoming_msg_size != msg_size && errs++ < 10) {
                        printf
                            ("ERROR: unexpected message size from MPI_Probe(): p=%d, expected=%d, actual=%d, count=%d\n",
                             p_rank, msg_size, incoming_msg_size, msg_cnt);
                    }

                    /* Receive the probed message from the main process */
                    mpi_errno = MPI_Recv(buf, msg_size, MPI_BYTE, 0, tag, comm, &status);
                    if (mpi_errno != MPI_SUCCESS && errs++ < 10) {
                        MTestPrintError(mpi_errno);
                    }
                    mpi_errno = MPI_Get_count(&status, MPI_BYTE, &incoming_msg_size);
                    if (mpi_errno != MPI_SUCCESS && errs++ < 10) {
                        MTestPrintError(mpi_errno);
                    }
                    /* Validate the envelope reported by the receive */
                    if (status.MPI_SOURCE != 0 && errs++ < 10) {
                        printf
                            ("ERROR: unexpected message source from MPI_Recv(): p=%d, expected=0, actual=%d, count=%d\n",
                             p_rank, status.MPI_SOURCE, msg_cnt);
                    }
                    if (status.MPI_TAG != tag && errs++ < 10) {
                        printf
                            ("ERROR: unexpected message tag from MPI_Recv(): p=%d, expected=%d, actual=%d, count=%d\n",
                             p_rank, tag, status.MPI_TAG, msg_cnt);
                    }
                    if (incoming_msg_size != msg_size && errs++ < 10) {
                        printf
                            ("ERROR: unexpected message size from MPI_Recv(): p=%d, expected=%d, actual=%d, count=%d\n",
                             p_rank, msg_size, incoming_msg_size, msg_cnt);
                    }
                }
            }
        }
        MTestFreeComm(&comm);
    }

    MTest_Finalize(errs);
    return MTestReturnValue(errs);
}