1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/
#include "mpi.h"
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "mpitest.h"
#include "connectstuff.h"
/*
 * Connect/accept test driver.
 *
 * Usage: <prog> <fname> <totalSize> <idx-1-based>
 *
 *   fname      base name of the file(s) used to publish the port name
 *   totalSize  total number of processes launched
 *   idx        1-based index of this process; its expected rank is idx - 1
 *
 * Every process starts as a singleton. The process with expected rank 0
 * opens a port and publishes it to "<fname>.<k>", one file at a time. Each
 * other process reads the port from the file carrying its own expected rank,
 * connects, and merges into the growing intracommunicator. All current
 * members then keep accepting until the communicator holds totalSize
 * processes.
 *
 * Returns 0 on completion; exits with status 1 on a usage error.
 */
int main(int argc, char **argv)
{
    MPI_Comm tmp, comm, startComm;
    char *fname;
    char *actualFname = NULL;
    char *globalFname = NULL;
    int totalSize, expectedRank, size, cachedRank;
    char portName[MPI_MAX_PORT_NAME + 1];
    int rankToAccept = 1;

    /* Debug - print out where we picked up the MPICH build from */
#ifdef MPICHLIBSTR
    msg("MPICH library taken from: %s\n", MPICHLIBSTR);
#endif

    if (argc != 4) {
        printf("Usage: %s <fname> <totalSize> <idx-1-based>\n", argv[0]);
        exit(1);
    }

    /* This is the base name of the file into which we write the port */
    fname = argv[1];
    /* This is the total number of processes launched */
    totalSize = atoi(argv[2]);
    /* Each process knows its expected rank */
    expectedRank = atoi(argv[3]) - 1;

    /* Start a watchdog thread which will abort after 120 seconds, and will
     * print stack traces using GDB every 5 seconds if you don't call
     * strokeWatchdog() */
    startWatchdog(120);

    /* Print a debug header */
    msg("Waiting for: %d - my rank is %d\n", totalSize, expectedRank);

    /* Singleton init */
    MPI_Init(NULL, NULL);

    /* Duplicate from MPI_COMM_SELF the starting point */
    MPI_Comm_dup(MPI_COMM_SELF, &startComm);

    if (expectedRank == 0) {
        /* This process opens the port, and writes the information to the file */
        MPI_Open_port(MPI_INFO_NULL, portName);

        /* Write the port to fname.<rank> so that the connecting processes can
         * wait their turn by checking for the correct file to show up */
        actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);

        /* The wrapper script I'm using checks for the existence of "fname", so
         * create that - even though it isn't used.  fname comes from the
         * command line, so it is passed as an argument rather than as the
         * format string itself. */
        globalFname = writePortToFile(portName, "%s", fname);
        installExitHandler(globalFname);

        comm = startComm;

        /* The accept loop below tests size, so it must be defined on this
         * path as well as on the connecting path. */
        MPI_Comm_size(comm, &size);
    } else {
        char *readPort;

        readPort = getPortFromFile("%s.%d", fname, expectedRank);
        /* Bounded copy that always NUL-terminates portName */
        snprintf(portName, sizeof portName, "%s", readPort);
        free(readPort);
        msg("Read port <%s>\n", portName);

        MPI_Comm_connect(portName, MPI_INFO_NULL, 0, startComm, &comm);
        MPI_Intercomm_merge(comm, 1, &tmp);
        /* Release the intercomm now that it has been merged; the accepting
         * side frees its handle after its own merge. */
        MPI_Comm_free(&comm);
        comm = tmp;

        MPI_Comm_size(comm, &size);
        msg("After my first merge, size is now: %d\n", size);
    }

    while (size < totalSize) {
        /* Make sure we don't print a stack until we stall */
        strokeWatchdog();

        /* Accept the connection */
        MPI_Comm_accept(portName, MPI_INFO_NULL, 0, comm, &tmp);

        /* Merge into intracomm */
        MPI_Intercomm_merge(tmp, 0, &comm);

        /* Free the intercomm */
        MPI_Comm_free(&tmp);

        /* See where we're up to */
        MPI_Comm_rank(comm, &cachedRank);
        MPI_Comm_size(comm, &size);

        if (expectedRank == 0) {
            msg("Up to size: %d\n", size);

            /* Delete the old file, create the new one */
            unlink(actualFname);
            free(actualFname);

            /* Allow the next rank to connect */
            actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
        }
    }

    MPI_Comm_rank(comm, &cachedRank);
    msg("All done - I got rank: %d.\n", cachedRank);

    MPI_Barrier(comm);

    if (expectedRank == 0) {
        /* Cleanup on rank zero - delete some files */
        MTestSleep(4);
        unlink(actualFname);
        free(actualFname);
        actualFname = NULL;
        unlink(globalFname);
        free(globalFname);
        globalFname = NULL;

        /* This lets my wrapper script know that we did everything correctly */
        indicateConnectSucceeded();
    }

    MPI_Finalize();
    return 0;
}
|