File: makeClusterMPI.R

package info (click to toggle)
r-cran-future 1.11.1.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,380 kB
  • sloc: sh: 14; makefile: 2
file content (97 lines) | stat: -rw-r--r-- 3,656 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#' Create a Message Passing Interface (MPI) cluster of \R workers for parallel processing
#' 
#' The \code{makeClusterMPI()} function creates an MPI cluster of \R workers
#' for parallel processing.  This function utilizes 
#' \code{makeCluster(..., type = "MPI")} of the \pkg{parallel} package and
#' tweaks the cluster in an attempt to avoid
#' \code{\link[parallel:stopCluster]{stopCluster()}} from hanging [1].
#' \emph{WARNING: This function is very much in a beta version and should
#' only be used if \code{parallel::makeCluster(..., type = "MPI")} fails.}
#'
#' \emph{Creating MPI clusters requires the \bold{Rmpi} package.}
#'
#' @inheritParams makeClusterPSOCK
#'
#' @param workers The number of workers (as a positive integer).
#' 
#' @param \dots Optional arguments passed to
#' \code{\link[parallel:makeCluster]{makeCluster}(workers, type = "MPI", ...)}.
#' 
#' @return An object of class \code{"FutureMPIcluster"} consisting
#' of a list of \code{"MPInode"} workers.
#'
#' @references
#' [1] R-sig-hpc thread \href{https://stat.ethz.ch/pipermail/r-sig-hpc/2017-September/002065.html}{Rmpi: mpi.close.Rslaves() 'hangs'} on 2017-09-28. \cr
#'
#' @seealso
#' \code{\link{makeClusterPSOCK}()} and
#' \code{\link[parallel:makeCluster]{parallel::makeCluster}()}.
#'
#' @importFrom parallel makeCluster
#' @export
makeClusterMPI <- function(workers, ..., autoStop = FALSE, verbose = getOption("future.debug", FALSE)) {
  ## Validate 'workers': it must be a single numeric value that, once
  ## coerced to integer, is non-missing and at least one.
  if (!is.numeric(workers)) {
    stop("Argument 'workers' must be an integer: ", mode(workers))
  }
  if (length(workers) != 1L) {
    stop("When numeric, argument 'workers' must be a single value: ", length(workers))
  }
  workers <- as.integer(workers)
  if (is.na(workers) || workers < 1L) {
    stop("Number of 'workers' must be one or greater: ", workers)
  }

  if (verbose) {
    message(sprintf("Number of workers: %d", workers))
  }

  ## FIXME: Re-implement locally using below for loop
  cl <- makeCluster(workers, type = "MPI", ...)

  ## Visit every node and attach its session information.  Doing this
  ## right away also asserts that the cluster is actually working, and
  ## it collects details otherwise not available, e.g. the PID.
  n_nodes <- length(cl)
  for (kk in seq_len(n_nodes)) {
    if (verbose) {
      message(sprintf("Updating node %d of %d ...", kk, n_nodes))
      message("- collecting session information")
    }

    cl[kk] <- add_cluster_session_info(cl[kk])

    if (verbose) {
      message(sprintf("Updating node %d of %d ... DONE", kk, n_nodes))
    }
  }

  ## AD HOC/WORKAROUND:
  ## stopCluster.spawnedMPIcluster() calls Rmpi::mpi.comm.disconnect(),
  ## which may stall R.  To keep that method from being dispatched, the
  ## 'spawnedMPIcluster' class is removed and 'FutureMPIcluster' is put
  ## first in the class vector.  Calling Rmpi::mpi.finalize() and
  ## Rmpi::mpi.exit() may hang R as well.
  ## See also the stopCluster.FutureMPIcluster() implementation below.
  ## REFERENCE: https://stackoverflow.com/a/44317647/1072091
  kept_classes <- setdiff(class(cl), "spawnedMPIcluster")
  class(cl) <- c("FutureMPIcluster", kept_classes)

  ## Optionally hand the cluster to autoStopCluster() before returning it
  if (autoStop) {
    cl <- autoStopCluster(cl)
  }

  cl
} ## makeClusterMPI()


#' @export
#' @keywords internal
stopCluster.FutureMPIcluster <- function(cl) {
  ## First let the next stopCluster() method in the class chain run
  NextMethod()

  ## The remaining cleanup is only attempted when Rmpi can be loaded and
  ## provides a function named mpi.comm.free(); otherwise nothing more is
  ## done.
  ## NOTE(review): the package name is handed to requireNamespace() via a
  ## variable rather than as a literal - presumably on purpose; confirm
  ## before simplifying.
  rmpi_pkg <- "Rmpi"
  if (requireNamespace(rmpi_pkg, quietly = TRUE)) {
    ## https://stat.ethz.ch/pipermail/r-sig-hpc/2017-September/002065.html
    rmpi_ns <- getNamespace("Rmpi")
    free_comm <- get0("mpi.comm.free", envir = rmpi_ns, mode = "function",
                      inherits = FALSE, ifnotfound = NULL)
    if (!is.null(free_comm)) {
      comm_id <- 1
      free_comm(comm_id)
    }
  }

  ## The cluster object is always returned invisibly
  invisible(cl)
}