File: parrived_wait.c

Package: mpich 4.0.2-3
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mpitest.h"
#include "mpithreadtest.h"

/* Multithreaded version of test/mpi/part/parrived_wait.
 *
 * Each thread on the sender side prepares its partitions and marks them ready
 * with MPI_Pready_range; each thread on the receiver side polls for its
 * partitions with MPI_Parrived.
 *
 * How to use the test:
 * - The send/receive partition counts, the total count of basic elements, and
 *   the number of start-test-wait iterations are set by input parameters.
 * - 4 threads are created on each side; this can be changed by setting `NUM_THREADS`.
 *
 * Extended from Example 4.4 in the MPI 4.0 standard. */
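
/* A hypothetical invocation (the exact option syntax follows the MTestArgList
 * conventions of the MPICH test harness and may differ in other environments):
 *
 *     mpiexec -n 2 ./parrived_wait -spart=8 -rpart=8 -tot_count=64 -iteration=5
 *
 * Any parameter that is omitted falls back to the defaults set in main():
 * 8 partitions on each side, 64 doubles in total, and 5 iterations. */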

#define NUM_THREADS 4

/* Input parameters */
static int spart = 0, rpart = 0, iteration = 0;
static MPI_Count tot_count = 0;
/* Internal variables */
static double *buf = NULL;
static MPI_Count scount = 0, rcount = 0;
static int rank, size, thread_errs[NUM_THREADS];

typedef struct {
    int tid;
    int iter;
    MPI_Request req;
} thread_arg_t;
static thread_arg_t thread_args[NUM_THREADS];

/* Utility functions */
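/* VAL maps a buffer index and an iteration number to an expected value, chosen so
 * that data left over at the same index from a previous iteration is detected as a
 * mismatch. */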
#define VAL(_index, _iter) (double) ((_index) + (_iter))

static void fill_send_partition(int sp, int iter)
{
    for (int i = 0; i < scount; i++) {
        int index = sp * scount + i;
        buf[index] = VAL(index, iter);
    }
}

static void reset_buf(void)
{
    memset(buf, 0, tot_count * sizeof(double));
}

static int check_recv_partition(int tid, int rp, int iter)
{
    int errs = 0;
    for (int i = 0; i < rcount; i++) {
        int index = rp * rcount + i;
        double exp_val = VAL(index, iter);
        if (buf[index] != exp_val) {
            if (errs < 10) {
                fprintf(stderr, "Rank %d tid %d expected %.1f but received %.1f "
                        "at buf[%d] (partition %d count %ld off %d), iteration %d\n",
                        rank, tid, exp_val, buf[index], index, rp, rcount, i, iter);
                fflush(stderr);
            }
            errs++;
        }
    }
    return errs;
}

/* Set the partition range for each thread by mimicking OpenMP static scheduling. */
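/* For example, with partitions = 10 and NUM_THREADS = 4 the chunk size is 2 and the
 * threads get the ranges 0-1, 2-3, 4-5, and 6-9; the last thread picks up the
 * remainder. */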
static void set_partitions_range(int partitions, int tid, int *lo_ptr, int *high_ptr)
{
    int partition_th = partitions / NUM_THREADS;        /* chunk size */
    int lo = tid * partition_th;
    int high = lo + partition_th - 1;

    if (high >= partitions)
        high = partitions - 1;  /* last chunk may be less than chunk size */
    if (tid == NUM_THREADS - 1)
        high += partitions % NUM_THREADS;       /* the last thread handles all remaining partitions */

    *lo_ptr = lo;
    *high_ptr = high;
}

/* Test subroutines */
static MTEST_THREAD_RETURN_TYPE run_send_test(void *arg)
{
    thread_arg_t *thread_arg = (thread_arg_t *) arg;
    int tid = thread_arg->tid;
    int iter = thread_arg->iter;
    MPI_Request req = thread_arg->req;

    int lo = 0, high = 0;
    set_partitions_range(spart, tid, &lo, &high);

    for (int i = lo; i <= high; i++)
        fill_send_partition(i, iter);
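    /* All partitions owned by this thread are filled; mark the whole range ready
     * for transfer with a single MPI_Pready_range call. */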
    MPI_Pready_range(lo, high, req);

    MTestPrintfMsg(1, "Rank %d tid %d sent partition range %d:%d "
                   "with count %ld of basic elements\n", rank, tid, lo, high, scount);

    return (MTEST_THREAD_RETURN_TYPE) NULL;
}

static MTEST_THREAD_RETURN_TYPE run_recv_test(void *arg)
{
    thread_arg_t *thread_arg = (thread_arg_t *) arg;
    int tid = thread_arg->tid;
    int iter = thread_arg->iter;
    MPI_Request req = thread_arg->req;

    int lo = 0, high = 0;
    set_partitions_range(rpart, tid, &lo, &high);

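    /* Walk this thread's range two partitions at a time; the second partition of a
     * pair may belong to the next thread's range or not exist at all, and the loop
     * body covers both cases. */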
    for (int i = lo; i <= high; i += 2) {
        /* Mimic receive-side computation that handles the two partitions of a pair
         * independently, consuming each one as soon as it arrives. */
        int part1_completed = 0, part2_completed = 0;
        while (part1_completed == 0 || part2_completed == 0) {
            int flag = 0;
            MPI_Parrived(req, i, &flag);
            if (flag && part1_completed == 0) {
                part1_completed = 1;
                MTestPrintfMsg(1, "Rank %d tid %d received partition %d "
                               "with count %ld of basic elements\n", rank, tid, i, rcount);

                thread_errs[tid] += check_recv_partition(tid, i, iter);
            }

            if (i + 1 < rpart) {
                MPI_Parrived(req, i + 1, &flag);
                if (flag && part2_completed == 0) {
                    part2_completed = 1;
                    MTestPrintfMsg(1, "Rank %d tid %d received partition %d "
                                   "with count %ld of basic elements\n", rank, tid, i + 1, rcount);

                    thread_errs[tid] += check_recv_partition(tid, i + 1, iter);
                }
            } else {
                part2_completed = 1;
            }
        }
    }
    return (MTEST_THREAD_RETURN_TYPE) NULL;
}

int main(int argc, char *argv[])
{
    int errs = 0;
    int provided = -1;
    MPI_Request req = MPI_REQUEST_NULL;
    MPI_Datatype send_type;

    MTest_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "MPI_THREAD_MULTIPLE is not supported\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size < 2) {
        fprintf(stderr, "This test requires at least two processes\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    /* Setup input parameters and internal variables */
    MTestArgList *head = MTestArgListCreate(argc, argv);
    spart = MTestArgListGetInt_with_default(head, "spart", 8);
    rpart = MTestArgListGetInt_with_default(head, "rpart", 8);
    tot_count = MTestArgListGetLong_with_default(head, "tot_count", 64);
    iteration = MTestArgListGetInt_with_default(head, "iteration", 5);
    MTestArgListDestroy(head);

    /* The total number of elements sent (spart * scount) must equal the total
     * number of elements received (rpart * rcount) */
    scount = tot_count / spart;
    rcount = tot_count / rpart;
    if (spart * scount != rpart * rcount) {
        fprintf(stderr, "Invalid partitions (%d, %d) or tot_count (%ld):\n"
                "(tot_count / spart * spart) and (tot_count / rpart * rpart) "
                "must be identical\n", spart, rpart, tot_count);
        fflush(stderr);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    buf = calloc(tot_count, sizeof(double));
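    /* The sender packs scount doubles per partition into a contiguous datatype;
     * the receiver describes its partitions directly as rcount MPI_DOUBLEs. */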
    MPI_Type_contiguous(scount, MPI_DOUBLE, &send_type);
    MPI_Type_commit(&send_type);

    if (rank == 0) {
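        /* Sender: one partitioned send request, reused across iterations; each of
         * the spart partitions carries a single send_type, i.e. scount doubles. */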
        MPI_Psend_init(buf, spart, 1, send_type, 1, 0, MPI_COMM_WORLD, MPI_INFO_NULL, &req);

        for (int iter = 0; iter < iteration; iter++) {
            reset_buf();
            MPI_Start(&req);

            /* Multiple threads prepare and send partitions in parallel */
            for (int tid = 0; tid < NUM_THREADS; tid++) {
                thread_args[tid].tid = tid;
                thread_args[tid].iter = iter;
                thread_args[tid].req = req;
                errs += MTest_Start_thread(run_send_test, &thread_args[tid]);
            }
            errs += MTest_Join_threads();

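            /* Every thread has marked its range ready; MPI_Wait completes the
             * partitioned send for this iteration. */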
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }

        MPI_Request_free(&req);
    } else if (rank == 1) {
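        /* Receiver: matching partitioned receive with rpart partitions of rcount
         * doubles each; the total element count equals the sender's spart * scount. */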
        MPI_Precv_init(buf, rpart, rcount, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_INFO_NULL, &req);

        for (int iter = 0; iter < iteration; iter++) {
            reset_buf();
            MPI_Start(&req);

            /* Multiple threads receive and process received partitions in parallel */
            for (int tid = 0; tid < NUM_THREADS; tid++) {
                thread_args[tid].tid = tid;
                thread_args[tid].iter = iter;
                thread_args[tid].req = req;
                errs += MTest_Start_thread(run_recv_test, &thread_args[tid]);
            }
            errs += MTest_Join_threads();

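            /* Every partition has arrived and been checked; MPI_Wait completes the
             * partitioned receive for this iteration. */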
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }

        MPI_Request_free(&req);
    }

    MPI_Type_free(&send_type);
    free(buf);

    for (int tid = 0; tid < NUM_THREADS; tid++) {
        errs += thread_errs[tid];
    }

    MTest_Finalize(errs);
    return MTestReturnValue(errs);
}