File: submitJobs.R

package info (click to toggle)
r-cran-batchtools 0.9.15%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,416 kB
  • sloc: ansic: 172; sh: 156; makefile: 2
file content (355 lines) | stat: -rw-r--r-- 17,639 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
#' @title Submit Jobs to the Batch Systems
#'
#' @description
#' Submits defined jobs to the batch system.
#'
#' After submitting the jobs, you can use \code{\link{waitForJobs}} to wait for the
#' termination of jobs or call \code{\link{reduceResultsList}}/\code{\link{reduceResults}}
#' to collect partial results.
#' The progress can be monitored with \code{\link{getStatus}}.
#'
#' @section Resources:
#' You can pass arbitrary resources to \code{submitJobs()} which then are available in the cluster function template.
#' Some resources' names are standardized and it is good practice to stick to the following nomenclature to avoid confusion:
#' \describe{
#'  \item{walltime:}{Upper time limit in seconds for jobs before they get killed by the scheduler. Can be passed as additional column as part of \code{ids} to set per-job resources.}
#'  \item{memory:}{Memory limit in Mb. If jobs exceed this limit, they are usually killed by the scheduler. Can be passed as additional column as part of \code{ids} to set per-job resources.}
#'  \item{ncpus:}{Number of (physical) CPUs to use on the slave. Can be passed as additional column as part of \code{ids} to set per-job resources.}
#'  \item{omp.threads:}{Number of threads to use via OpenMP. Used to set environment variable \dQuote{OMP_NUM_THREADS}. Can be passed as additional column as part of \code{ids} to set per-job resources.}
#'  \item{pp.size:}{Maximum size of the pointer protection stack, see \code{\link[base]{Memory}}.}
#'  \item{blas.threads:}{Number of threads to use for the BLAS backend. Used to set environment variables \dQuote{MKL_NUM_THREADS} and \dQuote{OPENBLAS_NUM_THREADS}. Can be passed as additional column as part of \code{ids} to set per-job resources.}
#'  \item{measure.memory:}{Enable memory measurement for jobs. Comes with a small runtime overhead.}
#'  \item{chunks.as.arrayjobs:}{Execute chunks as array jobs.}
#'  \item{pm.backend:}{Start a \pkg{parallelMap} backend on the slave.}
#'  \item{foreach.backend:}{Start a \pkg{foreach} backend on the slave.}
#'  \item{clusters:}{Resource used for Slurm to select the set of clusters to run \code{sbatch}/\code{squeue}/\code{scancel} on.}
#' }
#'
#' @section Chunking of Jobs:
#' Multiple jobs can be grouped (chunked) together to be executed sequentially on the batch system as a single batch job.
#' This is especially useful to avoid overburdening the scheduler by submitting thousands of jobs simultaneously.
#' To chunk jobs together, job ids must be provided as \code{data.frame} with columns \dQuote{job.id} and \dQuote{chunk} (integer).
#' All jobs with the same chunk number will be executed sequentially inside the same batch job.
#' The utility functions \code{\link{chunk}}, \code{\link{binpack}} and \code{\link{lpt}}
#' can assist in grouping jobs.
#'
#' @section Array Jobs:
#' If your cluster supports array jobs, you can set the resource \code{chunks.as.arrayjobs} to \code{TRUE} in order
#' to execute chunks as job arrays on the cluster.
#' For each chunk of size \code{n}, \pkg{batchtools} creates a \code{\link{JobCollection}} of (possibly heterogeneous) jobs which is
#' submitted to the scheduler as a single array job with \code{n} repetitions.
#' For each repetition, the \code{JobCollection} is first read from the file system, then subsetted to the \code{i}-th job using
#' the environment variable \code{reg$cluster.functions$array.var} (depending on the cluster backend, defined automatically) and finally
#' executed.
#'
#' @section Order of Submission:
#' Jobs are submitted in the order of chunks, i.e. jobs which have chunk number
#' \code{sort(unique(ids$chunk))[1]} first, then jobs with chunk number \code{sort(unique(ids$chunk))[2]}
#' and so on. If no chunks are provided, jobs are submitted in the order of \code{ids$job.id}.
#'
#' @section Limiting the Number of Jobs:
#' If requested, \code{submitJobs} tries to limit the number of concurrent jobs of the user by waiting until jobs terminate
#' before submitting new ones.
#' This can be controlled by setting \dQuote{max.concurrent.jobs} in the configuration file (see \code{\link{Registry}})
#' or by setting the resource \dQuote{max.concurrent.jobs} to the maximum number of jobs to run simultaneously.
#' If both are set, the setting via the resource takes precedence over the setting in the configuration.
#'
#' @section Measuring Memory:
#' Setting the resource \code{measure.memory} to \code{TRUE} turns on memory measurement:
#' \code{\link[base]{gc}} is called  directly before and after the job and the difference is
#' stored in the internal database. Note that this is just a rough estimate and does
#' neither work reliably for external code like C/C++ nor in combination with threading.
#'
#' @section Inner Parallelization:
#' Inner parallelization is typically done via threading, sockets or MPI.
#' Two backends are supported to assist in setting up inner parallelization.
#'
#' The first package is \pkg{parallelMap}.
#' If you set the resource \dQuote{pm.backend} to \dQuote{multicore}, \dQuote{socket} or \dQuote{mpi},
#' \code{\link[parallelMap]{parallelStart}} is called on the slave before the first job in the chunk is started
#' and \code{\link[parallelMap]{parallelStop}} is called after the last job terminated.
#' This way, the resources for inner parallelization can be set and get automatically stored just like other computational resources.
#' The function provided by the user just has to call \code{\link[parallelMap]{parallelMap}} to start parallelization using the preconfigured backend.
#'
#' To control the number of CPUs, you have to set the resource \code{ncpus}.
#' Otherwise \code{ncpus} defaults to the number of available CPUs (as reported by \code{\link[parallel]{detectCores}})
#' on the executing machine for multicore and socket mode and defaults to the return value of \code{\link[Rmpi]{mpi.universe.size}-1} for MPI.
#' Your template must be set up to handle the parallelization, e.g. request the right number of CPUs or start R with \code{mpirun}.
#' You may pass further options like \code{level} to \code{\link[parallelMap]{parallelStart}} via the named list \dQuote{pm.opts}.
#'
#' The second supported parallelization backend is \pkg{foreach}.
#' If you set the resource \dQuote{foreach.backend} to \dQuote{seq} (sequential mode), \dQuote{parallel} (\pkg{doParallel}) or
#' \dQuote{mpi} (\pkg{doMPI}), the requested \pkg{foreach} backend is automatically registered on the slave.
#' Again, the resource \code{ncpus} is used to determine the number of CPUs.
#'
#' Neither the namespace of \pkg{parallelMap} nor the namespace of \pkg{foreach} is attached.
#' You have to do this manually via \code{\link[base]{library}} or let the registry load the packages for you.
#'
#' @note
#' If you submit a large number of jobs, disabling the progress bar (\code{options(batchtools.progress = FALSE)})
#' can significantly increase the performance of \code{submitJobs}.
#'
#' @templateVar ids.default findNotSubmitted
#' @template ids
#' @param resources [\code{named list}]\cr
#'   Computational  resources for the jobs to submit. The actual elements of this list
#'   (e.g. something like \dQuote{walltime} or \dQuote{nodes}) depend on your template file, exceptions are outlined in the section 'Resources'.
#'   Default settings for a system can be set in the configuration file by defining the named list \code{default.resources}.
#'   Note that these settings are merged by name, e.g. merging \code{list(walltime = 300)} into \code{list(walltime = 400, memory = 512)}
#'   will result in \code{list(walltime = 300, memory = 512)}.
#'   Same holds for individual job resources passed as additional column of \code{ids} (c.f. section 'Resources').
#' @param sleep [\code{function(i)} | \code{numeric(1)}]\cr
#'   Parameter to control the duration to sleep between temporary errors.
#'   You can pass an absolute numeric value in seconds or a \code{function(i)} which returns the number of seconds to sleep in the \code{i}-th
#'   iteration between temporary errors.
#'   If not provided (\code{NULL}), tries to read the value (number/function) from the configuration file (stored in \code{reg$sleep}) or defaults to
#'   a function with exponential backoff between 5 and 120 seconds.
#' @template reg
#' @return [\code{\link{data.table}}] with columns \dQuote{job.id} and \dQuote{chunk}.
#' @export
#' @examples
#' \dontshow{ batchtools:::example_push_temp(3) }
#' ### Example 1: Submit subsets of jobs
#' tmp = makeRegistry(file.dir = NA, make.default = FALSE)
#'
#' # toy function which fails if x is even and an input file does not exist
#' fun = function(x, fn) if (x %% 2 == 0 && !file.exists(fn)) stop("file not found") else x
#'
#' # define jobs via batchMap
#' fn = tempfile()
#' ids = batchMap(fun, 1:20, reg = tmp, fn = fn)
#'
#' # submit some jobs
#' ids = 1:10
#' submitJobs(ids, reg = tmp)
#' waitForJobs(ids, reg = tmp)
#' getStatus(reg = tmp)
#'
#' # create the required file and re-submit failed jobs
#' file.create(fn)
#' submitJobs(findErrors(ids, reg = tmp), reg = tmp)
#' getStatus(reg = tmp)
#'
#' # submit remaining jobs which have not yet been submitted
#' ids = findNotSubmitted(reg = tmp)
#' submitJobs(ids, reg = tmp)
#' getStatus(reg = tmp)
#'
#' # collect results
#' reduceResultsList(reg = tmp)
#'
#' ### Example 2: Using memory measurement
#' tmp = makeRegistry(file.dir = NA, make.default = FALSE)
#'
#' # Toy function which creates a large matrix and returns the column means
#' fun = function(n, p) colMeans(matrix(runif(n*p), n, p))
#'
#' # Arguments to fun:
#' args = data.table::CJ(n = c(1e4, 1e5), p = c(10, 50)) # like expand.grid()
#' print(args)
#'
#' # Map function to create jobs
#' ids = batchMap(fun, args = args, reg = tmp)
#'
#' # Set resources: enable memory measurement
#' res = list(measure.memory = TRUE)
#'
#' # Submit jobs using the currently configured cluster functions
#' submitJobs(ids, resources = res, reg = tmp)
#'
#' # Retrieve information about memory, combine with parameters
#' info = ijoin(getJobStatus(reg = tmp)[, .(job.id, mem.used)], getJobPars(reg = tmp))
#' print(unwrap(info))
#'
#' # Combine job info with results -> each job is aggregated using mean()
#' unwrap(ijoin(info, reduceResultsDataTable(fun = function(res) list(res = mean(res)), reg = tmp)))
#'
#' ### Example 3: Multicore execution on the slave
#' tmp = makeRegistry(file.dir = NA, make.default = FALSE)
#'
#' # Function which sleeps 10 seconds, i-times
#' f = function(i) {
#'   parallelMap::parallelMap(Sys.sleep, rep(10, i))
#' }
#'
#' # Create one job with parameter i=4
#' ids = batchMap(f, i = 4, reg = tmp)
#'
#' # Set resources: Use parallelMap in multicore mode with 4 CPUs
#' # batchtools internally loads the namespace of parallelMap and then
#' # calls parallelStart() before the job and parallelStop() right
#' # after the last job in the chunk terminated.
#' res = list(pm.backend = "multicore", ncpus = 4)
#'
#' \dontrun{
#' # Submit the job and wait for it
#' submitJobs(resources = res, reg = tmp)
#' waitForJobs(reg = tmp)
#'
#' # If successful, the running time should be ~10s
#' getJobTable(reg = tmp)[, .(job.id, time.running)]
#'
#' # There should also be a note in the log:
#' grepLogs(pattern = "parallelMap", reg = tmp)
#' }
submitJobs = function(ids = NULL, resources = list(), sleep = NULL, reg = getDefaultRegistry()) {
  # --- argument checks -------------------------------------------------------
  assertRegistry(reg, writeable = TRUE, sync = TRUE)
  assertList(resources, names = "strict")
  # merge the resources passed by the user into the registry's default resources
  resources = insert(reg$default.resources, resources)
  if (hasName(resources, "pm.backend"))
    assertChoice(resources$pm.backend, c("local", "multicore", "socket", "mpi"))
  if (hasName(resources, "foreach.backend"))
    assertChoice(resources$foreach.backend, c("seq", "parallel", "mpi"))
  if (hasName(resources, "pm.opts"))
    assertList(resources$pm.opts, names = "unique")
  if (hasName(resources, "ncpus"))
    assertCount(resources$ncpus, positive = TRUE)
  if (hasName(resources, "measure.memory"))
    assertFlag(resources$measure.memory)
  # 'sleep' is called below as sleep(i) with the retry counter i
  sleep = getSleepFunction(reg, sleep)

  # ids default to all jobs not yet submitted; the chunk column and the
  # per-job resource columns are kept if the caller provided them
  ids = convertIds(reg, ids, default = .findNotSubmitted(reg = reg), keep.extra = c("chunk", batchtools$resources$per.job))
  if (nrow(ids) == 0L)
    return(noIds())

  # handle chunks
  # chunking is only active if at least two jobs share a chunk number;
  # otherwise every job becomes its own chunk, numbered in row order
  use.chunking = hasName(ids, "chunk") && anyDuplicated(ids, by = "chunk") > 0L
  if (use.chunking) {
    ids$chunk = asInteger(ids$chunk, any.missing = FALSE)
    chunks = sort(unique(ids$chunk))
  } else {
    chunks = ids$chunk = seq_row(ids)
  }

  # check for jobs already on system
  on.sys = .findOnSystem(reg = reg, cols = c("job.id", "batch.id"))
  ids.on.sys = on.sys[ids, nomatch = 0L, on = "job.id"]
  if (nrow(ids.on.sys) > 0L)
    stopf("Some jobs are already on the system, e.g. %i", ids.on.sys[1L, ]$job.id)

  # handle max.concurrent.jobs
  # precedence: resource setting, then registry setting, then NA (= no limit)
  max.concurrent.jobs = assertCount(resources$max.concurrent.jobs, null.ok = TRUE) %??%
    assertCount(reg$max.concurrent.jobs, null.ok = TRUE) %??% NA_integer_

  # handle chunks.as.arrayjobs
  chunks.as.arrayjobs = FALSE
  if (hasName(resources, "chunks.as.arrayjobs")) {
    assertFlag(resources$chunks.as.arrayjobs)
    if (resources$chunks.as.arrayjobs) {
      if (is.na(reg$cluster.functions$array.var)) {
        info("Ignoring resource 'chunks.as.arrayjobs', not supported by cluster functions '%s'", reg$cluster.functions$name)
      } else {
        chunks.as.arrayjobs = TRUE
      }
    }
  }

  # The limit only needs to be enforced if the batch jobs already on the system
  # plus the ones about to be submitted (one per chunk, or one per job for
  # array jobs) exceed it; otherwise the limit is disabled by resetting to NA.
  if (!is.na(max.concurrent.jobs)) {
    if (uniqueN(on.sys, by = "batch.id") + (!chunks.as.arrayjobs) * length(chunks) + chunks.as.arrayjobs * nrow(ids) > max.concurrent.jobs) {
      "!DEBUG [submitJobs]: Limiting the number of concurrent jobs to `max.concurrent.jobs`"
    } else {
      max.concurrent.jobs = NA_integer_
    }
  }

  # handle job resources
  per.job.resources = chintersect(names(ids), batchtools$resources$per.job)
  if (length(per.job.resources) > 0L) {
    if (use.chunking)
      stopf("Combining per-job resources with chunking is not supported")
    # build one resource list per row by inserting the row's per-job values
    # into the common resources, then drop the per-job columns from ids
    ids$resource.id = addResources(reg, .mapply(function(...) insert(resources, list(...)), ids[, per.job.resources, with = FALSE], MoreArgs = list()))
    ids[, (per.job.resources) := NULL]
  } else {
    ids$resource.id = addResources(reg, list(resources))
  }

  info("Submitting %i jobs in %i chunks using cluster functions '%s' ...", nrow(ids), length(chunks), reg$cluster.functions$name)
  # save the registry on exit, including exits caused by an error during submission
  on.exit(saveRegistry(reg), add = TRUE)

  chunk = NULL
  runHook(reg, "pre.submit")

  pb = makeProgressBar(total = length(chunks), format = ":status [:bar] :percent eta: :eta")
  pb$tick(0, tokens = list(status = "Submitting"))

  for (ch in chunks) {
    ids.chunk = ids[chunk == ch, c("job.id", "resource.id")]
    # the resources of the first job in the chunk are used for the whole collection
    jc = makeJobCollection(ids.chunk, resources = reg$resources[ids.chunk, on = "resource.id"]$resources[[1L]], reg = reg)
    if (reg$cluster.functions$store.job.collection)
      writeRDS(jc, file = jc$uri, compress = jc$compress)

    # do we have to wait for jobs to get terminated before proceeding?
    if (!is.na(max.concurrent.jobs)) {
      # poll the number of distinct batch ids until it drops below the limit
      i = 1L
      repeat {
        n.on.sys = uniqueN(getBatchIds(reg), by = "batch.id")
        "!DEBUG [submitJobs]: Detected `n.on.sys` batch jobs on system (`max.concurrent.jobs` allowed concurrently)"

        if (n.on.sys < max.concurrent.jobs)
          break
        pb$tick(0, tokens = list(status = "Waiting   "))
        sleep(i)
        i = i + 1L
      }
    }

    # remove old result files
    fns = getResultFiles(reg, ids.chunk)
    file_remove(fns)

    # submit the chunk, retrying on temporary errors
    i = 1L
    repeat {
      runHook(reg, "pre.submit.job")
      now = ustamp()
      submit = reg$cluster.functions$submitJob(reg = reg, jc = jc)

      if (submit$status == 0L) {
        # success: the cluster function must report at least one batch id
        if (!testCharacter(submit$batch.id, any.missing = FALSE, min.len = 1L)) {
          stopf("Cluster function did not return valid batch ids:\n%s", stri_flatten(capture.output(str(submit$batch.id)), "\n"))
        }
        # reset the status row(s) of the submitted jobs by reference
        reg$status[ids.chunk,
          c("submitted", "started", "done",   "error",       "mem.used", "resource.id",         "batch.id",      "log.file",      "job.hash") :=
          list(now,      NA_real_,  NA_real_, NA_character_, NA_real_,   ids.chunk$resource.id, submit$batch.id, submit$log.file, jc$job.hash)]
        runHook(reg, "post.submit.job")
        break
      } else if (submit$status > 0L && submit$status <= 100L) {
        # temporary error (status codes 1..100): show the message, back off, retry
        pb$tick(0, tokens = list(status = submit$msg))
        sleep(i)
        i = i + 1L
      } else {
        # fatal error: every other non-zero status. Covering the complete
        # remaining range guarantees that an unexpected code (e.g. > 200)
        # aborts instead of re-submitting forever without sleeping.
        stopf("Fatal error occurred: %i. %s", submit$status, submit$msg)
      }
    }
    pb$tick(len = 1, tokens = list(status = "Submitting"))
  }

  # wait for the configured scheduler latency before running the post.submit hook
  Sys.sleep(reg$cluster.functions$scheduler.latency)
  runHook(reg, "post.submit")

  # return ids, registry is saved via on.exit()
  return(invisible(ids))
}

addResources = function(reg, resources) {
  # Assigns consecutive integer ids to all rows of 'dt' whose 'id.col' is NA,
  # continuing after the largest id already present. 'dt' is modified by
  # reference and keyed on 'id.col' afterwards.
  autoIncrement = function(dt, id.col) {
    missing.rows = which(is.na(dt[[id.col]]))
    if (length(missing.rows) == 0L)
      return(invisible(NULL))

    # only look up the current maximum if at least one id is already set,
    # otherwise numbering simply starts at 1
    offset = if (length(missing.rows) < nrow(dt)) max(dt[[id.col]], na.rm = TRUE) else 0L
    new.ids = seq_along(missing.rows) + offset
    dt[missing.rows, (id.col) := new.ids]
    setkeyv(dt, id.col)[]
  }

  # one row per requested resource set, identified by the hash of its contents
  hashes = vcapply(resources, digest)
  requested = data.table(resources = resources, resource.hash = hashes)

  # resource sets whose hash is not stored in the registry yet
  unseen = unique(requested, by = "resource.hash")[!reg$resources, on = "resource.hash"]
  if (nrow(unseen) > 0L) {
    reg$resources = rbindlist(list(reg$resources, unseen), fill = TRUE, use.names = TRUE)
    autoIncrement(reg$resources, "resource.id")
  }

  # look up the resource.id of each requested resource set via its hash
  reg$resources[requested, "resource.id", on = "resource.hash"][[1L]]
}