% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/submitJobs.R
\name{submitJobs}
\alias{submitJobs}
\title{Submit Jobs to the Batch Systems}
\usage{
submitJobs(
  ids = NULL,
  resources = list(),
  sleep = NULL,
  reg = getDefaultRegistry()
)
}
\arguments{
\item{ids}{[\code{\link[base]{data.frame}} or \code{integer}]\cr
A \code{\link[base]{data.frame}} (or \code{\link[data.table]{data.table}})
with a column named \dQuote{job.id}.
Alternatively, you may also pass a vector of integerish job ids.
If not set, defaults to the return value of \code{\link{findNotSubmitted}}.
Invalid ids are ignored.}
\item{resources}{[\code{named list}]\cr
Computational resources for the jobs to submit. The actual elements of this list
(e.g. something like \dQuote{walltime} or \dQuote{nodes}) depend on your template file; exceptions are outlined in the section 'Resources'.
Default settings for a system can be set in the configuration file by defining the named list \code{default.resources}.
Note that these settings are merged by name, e.g. merging \code{list(walltime = 300)} into \code{list(walltime = 400, memory = 512)}
will result in \code{list(walltime = 300, memory = 512)}.
The same holds for individual job resources passed as an additional column of \code{ids} (cf. section 'Resources').}
\item{sleep}{[\code{function(i)} | \code{numeric(1)}]\cr
Parameter to control the duration to sleep between temporary errors.
You can pass an absolute numeric value in seconds or a \code{function(i)} which returns the number of seconds to sleep in the \code{i}-th
iteration between temporary errors.
If not provided (\code{NULL}), tries to read the value (number/function) from the configuration file (stored in \code{reg$sleep}) or defaults to
a function with exponential backoff between 5 and 120 seconds.}
\item{reg}{[\code{\link{Registry}}]\cr
Registry. If not explicitly passed, uses the default registry (see \code{\link{setDefaultRegistry}}).}
}
\value{
[\code{\link{data.table}}] with columns \dQuote{job.id} and \dQuote{chunk}.
}
\description{
Submits defined jobs to the batch system.
After submitting the jobs, you can use \code{\link{waitForJobs}} to wait for the
termination of jobs or call \code{\link{reduceResultsList}}/\code{\link{reduceResults}}
to collect partial results.
The progress can be monitored with \code{\link{getStatus}}.
}
\note{
If you submit a large number of jobs, disabling the progress bar (\code{options(batchtools.progress = FALSE)})
can significantly increase the performance of \code{submitJobs}.
}
\section{Resources}{
You can pass arbitrary resources to \code{submitJobs()} which are then available in the cluster function template.
Some resource names are standardized, and it is good practice to stick to the following nomenclature to avoid confusion (see the sketch after the list):
\describe{
\item{walltime:}{Upper time limit in seconds for jobs before they get killed by the scheduler. Can be passed as additional column as part of \code{ids} to set per-job resources.}
\item{memory:}{Memory limit in Mb. If jobs exceed this limit, they are usually killed by the scheduler. Can be passed as additional column as part of \code{ids} to set per-job resources.}
\item{ncpus:}{Number of (physical) CPUs to use on the slave. Can be passed as additional column as part of \code{ids} to set per-job resources.}
\item{omp.threads:}{Number of threads to use via OpenMP. Used to set environment variable \dQuote{OMP_NUM_THREADS}. Can be passed as additional column as part of \code{ids} to set per-job resources.}
\item{pp.size:}{Maximum size of the pointer protection stack, see \code{\link[base]{Memory}}.}
\item{blas.threads:}{Number of threads to use for the BLAS backend. Used to set environment variables \dQuote{MKL_NUM_THREADS} and \dQuote{OPENBLAS_NUM_THREADS}. Can be passed as additional column as part of \code{ids} to set per-job resources.}
\item{measure.memory:}{Enable memory measurement for jobs. Comes with a small runtime overhead.}
\item{chunks.as.arrayjobs:}{Execute chunks as array jobs.}
\item{pm.backend:}{Start a \pkg{parallelMap} backend on the slave.}
\item{foreach.backend:}{Start a \pkg{foreach} backend on the slave.}
\item{clusters:}{Resource used for Slurm to select the set of clusters to run \code{sbatch}/\code{squeue}/\code{scancel} on.}
}
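A minimal sketch, assuming a registry object \code{reg}, job ids \code{ids}, and a template file which maps
\dQuote{walltime}, \dQuote{memory} and \dQuote{ncpus} to the corresponding scheduler directives:
\preformatted{
# resources passed here are merged by name with "default.resources" from the configuration file
submitJobs(ids, resources = list(walltime = 3600, memory = 1024, ncpus = 1), reg = reg)
}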
}
\section{Chunking of Jobs}{
Multiple jobs can be grouped (chunked) together to be executed sequentially on the batch system as a single batch job.
This is especially useful to avoid overburdening the scheduler by submitting thousands of jobs simultaneously.
To chunk jobs together, job ids must be provided as \code{data.frame} with columns \dQuote{job.id} and \dQuote{chunk} (integer).
All jobs with the same chunk number will be executed sequentially inside the same batch job.
The utility functions \code{\link{chunk}}, \code{\link{binpack}} and \code{\link{lpt}}
can assist in grouping jobs.
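A minimal sketch, assuming a registry \code{reg} whose unsubmitted jobs are to be grouped into chunks of 50 jobs each:
\preformatted{
ids = findNotSubmitted(reg = reg)
# add the "chunk" column with the utility function chunk()
ids[, chunk := chunk(job.id, chunk.size = 50)]
submitJobs(ids, reg = reg)
}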
}
\section{Array Jobs}{
If your cluster supports array jobs, you can set the resource \code{chunks.as.arrayjobs} to \code{TRUE} in order
to execute chunks as job arrays on the cluster.
For each chunk of size \code{n}, \pkg{batchtools} creates a \code{\link{JobCollection}} of (possibly heterogeneous) jobs which is
submitted to the scheduler as a single array job with \code{n} repetitions.
For each repetition, the \code{JobCollection} is first read from the file system, then subsetted to the \code{i}-th job using
the environment variable \code{reg$cluster.functions$array.var} (set automatically, depending on the cluster backend) and finally
executed.
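A minimal sketch, assuming a registry \code{reg} and cluster functions which support array jobs:
\preformatted{
ids = findNotSubmitted(reg = reg)
ids[, chunk := chunk(job.id, chunk.size = 100)]
submitJobs(ids, resources = list(chunks.as.arrayjobs = TRUE), reg = reg)
}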
}
\section{Order of Submission}{
Jobs are submitted in the order of chunks, i.e. jobs with chunk number
\code{sort(unique(ids$chunk))[1]} are submitted first, then jobs with chunk number \code{sort(unique(ids$chunk))[2]},
and so on. If no chunks are provided, jobs are submitted in the order of \code{ids$job.id}.
}
\section{Limiting the Number of Jobs}{
If requested, \code{submitJobs} tries to limit the number of concurrent jobs of the user by waiting until jobs terminate
before submitting new ones.
This can be controlled by setting \dQuote{max.concurrent.jobs} in the configuration file (see \code{\link{Registry}})
or by setting the resource \dQuote{max.concurrent.jobs} to the maximum number of jobs to run simultaneously.
If both are set, the setting via the resource takes precedence over the setting in the configuration.
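For example, to keep at most 100 jobs queued or running at any time (a sketch, assuming a registry \code{reg} and job ids \code{ids}):
\preformatted{
# alternatively, set max.concurrent.jobs in the configuration file
submitJobs(ids, resources = list(max.concurrent.jobs = 100), reg = reg)
}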
}
\section{Measuring Memory}{
Setting the resource \code{measure.memory} to \code{TRUE} turns on memory measurement:
\code{\link[base]{gc}} is called directly before and after the job and the difference is
stored in the internal database. Note that this is just a rough estimate which works reliably
neither for external code like C/C++ nor in combination with threading.
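A minimal sketch, assuming a registry \code{reg} and job ids \code{ids} (see also Example 2 below):
\preformatted{
submitJobs(ids, resources = list(measure.memory = TRUE), reg = reg)
# the estimate (in Mb) is reported in the column "mem.used"
getJobStatus(reg = reg)[, .(job.id, mem.used)]
}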
}
\section{Inner Parallelization}{
Inner parallelization is typically done via threading, sockets or MPI.
Two backends are supported to assist in setting up inner parallelization.
The first package is \pkg{parallelMap}.
If you set the resource \dQuote{pm.backend} to \dQuote{multicore}, \dQuote{socket} or \dQuote{mpi},
\code{\link[parallelMap]{parallelStart}} is called on the slave before the first job in the chunk is started
and \code{\link[parallelMap]{parallelStop}} is called after the last job has terminated.
This way, the resources for inner parallelization can be set and are automatically stored just like other computational resources.
The function provided by the user just has to call \code{\link[parallelMap]{parallelMap}} to start parallelization using the preconfigured backend.
To control the number of CPUs, you have to set the resource \code{ncpus}.
Otherwise \code{ncpus} defaults to the number of available CPUs (as reported by \code{\link[parallel]{detectCores}})
on the executing machine for multicore and socket mode, and to \code{\link[Rmpi]{mpi.universe.size}-1} for MPI.
Your template must be set up to handle the parallelization, e.g. request the right number of CPUs or start R with \code{mpirun}.
You may pass further options like \code{level} to \code{\link[parallelMap]{parallelStart}} via the named list \dQuote{pm.opts}.
The second supported parallelization backend is \pkg{foreach}.
If you set the resource \dQuote{foreach.backend} to \dQuote{seq} (sequential mode), \dQuote{parallel} (\pkg{doParallel}) or
\dQuote{mpi} (\pkg{doMPI}), the requested \pkg{foreach} backend is automatically registered on the slave.
Again, the resource \code{ncpus} is used to determine the number of CPUs.
Neither the namespace of \pkg{parallelMap} nor that of \pkg{foreach} is attached.
You have to do this manually via \code{\link[base]{library}} or let the registry load the packages for you.
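A minimal sketch for the \pkg{foreach} backend with \pkg{doParallel}, assuming a registry \code{reg};
note that the job function attaches \pkg{foreach} itself:
\preformatted{
f = function(n) {
  library(foreach)  # the backend is already registered, but foreach must be attached
  foreach(i = seq_len(n), .combine = c) \%dopar\% sqrt(i)
}
ids = batchMap(f, n = 100, reg = reg)
submitJobs(ids, resources = list(foreach.backend = "parallel", ncpus = 2), reg = reg)
}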
}
\examples{
\dontshow{ batchtools:::example_push_temp(3) }
### Example 1: Submit subsets of jobs
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
# toy function which fails if x is even and an input file does not exist
fun = function(x, fn) if (x \%\% 2 == 0 && !file.exists(fn)) stop("file not found") else x
# define jobs via batchMap
fn = tempfile()
ids = batchMap(fun, 1:20, reg = tmp, fn = fn)
# submit some jobs
ids = 1:10
submitJobs(ids, reg = tmp)
waitForJobs(ids, reg = tmp)
getStatus(reg = tmp)
# create the required file and re-submit failed jobs
file.create(fn)
submitJobs(findErrors(ids, reg = tmp), reg = tmp)
getStatus(reg = tmp)
# submit remaining jobs which have not yet been submitted
ids = findNotSubmitted(reg = tmp)
submitJobs(ids, reg = tmp)
getStatus(reg = tmp)
# collect results
reduceResultsList(reg = tmp)

### Example 2: Using memory measurement
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
# Toy function which creates a large matrix and returns the column means
fun = function(n, p) colMeans(matrix(runif(n*p), n, p))
# Arguments to fun:
args = data.table::CJ(n = c(1e4, 1e5), p = c(10, 50)) # like expand.grid()
print(args)
# Map function to create jobs
ids = batchMap(fun, args = args, reg = tmp)
# Set resources: enable memory measurement
res = list(measure.memory = TRUE)
# Submit jobs using the currently configured cluster functions
submitJobs(ids, resources = res, reg = tmp)
# Retrieve information about memory, combine with parameters
info = ijoin(getJobStatus(reg = tmp)[, .(job.id, mem.used)], getJobPars(reg = tmp))
print(unwrap(info))
# Combine job info with results -> each job is aggregated using mean()
unwrap(ijoin(info, reduceResultsDataTable(fun = function(res) list(res = mean(res)), reg = tmp)))

### Example 3: Multicore execution on the slave
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
# Function which sleeps 10 seconds, i times (in parallel on the slave)
f = function(i) {
  parallelMap::parallelMap(Sys.sleep, rep(10, i))
}
# Create one job with parameter i=4
ids = batchMap(f, i = 4, reg = tmp)
# Set resources: Use parallelMap in multicore mode with 4 CPUs
# batchtools internally loads the namespace of parallelMap and then
# calls parallelStart() before the first job and parallelStop() right
# after the last job in the chunk has terminated.
res = list(pm.backend = "multicore", ncpus = 4)
\dontrun{
# Submit the job and wait for it
submitJobs(resources = res, reg = tmp)
waitForJobs(reg = tmp)
# If successful, the running time should be ~10s
getJobTable(reg = tmp)[, .(job.id, time.running)]
# There should also be a note in the log:
grepLogs(pattern = "parallelMap", reg = tmp)
}
}