File: slurm_apply.R

package info (click to toggle)
r-cran-rslurm 0.6.2%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 428 kB
  • sloc: sh: 25; makefile: 5
file content (215 lines) | stat: -rwxr-xr-x 10,853 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
#' Parallel execution of a function on the Slurm cluster
#'
#' Use \code{slurm_apply} to compute function over multiple sets of
#' parameters in parallel, spread across multiple nodes of a Slurm cluster,
#' with similar syntax to \code{mapply}.
#'
#' This function creates a temporary folder ("_rslurm_[jobname]") in the current
#' directory, holding .RData and .RDS data files, the R script to run and the Bash
#' submission script generated for the Slurm job.
#'
#' The set of input parameters is divided in equal chunks sent to each node, and
#' \code{f} is evaluated in parallel within each node using functions from the
#' \code{parallel} R package. The names of any other R objects (besides
#' \code{params}) that \code{f} needs to access should be included in
#' \code{global_objects} or passed as additional arguments through \code{...}.
#'
#' Use \code{slurm_options} to set any option recognized by \code{sbatch}, e.g.
#' \code{slurm_options = list(time = "1:00:00", share = TRUE)}.
#' See \url{http://slurm.schedmd.com/sbatch.html} for details on possible options.
#' Note that full names must be used (e.g. "time" rather than "t") and that flags
#' (such as "share") must be specified as TRUE. The "array", "job-name", "nodes", 
#' "cpus-per-task" and "output" options are already determined by 
#' \code{slurm_apply} and should not be manually set.
#'
#' When processing the computation job, the Slurm cluster will output two types
#' of files in the temporary folder: those containing the return values of the
#' function for each subset of parameters ("results_[node_id].RDS") and those
#' containing any console or error output produced by R on each node
#' ("slurm_[node_id].out").
#'
#' If \code{submit = TRUE}, the job is sent to the cluster and a confirmation
#' message (or error) is output to the console. If \code{submit = FALSE},
#' a message indicates the location of the saved data and script files; the
#' job can be submitted manually by running the shell command
#' \code{sbatch submit.sh} from that directory.
#'
#' After sending the job to the Slurm cluster, \code{slurm_apply} returns a
#' \code{slurm_job} object which can be used to cancel the job, get the job
#' status or output, and delete the temporary files associated with it. See
#' the description of the related functions for more details.
#'
#' @param f A function that accepts one or many single values as parameters and
#'   may return any type of R object.
#' @param params A data frame of parameter values to apply \code{f} to. Each
#'   column corresponds to a parameter of \code{f} (\emph{Note}: names must
#'   match) and each row corresponds to a separate function call.
#' @param ... Additional arguments to \code{f}. These arguments do not vary
#'   with each call to \code{f}.
#' @param jobname The name of the Slurm job; if \code{NA}, it is assigned a
#'   random name of the form "slr####".
#' @param nodes The (maximum) number of cluster nodes to spread the calculation
#'   over. \code{slurm_apply} automatically divides \code{params} in chunks of
#'   approximately equal size to send to each node. Less nodes are allocated if
#'   the parameter set is too small to use all CPUs on the requested nodes.
#' @param cpus_per_node The number of CPUs requested per node. This argument is
#'   mapped to the Slurm parameter \code{cpus-per-task}.
#' @param processes_per_node The number of logical CPUs to utilize per node,
#'   i.e. how many processes to run in parallel per node. This can exceed
#'   \code{cpus_per_node} for nodes which support hyperthreading. Defaults to
#'   \code{processes_per_node = cpus_per_node}.
#' @param preschedule_cores Corresponds to the \code{mc.preschedule} argument of 
#'   \code{parallel::mcmapply}. Defaults to \code{TRUE}. If \code{TRUE}, the 
#'   rows of \code{params} are assigned to cores before computation. If \code{FALSE}, 
#'   each row of \code{params} is executed by the next available core.
#'   Setting \code{FALSE} may be faster if 
#'   different values of \code{params} result in very variable completion time for
#'   jobs.
#' @param job_array_task_limit The maximum number of job array tasks to run at 
#'   the same time. Defaults to \code{NULL} (no limit).
#' @param global_objects A character vector containing the name of R objects to be
#'   saved in a .RData file and loaded on each cluster node prior to calling
#'   \code{f}.
#' @param add_objects Older deprecated name of \code{global_objects}, retained for
#'   backwards compatibility.
#' @param pkgs A character vector containing the names of packages that must
#'   be loaded on each cluster node. By default, it includes all packages
#'   loaded by the user when \code{slurm_apply} is called.
#' @param libPaths A character vector describing the location of additional R
#'   library trees to search through, or NULL. The default value of NULL
#'   corresponds to libraries returned by \code{.libPaths()} on a cluster node.
#'   Non-existent library trees are silently ignored.
#' @param rscript_path The location of the Rscript command. If not specified, 
#'   defaults to the location of Rscript within the R installation being run.
#' @param r_template The path to the template file for the R script run on each node. 
#'   If NULL, uses the default template "rslurm/templates/slurm_run_R.txt".
#' @param sh_template The path to the template file for the sbatch submission script. 
#'   If NULL, uses the default template "rslurm/templates/submit_sh.txt".
#' @param slurm_options A named list of options recognized by \code{sbatch}; see
#'   Details below for more information.
#' @param submit Whether or not to submit the job to the cluster with
#'   \code{sbatch}; see Details below for more information.
#' @return A \code{slurm_job} object containing the \code{jobname} and the
#'   number of \code{nodes} effectively used.
#' @seealso \code{\link{slurm_call}} to evaluate a single function call.
#' @seealso \code{\link{slurm_map}} to evaluate a function over a list.
#' @seealso \code{\link{cancel_slurm}}, \code{\link{cleanup_files}},
#'   \code{\link{get_slurm_out}} and \code{\link{get_job_status}}
#'   which use the output of this function.
#' @examples
#' \dontrun{
#' sjob <- slurm_apply(func, pars)
#' get_job_status(sjob) # Prints console/error output once job is completed.
#' func_result <- get_slurm_out(sjob, "table") # Loads output data into R.
#' cleanup_files(sjob)
#' }
#' @export
slurm_apply <- function(f, params, ..., jobname = NA, nodes = 2,
                        cpus_per_node = 2, processes_per_node = cpus_per_node,
                        preschedule_cores = TRUE, job_array_task_limit = NULL, global_objects = NULL,
                        add_objects = NULL, pkgs = rev(.packages()),
                        libPaths = NULL, rscript_path = NULL, 
                        r_template = NULL, sh_template = NULL, 
                        slurm_options = list(), submit = TRUE) {
    # Check inputs
    if (!is.function(f)) {
        stop("first argument to slurm_apply should be a function")
    }
    if (!is.data.frame(params)) {
        stop("second argument to slurm_apply should be a data.frame")
    }
    if (is.null(names(params)) || (!is.primitive(f) && !"..." %in% names(formals(f)) && any(!names(params) %in% names(formals(f))))) {
        stop("column names of params must match arguments of f")
    }
    if (!is.numeric(nodes) || length(nodes) != 1) {
        stop("nodes should be a single number")
    }
    if (!is.numeric(cpus_per_node) || length(cpus_per_node) != 1) {
        stop("cpus_per_node should be a single number")
    }
    
    # Check for use of deprecated argument
    if (!missing("add_objects")) {
        warning("Argument add_objects is deprecated; use global_objects instead.", .call = FALSE)
        global_objects <- add_objects
    }
    
    # Default templates
    if(is.null(r_template)) {
        r_template <- system.file("templates/slurm_run_R.txt", package = "rslurm")
    }
    if(is.null(sh_template)) {
        sh_template <- system.file("templates/submit_sh.txt", package = "rslurm")
    }

    jobname <- make_jobname(jobname)

    # Create temp folder
    tmpdir <- paste0("_rslurm_", jobname)
    dir.create(tmpdir, showWarnings = FALSE)
    
    # Unpack additional arguments
    more_args <- list(...)

    saveRDS(params, file = file.path(tmpdir, "params.RDS"))
    saveRDS(f, file = file.path(tmpdir, "f.RDS"))
    saveRDS(more_args, file = file.path(tmpdir, "more_args.RDS"))
    if (!is.null(global_objects)) {
        save(list = global_objects,
             file = file.path(tmpdir, "add_objects.RData"),
             envir = environment(f))
    }    
    
    # Get chunk size (nb. of param. sets by node)
    # Special case if less param. sets than CPUs in cluster
    if (nrow(params) < cpus_per_node * nodes) {
        nchunk <- cpus_per_node
    } else {
        nchunk <- ceiling(nrow(params) / nodes)
    }
    # Re-adjust number of nodes (only matters for small sets)
    nodes <- ceiling(nrow(params) / nchunk)

    # Create a R script to run function in parallel on each node
    template_r <- readLines(r_template)
    script_r <- whisker::whisker.render(template_r,
                    list(pkgs = pkgs,
                         add_obj = !is.null(global_objects),
                         nchunk = nchunk,
                         cpus_per_node = cpus_per_node,
                         processes_per_node = processes_per_node,
                         preschedule_cores = preschedule_cores,
                         libPaths = libPaths))
    writeLines(script_r, file.path(tmpdir, "slurm_run.R"))

    # Create submission bash script
    template_sh <- readLines(sh_template)
    slurm_options <- format_option_list(slurm_options)
    if (is.null(rscript_path)){
        rscript_path <- file.path(R.home("bin"), "Rscript")
    }
    script_sh <- whisker::whisker.render(template_sh,
                    list(max_node = nodes - 1,
                         job_array_task_limit = ifelse(is.null(job_array_task_limit), "", paste0("%", job_array_task_limit)),
                         cpus_per_node = cpus_per_node,
                         jobname = jobname,
                         flags = slurm_options$flags,
                         options = slurm_options$options,
                         rscript = rscript_path))
    writeLines(script_sh, file.path(tmpdir, "submit.sh"))

    # Submit job to Slurm if applicable
    if (submit && system('squeue', ignore.stdout = TRUE)) {
        submit <- FALSE
        cat("Cannot submit; no Slurm workload manager found\n")
    }
    if (submit) {
        jobid <- submit_slurm_job(tmpdir)
    } else {
        jobid <- NA
        cat(paste("Submission scripts output in directory", tmpdir,"\n"))
    }

    # Return 'slurm_job' object
    slurm_job(jobname, jobid, nodes)
}