File: waitForWorker.R

package info (click to toggle)
r-cran-future.batchtools 0.12.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 528 kB
  • sloc: sh: 82; makefile: 2
file content (97 lines) | stat: -rw-r--r-- 2,728 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
## S3 generic: wait for a worker to become available for 'future'.
## Dispatches on the class of 'future'; any further arguments are made
## available to the selected method.
waitForWorker <- function(future, ...) UseMethod("waitForWorker")

## Fallback method: performs no waiting and returns NULL for any 'future'.
waitForWorker.default <- function(future, ...) {
  NULL
}

## Method for BatchtoolsUniprocessFuture objects: performs no waiting and
## returns NULL.
waitForWorker.BatchtoolsUniprocessFuture <- function(future, ...) {
  NULL
}


## S3 generic: register 'future'.
## Dispatches on the class of 'future'; any further arguments are made
## available to the selected method.
registerFuture <- function(future, ...) UseMethod("registerFuture")

## Fallback method: registers nothing and returns NULL for any 'future'.
registerFuture.default <- function(future, ...) {
  NULL
}

## Method for BatchtoolsUniprocessFuture objects: registers nothing and
## returns NULL.
registerFuture.BatchtoolsUniprocessFuture <- function(future, ...) {
  NULL
}



## S3 generic: unregister 'future'.
## Dispatches on the class of 'future'; any further arguments are made
## available to the selected method.
unregisterFuture <- function(future, ...) UseMethod("unregisterFuture")

## Fallback method: unregisters nothing and returns NULL for any 'future'.
unregisterFuture.default <- function(future, ...) {
  NULL
}

## Method for BatchtoolsUniprocessFuture objects: unregisters nothing and
## returns NULL.
unregisterFuture.BatchtoolsUniprocessFuture <- function(future, ...) {
  NULL
}


## Method for BatchtoolsFuture objects: adds 'future' to the registry named
## "workers-<primary class of future>" via FutureRegistry(), with
## earlySignal = FALSE.  Additional arguments are forwarded to
## FutureRegistry(), whose value is returned.
registerFuture.BatchtoolsFuture <- function(future, ...) {
  primary_class <- class(future)[1]
  registry_name <- sprintf("workers-%s", primary_class)
  FutureRegistry(
    registry_name,
    action = "add",
    future = future,
    earlySignal = FALSE,
    ...
  )
}


## Method for BatchtoolsFuture objects: removes 'future' from the registry
## named "workers-<primary class of future>" via FutureRegistry().
## Additional arguments are forwarded to FutureRegistry(), whose value is
## returned.
unregisterFuture.BatchtoolsFuture <- function(future, ...) {
  primary_class <- class(future)[1]
  registry_name <- sprintf("workers-%s", primary_class)
  FutureRegistry(
    registry_name,
    action = "remove",
    future = future,
    ...
  )
}


## Wait until fewer than 'workers' futures are listed in the registry
## "workers-<primary class of future>".
##
## The registry is polled in a loop.  Between polls the function sleeps
## (starting at 'delta' seconds and multiplying the interval by 'alpha'
## after each sleep) and then calls 'await()' to give finished futures a
## chance to be collected.
##
## Arguments:
##   future  - the future whose primary class selects the registry to poll
##   workers - total number of workers; coerced to a single integer >= 1
##   await   - NULL, or a function called without arguments after each
##             sleep; when NULL, FutureRegistry(<registry>,
##             action = "collect-first") is used
##   timeout - maximum total waiting time, in seconds (>= 0)
##   delta   - initial sleep between polls, in seconds (>= 0)
##   alpha   - factor by which the sleep interval is scaled after each poll
##             (> 0)
##
## Value:
##   NULL (invisibly) as soon as a worker is free.  If no worker becomes
##   free before the timeout, a FutureError is signalled.
#' @importFrom future FutureError
waitForWorker.BatchtoolsFuture <- function(future,
         workers,
         await = NULL,
         timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
         delta = getOption("future.wait.interval", 0.2),
         alpha = getOption("future.wait.alpha", 1.01)) {
  debug <- getOption("future.debug", FALSE)

  stop_if_not(is.null(await) || is.function(await))
  workers <- as.integer(workers)
  stop_if_not(length(workers) == 1, is.finite(workers), workers >= 1L)
  stop_if_not(length(timeout) == 1, is.finite(timeout), timeout >= 0)
  stop_if_not(length(delta) == 1, is.finite(delta), delta >= 0)
  stop_if_not(length(alpha) == 1, is.finite(alpha), alpha > 0)

  ## Name of the registry tracking futures of this class
  freg <- sprintf("workers-%s", class(future)[1])

  ## Use a default await() function?
  if (is.null(await)) {
    await <- function() FutureRegistry(freg, action = "collect-first")
  }

  ## Number of occupied workers
  usedWorkers <- function() {
    length(FutureRegistry(freg, action = "list", earlySignal = FALSE))
  }

  t0 <- Sys.time()
  dt <- 0        ## elapsed time for messages (difftime with auto units)
  elapsed <- 0   ## elapsed time in seconds; this is what 'timeout' bounds
  iter <- 1L     ## number of the poll about to be performed
  interval <- delta
  finished <- FALSE
  while (elapsed <= timeout) {
    ## Check for available workers
    used <- usedWorkers()
    finished <- (used < workers)
    if (finished) break

    if (debug) mdebugf("Poll #%d (%s): usedWorkers() = %d, workers = %d", iter, format(round(dt, digits = 2L)), used, workers)

    ## Wait
    Sys.sleep(interval)
    interval <- alpha * interval

    ## Finish/close workers, iff possible
    await()

    iter <- iter + 1L
    now <- Sys.time()
    ## difftime() with automatic units switches from secs to mins, hours
    ## and days as time passes, so its bare numeric value must not be
    ## compared against a timeout given in seconds.  Keep the auto-scaled
    ## value for display only and track the elapsed seconds explicitly.
    dt <- difftime(now, t0)
    elapsed <- as.numeric(difftime(now, t0, units = "secs"))
  }

  if (!finished) {
    ## 'iter' is the number of the next poll, hence 'iter - 1L' were made
    msg <- sprintf("TIMEOUT: All %d workers are still occupied after %s (polled %d times)", workers, format(round(dt, digits = 2L)), iter - 1L)
    if (debug) mdebug(msg)
    stop(FutureError(msg))
  }

  invisible(NULL)
}