1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
waitForWorker <- function(future, ...) {
UseMethod("waitForWorker")
}
waitForWorker.default <- function(future, ...) NULL
waitForWorker.BatchtoolsUniprocessFuture <- function(future, ...) NULL
registerFuture <- function(future, ...) {
UseMethod("registerFuture")
}
registerFuture.default <- function(future, ...) NULL
registerFuture.BatchtoolsUniprocessFuture <- function(future, ...) NULL
unregisterFuture <- function(future, ...) {
UseMethod("unregisterFuture")
}
unregisterFuture.default <- function(future, ...) NULL
unregisterFuture.BatchtoolsUniprocessFuture <- function(future, ...) NULL
registerFuture.BatchtoolsFuture <- function(future, ...) {
freg <- sprintf("workers-%s", class(future)[1])
FutureRegistry(freg, action = "add", future = future, earlySignal = FALSE, ...)
}
unregisterFuture.BatchtoolsFuture <- function(future, ...) {
freg <- sprintf("workers-%s", class(future)[1])
FutureRegistry(freg, action = "remove", future = future, ...)
}
#' @importFrom future FutureError
waitForWorker.BatchtoolsFuture <- function(future,
workers,
await = NULL,
timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
delta = getOption("future.wait.interval", 0.2),
alpha = getOption("future.wait.alpha", 1.01)) {
debug <- getOption("future.debug", FALSE)
stop_if_not(is.null(await) || is.function(await))
workers <- as.integer(workers)
stop_if_not(length(workers) == 1, is.finite(workers), workers >= 1L)
stop_if_not(length(timeout) == 1, is.finite(timeout), timeout >= 0)
stop_if_not(length(alpha) == 1, is.finite(alpha), alpha > 0)
freg <- sprintf("workers-%s", class(future)[1])
## Use a default await() function?
if (is.null(await)) {
await <- function() FutureRegistry(freg, action = "collect-first")
}
## Number of occupied workers
usedWorkers <- function() {
length(FutureRegistry(freg, action = "list", earlySignal = FALSE))
}
t0 <- Sys.time()
dt <- 0
iter <- 1L
interval <- delta
finished <- FALSE
while (dt <= timeout) {
## Check for available workers
used <- usedWorkers()
finished <- (used < workers)
if (finished) break
if (debug) mdebugf("Poll #%d (%s): usedWorkers() = %d, workers = %d", iter, format(round(dt, digits = 2L)), used, workers)
## Wait
Sys.sleep(interval)
interval <- alpha * interval
## Finish/close workers, iff possible
await()
iter <- iter + 1L
dt <- difftime(Sys.time(), t0)
}
if (!finished) {
msg <- sprintf("TIMEOUT: All %d workers are still occupied after %s (polled %d times)", workers, format(round(dt, digits = 2L)), iter)
if (debug) mdebug(msg)
stop(FutureError(msg))
}
}
|