File: nbrOfWorkers.R

package info (click to toggle)
r-cran-future.batchtools 0.12.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 528 kB
  • sloc: sh: 82; makefile: 2
file content (127 lines) | stat: -rw-r--r-- 3,878 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#' Gets the number of batchtools workers
#'
#' Tries to infer the total number of batchtools workers.  This is
#' done using various ad-hoc procedures based on code inspection
#' of batchtools itself.
#'
#' @param evaluator A future evaluator function.
#' If NULL (default), the current evaluator as returned
#' by [plan()] is used.
#'
#' @return A number in \eqn{[1, Inf]}.
#'
#' @importFrom future nbrOfWorkers
#' @export
#' @keywords internal
nbrOfWorkers.batchtools <- function(evaluator) {
  ## 1. Infer from 'workers' argument
  expr <- formals(evaluator)$workers
  workers <- eval(expr, enclos = baseenv())
  if (!is.null(workers)) {
    stop_if_not(length(workers) >= 1)
    if (is.numeric(workers)) return(prod(workers))
    if (is.character(workers)) return(length(workers))
    stop("Invalid data type of 'workers': ", mode(workers))
  }

  ## 2. Infer from 'cluster.functions' argument
  expr <- formals(evaluator)$cluster.functions
  cf <- eval(expr, enclos = baseenv())
  if (!is.null(cf)) {
    stop_if_not(inherits(cf, "ClusterFunctions"))

    name <- cf$name
    if (is.null(name)) name <- cf$Name

    ## Uni-process backends
    if (name %in% c("Local", "Interactive")) return(1L)

    ## Cluster backends (with a scheduler queue)
    if (name %in% c("TORQUE", "Slurm", "SGE", "OpenLava", "LSF")) {
      return(availableHpcWorkers())
    }
  }

  ## If still not known, assume a generic HPC scheduler
  availableHpcWorkers()
}

#' @export
nbrOfWorkers.batchtools_uniprocess <- function(evaluator) {
  assert_no_positional_args_but_first()
  1L
}

#' @export
nbrOfWorkers.batchtools_multicore <- function(evaluator) {
  ## 1. Infer from 'workers' argument
  expr <- formals(evaluator)$workers
  workers <- eval(expr, enclos = baseenv())
  stop_if_not(length(workers) == 1L, is.numeric(workers), !is.na(workers), is.finite(workers), workers >= 1)
  workers
}

#' @importFrom future nbrOfWorkers nbrOfFreeWorkers
#' @export
nbrOfFreeWorkers.batchtools <- function(evaluator, background = FALSE, ...) {
  ## Special case #1: Fall back to uniprocess processing
  if (inherits(evaluator, "uniprocess")) {
    return(NextMethod())
  }
  
  ## Special case #2: Infinite number of workers
  workers <- nbrOfWorkers(evaluator)
  if (is.infinite(workers)) return(workers)

  ## In all other cases, we need to figure out how many workers
  ## are running at the moment
  
  warnf("nbrOfFreeWorkers() for %s is not fully implemented. For now, it'll assume that none of the workers are occupied", setdiff(class(evaluator), c("FutureStrategy", "tweaked"))[1])
  usedWorkers <- 0L  ## Mockup for now
  
  workers <- workers - usedWorkers
  stop_if_not(length(workers) == 1L, !is.na(workers), workers >= 0L)
  workers
}


#' @export
nbrOfFreeWorkers.batchtools_uniprocess <- function(evaluator, background = FALSE, ...) {
  assert_no_positional_args_but_first()
  if (isTRUE(background)) 0L else 1L
}



#' @export
nbrOfFreeWorkers.batchtools_multiprocess <- function(evaluator, background = FALSE, ...) {
  assert_no_positional_args_but_first()

  workers <- nbrOfWorkers(evaluator)

  ## Create a dummy future
  future <- evaluator(NULL, globals = FALSE, lazy = TRUE)
  freg <- sprintf("workers-%s", class(future)[1])
  usedWorkers <- length(FutureRegistry(freg, action = "list"))
  
  workers <- workers - usedWorkers
  stop_if_not(length(workers) == 1L, !is.na(workers), workers >= 0L)
  workers
}



## Number of available workers in an HPC environment
##
## @return (numeric) A positive integer or `+Inf`.
availableHpcWorkers <- function() {
  name <- "future.batchtools.workers"
  value <- getOption(name, default = 100)
  if (!is.numeric(value) || length(value) != 1L ||
      is.na(value) || value < 1.0) {
    stopf("Option %s does not specify a value >= 1: %s",
          sQuote(name), paste(sQuote(value), collapse = ", "))
  }
  value <- floor(value)
  value
}