1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
#' @title Estimate Remaining Runtimes
#'
#' @description
#' Estimates the runtimes of jobs using the random forest implemented in \pkg{ranger}.
#' Observed runtimes are retrieved from the \code{\link{Registry}} and runtimes are
#' predicted for unfinished jobs.
#'
#' The estimated remaining time is calculated in the \code{print} method.
#' You may also pass \code{n} here to determine the number of parallel jobs which is then used
#' in a simple Longest Processing Time (LPT) algorithm to give an estimate for the parallel runtime.
#'
#' @param tab [\code{\link{data.table}}]\cr
#' Table with column \dQuote{job.id} and additional columns to predict the runtime.
#' Observed runtimes will be looked up in the registry and serve as dependent variable.
#' All columns in \code{tab} except \dQuote{job.id} will be passed to \code{\link[ranger]{ranger}} as
#' independent variables to fit the model.
#' @param ... [ANY]\cr
#' Additional parameters passed to \code{\link[ranger]{ranger}}. Ignored for the \code{print} method.
#' @template reg
#' @return [\code{RuntimeEstimate}] which is a \code{list} with two named elements:
#' \dQuote{runtimes} is a \code{\link{data.table}} with columns \dQuote{job.id},
#' \dQuote{runtime} (in seconds) and \dQuote{type} (\dQuote{estimated} if runtime is estimated,
#' \dQuote{observed} if runtime was observed).
#' The other element of the list named \dQuote{model}] contains the fitted random forest object.
#' @export
#' @seealso \code{\link{binpack}} and \code{\link{lpt}} to chunk jobs according to their estimated runtimes.
#' @examples
#' \dontshow{ batchtools:::example_push_temp(1) }
#' # Create a simple toy registry
#' set.seed(1)
#' tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE, seed = 1)
#' addProblem(name = "iris", data = iris, fun = function(data, ...) nrow(data), reg = tmp)
#' addAlgorithm(name = "nrow", function(instance, ...) nrow(instance), reg = tmp)
#' addAlgorithm(name = "ncol", function(instance, ...) ncol(instance), reg = tmp)
#' addExperiments(algo.designs = list(nrow = data.table::CJ(x = 1:50, y = letters[1:5])), reg = tmp)
#' addExperiments(algo.designs = list(ncol = data.table::CJ(x = 1:50, y = letters[1:5])), reg = tmp)
#'
#' # We use the job parameters to predict runtimes
#' tab = unwrap(getJobPars(reg = tmp))
#'
#' # First we need to submit some jobs so that the forest can train on some data.
#' # Thus, we just sample some jobs from the registry while grouping by factor variables.
#' library(data.table)
#' ids = tab[, .SD[sample(nrow(.SD), 5)], by = c("problem", "algorithm", "y")]
#' setkeyv(ids, "job.id")
#' submitJobs(ids, reg = tmp)
#' waitForJobs(reg = tmp)
#'
#' # We "simulate" some more realistic runtimes here to demonstrate the functionality:
#' # - Algorithm "ncol" is 5 times more expensive than "nrow"
#' # - x has no effect on the runtime
#' # - If y is "a" or "b", the runtimes are really high
#' runtime = function(algorithm, x, y) {
#' ifelse(algorithm == "nrow", 100L, 500L) + 1000L * (y %in% letters[1:2])
#' }
#' tmp$status[ids, done := done + tab[ids, runtime(algorithm, x, y)]]
#' rjoin(sjoin(tab, ids), getJobStatus(ids, reg = tmp)[, c("job.id", "time.running")])
#'
#' # Estimate runtimes:
#' est = estimateRuntimes(tab, reg = tmp)
#' print(est)
#' rjoin(tab, est$runtimes)
#' print(est, n = 10)
#'
#' # Submit jobs with longest runtime first:
#' ids = est$runtimes[type == "estimated"][order(runtime, decreasing = TRUE)]
#' print(ids)
#' \dontrun{
#' submitJobs(ids, reg = tmp)
#' }
#'
#' # Group jobs into chunks with runtime < 1h
#' ids = est$runtimes[type == "estimated"]
#' ids[, chunk := binpack(runtime, 3600)]
#' print(ids)
#' print(ids[, list(runtime = sum(runtime)), by = chunk])
#' \dontrun{
#' submitJobs(ids, reg = tmp)
#' }
#'
#' # Group jobs into 10 chunks with similar runtime
#' ids = est$runtimes[type == "estimated"]
#' ids[, chunk := lpt(runtime, 10)]
#' print(ids[, list(runtime = sum(runtime)), by = chunk])
estimateRuntimes = function(tab, ..., reg = getDefaultRegistry()) {
assertRegistry(reg, sync = TRUE)
data = copy(convertIds(reg, tab, keep.extra = names(tab)))
if (!requireNamespace("ranger", quietly = TRUE))
stop("Please install package 'ranger' for runtime estimation")
data[, "runtime" := as.numeric(getJobStatus(tab, reg)$time.running)]
i = is.na(data$runtime)
if (all(i))
stop("No training data available. Some jobs must be finished before estimating runtimes.")
rf = ranger::ranger(runtime ~ ., data = data[!i, !"job.id"], ...)
data[i, "runtime" := predict(rf, .SD)$predictions, .SDcols = chsetdiff(names(data), c("job.id", "runtime"))]
data$type = factor(ifelse(i, "estimated", "observed"), levels = c("observed", "estimated"))
setClasses(list(runtimes = data[, c("job.id", "type", "runtime")], model = rf), c("RuntimeEstimate", class(data)))
}
#' @rdname estimateRuntimes
#' @param x [\code{RuntimeEstimate}]\cr
#' Object to print.
#' @param n [\code{integer(1)}]\cr
#' Number of parallel jobs to assume for runtime estimation.
#' @export
print.RuntimeEstimate = function(x, n = 1L, ...) {
ps = function(x, nc = 2L) {
sprintf(paste0("%0", nc, "id %02ih %02im %.1fs"),
floor(x / 86400),
floor((x / 3600) %% 24L),
floor((x / 60) %% 60L),
x %% 60L
)
}
assertCount(n, positive = TRUE)
runtime = type = NULL
calculated = x$runtimes[type == "observed", sum(runtime)]
remaining = x$runtimes[type == "estimated", sum(runtime)]
total = calculated + remaining
nc = max(1L, nchar(total %/% 86400))
catf("Runtime Estimate for %i jobs with %i CPUs", nrow(x$runtimes), n)
catf(" Done : %s", ps(calculated, nc = nc))
if (x$runtimes[type == "estimated", .N] > 0L) {
catf(" Remaining: %s", ps(remaining, nc = nc))
if (n >= 2L) {
rt = x$runtimes[type == "estimated"]$runtime
bins = lpt(rt, n)
bins = vnapply(split(rt, bins), sum)
catf(" Parallel : %s", ps(max(bins), nc = nc))
}
}
catf(" Total : %s", ps(total, nc = nc))
}
|