File: testconnectserial.c

package info (click to toggle)
mpich 4.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 423,384 kB
  • sloc: ansic: 1,088,434; cpp: 71,364; javascript: 40,763; f90: 22,829; sh: 17,463; perl: 14,773; xml: 14,418; python: 10,265; makefile: 9,246; fortran: 8,008; java: 4,355; asm: 324; ruby: 176; lisp: 19; php: 8; sed: 4
file content (136 lines) | stat: -rw-r--r-- 4,008 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include "mpi.h"
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "mpitest.h"

#include "connectstuff.h"

int main(int argc, char **argv)
{
    MPI_Comm tmp, comm, startComm;
    char *fname;
    char *actualFname = NULL;
    char *globalFname = NULL;
    int totalSize, expectedRank, size, cachedRank;
    char portName[MPI_MAX_PORT_NAME + 1];
    int rankToAccept = 1;

    /* Debug - print out where we picked up the MPICH build from */
#ifdef MPICHLIBSTR
    msg("MPICH library taken from: %s\n", MPICHLIBSTR);
#endif

    if (argc != 4) {
        printf("Usage: %s <fname> <totalSize> <idx-1-based>\n", argv[0]);
        exit(1);
    }

    /* This is the base name of the file into which we write the port */
    fname = argv[1];
    /* This is the total number of processes launched */
    totalSize = atoi(argv[2]);
    /* Each process knows its expected rank */
    expectedRank = atoi(argv[3]) - 1;

    /* Start a watchdog thread which will abort after 120 seconds, and will
     * print stack traces using GDB every 5 seconds if you don't call
     * strokeWatchdog() */
    startWatchdog(120);

    /* Print a debug header */
    msg("Waiting for: %d - my rank is %d\n", totalSize, expectedRank);

    /* Singleton init */
    MPI_Init(0, 0);

    /* Duplicate from MPI_COMM_SELF the starting point */
    MPI_Comm_dup(MPI_COMM_SELF, &startComm);


    if (expectedRank == 0) {
        /* This process opens the port, and writes the information to the file */
        MPI_Open_port(MPI_INFO_NULL, portName);

        /* Write the port to fname.<rank> so that the connecting processes can
         * wait their turn by checking for the correct file to show up */
        actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);

        /* The wrapper script I'm using checks for the existence of "fname", so
         * create that - even though it isn't used  */
        globalFname = writePortToFile(portName, fname);
        installExitHandler(globalFname);

        comm = startComm;
    } else {
        char *readPort;
        readPort = getPortFromFile("%s.%d", fname, expectedRank);
        strncpy(portName, readPort, MPI_MAX_PORT_NAME);
        free(readPort);
        msg("Read port <%s>\n", portName);

        MPI_Comm_connect(portName, MPI_INFO_NULL, 0, startComm, &comm);
        MPI_Intercomm_merge(comm, 1, &tmp);
        comm = tmp;
        MPI_Comm_size(comm, &size);
        msg("After my first merge, size is now: %d\n", size);
    }
    while (size < totalSize) {
        /* Make sure we don't print a stack until we stall */
        strokeWatchdog();

        /* Accept the connection */
        MPI_Comm_accept(portName, MPI_INFO_NULL, 0, comm, &tmp);

        /* Merge into intracomm */
        MPI_Intercomm_merge(tmp, 0, &comm);

        /* Free the intercomm */
        MPI_Comm_free(&tmp);

        /* See where we're up to */
        MPI_Comm_rank(comm, &cachedRank);
        MPI_Comm_size(comm, &size);

        if (expectedRank == 0) {
            msg("Up to size: %d\n", size);

            /* Delete the old file, create the new one */
            unlink(actualFname);
            free(actualFname);

            /* Allow the next rank to connect */
            actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
        }
    }
    MPI_Comm_rank(comm, &cachedRank);

    msg("All done - I got rank: %d.\n", cachedRank);

    MPI_Barrier(comm);

    if (expectedRank == 0) {

        /* Cleanup on rank zero - delete some files */
        MTestSleep(4);
        unlink(actualFname);
        free(actualFname);
        unlink(globalFname);
        free(globalFname);

        /* This lets my wrapper script know that we did everything correctly */
        indicateConnectSucceeded();
    }
    MPI_Finalize();

    return 0;
}