File: th_taskmanager.c

package info (click to toggle)
mpich 4.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 423,384 kB
  • sloc: ansic: 1,088,434; cpp: 71,364; javascript: 40,763; f90: 22,829; sh: 17,463; perl: 14,773; xml: 14,418; python: 10,265; makefile: 9,246; fortran: 8,008; java: 4,355; asm: 324; ruby: 176; lisp: 19; php: 8; sed: 4
file content (156 lines) | stat: -rw-r--r-- 4,877 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "mpithreadtest.h"

#define DEFAULT_TASKS 128
#define DEFAULT_TASK_WINDOW 2
#define USE_THREADS

int comm_world_rank;
int comm_world_size;
#ifdef USE_THREADS
MPI_Comm th_comms[DEFAULT_TASK_WINDOW];
#endif

#define CHECK_SUCCESS(status) \
{ \
    if (status != MPI_SUCCESS) { \
	fprintf(stderr, "Routine on line %d returned with error status\n", __LINE__); \
	MPI_Abort(MPI_COMM_WORLD, -1); \
    } \
}

void process_spawn(MPI_Comm * comm, int thread_id);
void process_spawn(MPI_Comm * comm, int thread_id)
{
    CHECK_SUCCESS(MPI_Comm_spawn((char *) "./th_taskmanager", (char **) NULL,
                                 1, MPI_INFO_NULL, 0, th_comms[thread_id], comm, NULL));
}

void process_disconnect(MPI_Comm * comm, int thread_id);
void process_disconnect(MPI_Comm * comm, int thread_id)
{
    if (comm_world_rank == 0) {
        CHECK_SUCCESS(MPI_Recv(NULL, 0, MPI_CHAR, 0, 1, *comm, MPI_STATUS_IGNORE));
        CHECK_SUCCESS(MPI_Send(NULL, 0, MPI_CHAR, 0, 1, *comm));
    }

    CHECK_SUCCESS(MPI_Comm_disconnect(comm));
}

#ifdef USE_THREADS
MTEST_THREAD_RETURN_TYPE main_thread(void *arg);
MTEST_THREAD_RETURN_TYPE main_thread(void *arg)
{
    MPI_Comm child_comm;
    int thread_id = (int) (size_t) arg;

    process_spawn(&child_comm, thread_id);
    CHECK_SUCCESS(MPI_Comm_set_errhandler(child_comm, MPI_ERRORS_RETURN));
    process_disconnect(&child_comm, thread_id);

    return MTEST_THREAD_RETVAL_IGN;
}
#endif /* USE_THREADS */

int main(int argc, char *argv[])
{
    int tasks = 0, provided, i, j;
    MPI_Comm parent;
#ifndef USE_THREADS
    MPI_Comm *child;
#endif /* USE_THREADS */

#ifdef USE_THREADS
    CHECK_SUCCESS(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided));
    if (provided != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "MPI does not provide THREAD_MULTIPLE support\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }
#else
    CHECK_SUCCESS(MPI_Init(&argc, &argv));
#endif

    CHECK_SUCCESS(MPI_Comm_get_parent(&parent));

    if (parent == MPI_COMM_NULL) {      /* Parent communicator */
        if (argc == 2) {
            tasks = atoi(argv[1]);
        } else if (argc == 1) {
            tasks = DEFAULT_TASKS;
        } else {
            fprintf(stderr, "Usage: %s {number_of_tasks}\n", argv[0]);
            MPI_Abort(MPI_COMM_WORLD, -1);
        }

        CHECK_SUCCESS(MPI_Comm_rank(MPI_COMM_WORLD, &comm_world_rank));
        CHECK_SUCCESS(MPI_Comm_size(MPI_COMM_WORLD, &comm_world_size));

#ifndef USE_THREADS
        child = (MPI_Comm *) malloc(tasks * sizeof(MPI_Comm));
        if (!child) {
            fprintf(stderr, "Unable to allocate memory for child communicators\n");
            MPI_Abort(MPI_COMM_WORLD, -1);
        }
#endif /* USE_THREADS */

#ifdef USE_THREADS
        /* Initialize the thread package. It should not be initialized via
         * MTest_Init_thread since MTest_Finalize cannot be called. */
        MTest_init_thread_pkg();

        /* We need different communicators per thread */
        for (i = 0; i < DEFAULT_TASK_WINDOW; i++)
            CHECK_SUCCESS(MPI_Comm_dup(MPI_COMM_WORLD, &th_comms[i]));

        /* Create a thread for each task. Each thread will spawn a
         * child process to perform its task. */
        for (i = 0; i < tasks;) {
            for (j = 0; j < DEFAULT_TASK_WINDOW; j++) {
                MTest_Start_thread(main_thread, (void *) (size_t) j);
            }
            MTest_Join_threads();
            i += DEFAULT_TASK_WINDOW;
        }

        for (i = 0; i < DEFAULT_TASK_WINDOW; i++)
            CHECK_SUCCESS(MPI_Comm_free(&th_comms[i]));

        /* Release the thread package. */
        MTest_finalize_thread_pkg();
#else
        /* Directly spawn a child process to perform each task */
        for (i = 0; i < tasks;) {
            for (j = 0; j < DEFAULT_TASK_WINDOW; j++)
                process_spawn(&child[j], -1);
            for (j = 0; j < DEFAULT_TASK_WINDOW; j++)
                process_disconnect(&child[j], -1);
            i += DEFAULT_TASK_WINDOW;
        }
#endif /* USE_THREADS */

        CHECK_SUCCESS(MPI_Barrier(MPI_COMM_WORLD));

        if (comm_world_rank == 0)
            printf(" No Errors\n");
    } else {    /* Child communicator */
        /* Do some work here and send a message to the root process in
         * the parent communicator. */
        CHECK_SUCCESS(MPI_Send(NULL, 0, MPI_CHAR, 0, 1, parent));
        CHECK_SUCCESS(MPI_Recv(NULL, 0, MPI_CHAR, 0, 1, parent, MPI_STATUS_IGNORE));
        CHECK_SUCCESS(MPI_Comm_disconnect(&parent));
    }

  fn_exit:
    /* Do not call MTest_Finalize (and thus MTest_Init) to avoid printing extra
     * "No Errors" as many as spawned MPI processes. */
    MPI_Finalize();

    return 0;
}