File: taskmanager.c

package info (click to toggle)
mpich 4.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 423,384 kB
  • sloc: ansic: 1,088,434; cpp: 71,364; javascript: 40,763; f90: 22,829; sh: 17,463; perl: 14,773; xml: 14,418; python: 10,265; makefile: 9,246; fortran: 8,008; java: 4,355; asm: 324; ruby: 176; lisp: 19; php: 8; sed: 4
file content (165 lines) | stat: -rw-r--r-- 4,773 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mpi.h>
#include "mpitest.h"

#define DEFAULT_TASKS 128
#define DEFAULT_TASK_WINDOW 2
/* #define USE_THREADS */

int comm_world_rank;
int comm_world_size;

#define CHECK_SUCCESS(status) \
{ \
    if (status != MPI_SUCCESS) { \
	fprintf(stderr, "Routine on line %d returned with error status\n", __LINE__); \
	MPI_Abort(MPI_COMM_WORLD, -1); \
    } \
}

void process_spawn(MPI_Comm * comm, int thread_id);
void process_spawn(MPI_Comm * comm, int thread_id)
{
    CHECK_SUCCESS(MPI_Comm_spawn((char *) "./taskmanager", (char **) NULL, 1, MPI_INFO_NULL, 0,
                                 MPI_COMM_WORLD, comm, NULL));
}

void process_disconnect(MPI_Comm * comm, int thread_id);
void process_disconnect(MPI_Comm * comm, int thread_id)
{
    if (comm_world_rank == 0) {
        CHECK_SUCCESS(MPI_Recv(NULL, 0, MPI_CHAR, 0, 1, *comm, MPI_STATUS_IGNORE));
        CHECK_SUCCESS(MPI_Send(NULL, 0, MPI_CHAR, 0, 1, *comm));
    }

    CHECK_SUCCESS(MPI_Comm_disconnect(comm));
}

#ifdef USE_THREADS
static void *main_thread(void *arg)
{
    MPI_Comm child_comm;
    int thread_id = *((int *) arg);

    process_spawn(&child_comm, thread_id);
    CHECK_SUCCESS(MPI_Comm_set_errhandler(child_comm, MPI_ERRORS_RETURN));
    process_disconnect(&child_comm, thread_id);

    return NULL;
}
#endif /* USE_THREADS */

int main(int argc, char *argv[])
{
    int tasks = 0, i, j;
    MPI_Comm parent;
#ifdef USE_THREADS
    int provided;
    pthread_t *threads = NULL;
#else
    MPI_Comm *child = NULL;
#endif /* USE_THREADS */
    int can_spawn, errs = 0;

#ifdef USE_THREADS
    CHECK_SUCCESS(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided));
    if (provided != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "MPI does not provide THREAD_MULTIPLE support\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }
#else
    CHECK_SUCCESS(MPI_Init(&argc, &argv));
#endif

    errs += MTestSpawnPossible(&can_spawn);

    if (!can_spawn) {
        if (errs)
            printf(" Found %d errors\n", errs);
        else
            printf(" No Errors\n");
        fflush(stdout);
        goto fn_exit;
    }

    CHECK_SUCCESS(MPI_Comm_get_parent(&parent));

    if (parent == MPI_COMM_NULL) {      /* Parent communicator */
        if (argc == 2) {
            tasks = atoi(argv[1]);
        } else if (argc == 1) {
            tasks = DEFAULT_TASKS;
        } else {
            fprintf(stderr, "Usage: %s {number_of_tasks}\n", argv[0]);
            MPI_Abort(MPI_COMM_WORLD, -1);
        }

        CHECK_SUCCESS(MPI_Comm_rank(MPI_COMM_WORLD, &comm_world_rank));
        CHECK_SUCCESS(MPI_Comm_size(MPI_COMM_WORLD, &comm_world_size));

#ifdef USE_THREADS
        threads = (pthread_t *) malloc(tasks * sizeof(pthread_t));
        if (!threads) {
            fprintf(stderr, "Unable to allocate memory for threads\n");
            MPI_Abort(MPI_COMM_WORLD, -1);
        }
#else
        child = (MPI_Comm *) malloc(tasks * sizeof(MPI_Comm));
        if (!child) {
            fprintf(stderr, "Unable to allocate memory for child communicators\n");
            MPI_Abort(MPI_COMM_WORLD, -1);
        }
#endif /* USE_THREADS */

#ifdef USE_THREADS
        /* Create a thread for each task. Each thread will spawn a
         * child process to perform its task. */
        for (i = 0; i < tasks;) {
            for (j = 0; j < DEFAULT_TASK_WINDOW; j++)
                pthread_create(&threads[j], NULL, main_thread, &j);
            for (j = 0; j < DEFAULT_TASK_WINDOW; j++)
                pthread_join(threads[j], NULL);
            i += DEFAULT_TASK_WINDOW;
        }
#else
        /* Directly spawn a child process to perform each task */
        for (i = 0; i < tasks;) {
            for (j = 0; j < DEFAULT_TASK_WINDOW; j++)
                process_spawn(&child[j], -1);
            for (j = 0; j < DEFAULT_TASK_WINDOW; j++)
                process_disconnect(&child[j], -1);
            i += DEFAULT_TASK_WINDOW;
        }
#endif /* USE_THREADS */

        CHECK_SUCCESS(MPI_Barrier(MPI_COMM_WORLD));

        if (comm_world_rank == 0)
            printf(" No Errors\n");
    } else {    /* Child communicator */
        /* Do some work here and send a message to the root process in
         * the parent communicator. */
        CHECK_SUCCESS(MPI_Send(NULL, 0, MPI_CHAR, 0, 1, parent));
        CHECK_SUCCESS(MPI_Recv(NULL, 0, MPI_CHAR, 0, 1, parent, MPI_STATUS_IGNORE));
        CHECK_SUCCESS(MPI_Comm_disconnect(&parent));
    }

  fn_exit:
#ifdef USE_THREADS
    if (threads)
        free(threads);
#else
    if (child)
        free(child);
#endif
    MPI_Finalize();

    return MTestReturnValue(errs);
}