debug_msg_can_print <- FALSE
debug_msg <- function(...) {
  if (debug_msg_can_print) {
    message(...)
  }
}

assert_work_queue_pkgs <- local({
  val <- NULL
  function() {
    if (!is.null(val)) return()
    for (pkg in list(
      list(name = "future", version = "1.21.0"),
      list(name = "fastmap", version = "1.1.0")
    )) {
      if (!is_available(pkg$name, pkg$version)) {
        stop("Package `", pkg$name, "` (", pkg$version, ") needs to be installed")
      }
    }
    val <<- TRUE
    return()
  }
})

future_worker_is_free <- function() {
  future::nbrOfFreeWorkers() > 0
}

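# A quick sketch (not run), assuming a multisession plan with 2 workers:
# `future_worker_is_free()` simply reports whether a new future could start
# right now without blocking the main R session.
if (FALSE) {
  future::plan(future::multisession(workers = 2))
  future_worker_is_free()
  #> TRUE (both workers idle)
  ignore <- future::future(Sys.sleep(5))
  ignore <- future::future(Sys.sleep(5))
  future_worker_is_free()
  #> FALSE (both workers busy)
}
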
Delay <- R6::R6Class("Delay",
  private = list(
    delay_count = 0
  ),
  public = list(
    is_reset = function() {
      private$delay_count == 0
    },
    reset = function() {
      private$delay_count <- 0
    },
    increase = function() {
      private$delay_count <- private$delay_count + 1
    },
    delay = function() {
      stop("$delay() not implemented")
    }
  ),
  active = list(
    count = function() {
      private$delay_count
    }
  )
)

ExpoDelay <- R6::R6Class("ExpoDelay",
  inherit = Delay,
  private = list(
    base = 1 / 100,
    min_seconds = 0.01,
    max_seconds = 2
  ),
  public = list(
    initialize = function(
      base = 1 / 100,
      min_seconds = 0.01,
      max_seconds = 2
    ) {
      stopifnot(length(base) == 1 && is.numeric(base) && base >= 0)
      stopifnot(length(min_seconds) == 1 && is.numeric(min_seconds) && min_seconds >= 0)
      stopifnot(length(max_seconds) == 1 && is.numeric(max_seconds) && max_seconds >= 0)
      private$base <- base
      private$max_seconds <- max_seconds
      private$min_seconds <- min_seconds
      self
    },
    # Return the number of seconds until the next attempt.
    # The delay is randomized to avoid extra work on long poll times.
    delay = function() {
      # calculate the exponential backoff value
      expo_val <- private$base * ((2 ^ private$delay_count) - 1)
      # pick a random value below the (capped) backoff value
      random_val <- runif(n = 1, max = min(private$max_seconds, expo_val))
      # apply the lower bound with `max()` in a second step to avoid
      # `runif(1, min = 5, max = 4)`, which produces `NaN`
      max(private$min_seconds, random_val)
    }
  )
)

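# A minimal sketch (not run) of the backoff schedule `ExpoDelay` produces:
# the upper bound of each random delay grows as `base * (2^count - 1)` and is
# clamped to `[min_seconds, max_seconds]`.
if (FALSE) {
  d <- ExpoDelay$new()
  vapply(1:8, function(i) {
    d$increase()
    d$delay()
  }, numeric(1))
  #> eight random draws whose upper bounds grow roughly as
  #> 0.01, 0.03, 0.07, 0.15, 0.31, 0.63, 1.27, then cap at `max_seconds` (2)
  d$reset()
  d$count
  #> 0
}
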
# Situations handled:
# √ No future workers are busy. All future calls are `future_promise()` calls.
#   * Accomplished with a follow-up promise on each future job that calls `$attempt_work()`
# √ All future workers are busy with other tasks (but will become available).
#   * Requires using a delay
# √ While processing the first batch, existing future workers are taken over.
#   * Requires a delay

# FIFO queue of work
#' Future promise work queue
#'
#' `r lifecycle::badge('experimental')`
#'
#' An \pkg{R6} class to help with scheduling work to be completed. `WorkQueue` will only execute work if `can_proceed()` returns `TRUE`. For the \pkg{future} use case, `can_proceed()` defaults to `future::nbrOfFreeWorkers() > 0`, which prevents work from being executed when no \pkg{future} worker is available.
#'
#' `WorkQueue` will try to start new work each time a prior work item finishes. However, if `can_proceed()` returns `FALSE` (no future workers are available) and there is more work to be done, then another attempt is made after a random amount of time, using exponential backoff. The exponential backoff is capped (at `max_seconds`, 2 seconds by default) to prevent unnecessarily large wait times.
#'
#' Each time `WorkQueue` tries to start more work, it will repeat until `can_proceed()` returns `FALSE` or there is no more work in the `queue`.
#'
#' @section Global event loop:
#'
#' The global loop is used by default because the internal `WorkQueue` "delayed check" uses a single delayed check for the whole queue, rather than having each item in the queue schedule its own check.
#' This behavior might change in the future, but we are not exactly sure how at this point.
#'
#' If a private `later` loop wants to become synchronous by running until all jobs are completed but is waiting on a `future_promise()`, the private loop will not complete unless the global loop is allowed to move forward.
#'
#' However, a user-defined `WorkQueue` that uses a private loop may work; such a queue can be provided directly via `future_promise(queue = custom_queue)`. Having a concrete example (or need) will help us understand the problem better. If you have an example, please reach out.
#'
#' @seealso [future_promise_queue()] which returns a `WorkQueue` which is cached per R session.
#' @keywords internal
WorkQueue <- R6::R6Class("WorkQueue",
  # TODO - private loop proposal:
  # The queued data would actually be a list of queues whose _key_ matches
  # the loop ID. This would require that `schedule_work()` take in `loop` and that each loop have its own queue.
  # The scheduled work in each queue would contain the `work` function and `submission_time`.
  # Once `can_proceed_fn()` returns TRUE, the queue with the earliest `submission_time` should be processed.
  # This concept is similar to a merge sort when trying to merge two pre-sorted lists.
  # Check time: O(k), k = number of later loops ever registered. (This could become big!)
  # Maybe, if `ID != 0`, the queue is removed if the number of elements goes to 0.
  # Check time: O(kk), kk <= k, kk = number of _active_ later loops.
  #
  # Thought process: let's say chromote used the global WorkQueue
  # and wanted its local loop to synchronize (i.e. `while (!later::loop_empty(local_loop)) later::run_now(local_loop)`).
  # [ ] In `do_work()`, get the loop from `later::current_loop()`
  #   * If it is the global loop, get the item with the earliest submission time
  #   * If it is a private loop (ID != 0), get the first item in that private queue
  # [ ] Validate that a work item finishing in loop X has a promise created using loop X
  # [ ] Would each private queue need its own delay check?
  #
  #
  # Implementation question:
  # * Would it be better to have a larger WorkQueue class that manages a WorkQueue for each loop?
  #   * This would allow each loop to have its own delay check, delay counter, and loop
  private = list(
    queue = "fastmap::fastqueue()",
    can_proceed_fn = "future_worker_is_free()",
    # only _really_ used for delay checking
    loop = "later::global_loop()",
    delay = "ExpoDelay$new()",
    # Used as a semaphore to make sure only 1 call of `attempt_work()` is delayed
    cancel_delayed_attempt_work = NULL,

    # Increment delay. Used with ExpoDelay
    increase_delay = function() {
      debug_msg("increase_delay()...", private$delay$count)
      # Increment delay and try again later
      private$delay$increase()
    },
    # Reset delay and cancel any delayed `attempt_work()` calls
    reset_delay = function() {
      debug_msg("reset_delay()")
      private$delay$reset()
      # disable any delayed executions of `attempt_work()`
      if (is.function(private$cancel_delayed_attempt_work)) {
        private$cancel_delayed_attempt_work()
      }
      private$cancel_delayed_attempt_work <- NULL
    },
    # Returns a logical that indicates whether work may begin
    can_proceed = function() {
      isTRUE(private$can_proceed_fn())
    },
    # Attempt as much work as possible.
    # If no workers are available and the queue still has elements:
    #   * If a delayed check has already been registered, return
    #   * Else, check again after some delay
    attempt_work = function(can_delay = FALSE) {
      debug_msg("attempt_work()")
      # If nothing to start, return early
      if (private$queue$size() == 0) return()

      # If we are not waiting on someone else, we can do work now
      while ((private$queue$size() > 0) && private$can_proceed()) {
        # Do work right away
        private$reset_delay()
        private$do_work()
      }

      # If there are still items to be processed, but we can not proceed...
      if (private$queue$size() > 0 && !private$can_proceed()) {
        # If we are allowed to delay (default FALSE), or nothing is currently delaying
        if (can_delay || is.null(private$cancel_delayed_attempt_work)) {
          # Try again later
          private$increase_delay()
          private$cancel_delayed_attempt_work <-
            later::later(
              loop = private$loop,
              delay = private$delay$delay(),
              function() {
                private$attempt_work(can_delay = TRUE)
              }
            )
        }
      }
    },
    # Actually process an item in the queue
    do_work = function() {
      debug_msg("do_work()")
      # Get the first item in the queue
      work_fn <- private$queue$remove()
      # Safety check...
      # If nothing is returned, there is no work to be done. Return early
      if (!is.function(work_fn)) return()

      # Do the scheduled work
      debug_msg("execute work")
      future_job <- work_fn()

      # Try to attempt more work immediately after the future job has finished
      finally(future_job, function() {
        debug_msg("finished work. queue size: ", private$queue$size())
        private$attempt_work()
      })
      return()
    }
  ),
  public = list(
    #' @description Create a new `WorkQueue`
    #' @param can_proceed Function that should return a logical value. If `TRUE` is returned, then the next scheduled work will be executed. By default, this function checks if \code{\link[future:nbrOfWorkers]{future::nbrOfFreeWorkers()} > 0}
    #' @param queue Queue object to use to store the scheduled work. By default, this is a "First In, First Out" queue using [fastmap::fastqueue()]. If using your own queue, it should have the methods `$add(x)`, `$remove()`, and `$size()`.
    #' @param loop \pkg{later} loop to use for calculating the next delayed check. Defaults to [later::global_loop()].
    initialize = function(
      # defaults to a `future::plan()` agnostic function
      can_proceed = future_worker_is_free,
      queue = fastmap::fastqueue(), # FIFO
      loop = later::global_loop()
    ) {
      stopifnot(is.function(can_proceed))
      stopifnot(
        is.function(queue$add) &&
          is.function(queue$remove) &&
          is.function(queue$size)
      )
      stopifnot(inherits(loop, "event_loop"))
      delay <- ExpoDelay$new()
      stopifnot(inherits(delay, "Delay"))

      private$can_proceed_fn <- can_proceed
      private$queue <- queue
      private$loop <- loop
      private$delay <- delay

      # make sure the delay is reset
      private$reset_delay()

      self
    },
    #' @description Schedule work (add it to the queue) and attempt to execute it
    #' @param fn function to execute when `can_proceed()` returns `TRUE`.
    schedule_work = function(fn) {
      debug_msg("schedule_work()")
      stopifnot(is.function(fn))
      private$queue$add(fn)
      private$attempt_work()
      invisible(self)
    }
  )
)

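# A minimal usage sketch (not run), assuming the package has been loaded with
# `pkgload::load_all()` (`WorkQueue` is internal): work is gated on free
# `future` workers, and each scheduled function returns its future so the
# queue knows when that work item has finished.
if (FALSE) {
  future::plan(future::multisession(workers = 2))
  wq <- WorkQueue$new(
    can_proceed = function() future::nbrOfFreeWorkers() > 0,
    queue = fastmap::fastqueue(),
    loop = later::global_loop()
  )
  wq$schedule_work(function() {
    future::future({
      Sys.sleep(1)
      "work item done"
    })
  })
}
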
#' @describeIn future_promise Default `future_promise()` work queue to use. This function returns a [WorkQueue] that is cached per R session.
#' @seealso [`WorkQueue`]
#' @export
future_promise_queue <- local({
  future_promise_queue_ <- NULL
  function() {
    if (is.null(future_promise_queue_)) {
      assert_work_queue_pkgs()
      future_promise_queue_ <<- WorkQueue$new()
    }
    future_promise_queue_
  }
})

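# A quick sketch (not run): because the queue is cached inside `local()`,
# repeated calls return the same `WorkQueue`, so all `future_promise()` calls
# in an R session share one queue by default.
if (FALSE) {
  q1 <- future_promise_queue()
  q2 <- future_promise_queue()
  identical(q1, q2)
  #> TRUE
}
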
#' \pkg{future} promise
#'
#' `r lifecycle::badge('experimental')`
#'
#' When submitting \pkg{future} work, \pkg{future} (by design) will block the main R session until a worker becomes available.
#' This occurs when there is more submitted \pkg{future} work than there are available \pkg{future} workers.
#' To counter this situation, we can create a promise to execute the work using \pkg{future} (via `future_promise()`) and only begin the work once a \pkg{future} worker is available.
#'
#' Using `future_promise()` is recommended whenever a continuous runtime is used, such as with \pkg{plumber} or \pkg{shiny}.
#'
#' For more details and examples, please see the [`vignette("future_promise", "promises")`](https://rstudio.github.io/promises/articles/future_promise.html) vignette.
#' @describeIn future_promise Creates a [promise()] that will execute the `expr` using [future::future()].
#' @inheritParams future::future
#' @param expr An R expression. While the `expr` is eventually sent to [`future::future()`], please use the same precautions that you would use with regular `promises::promise()` expressions. `future_promise()` may have to hold the `expr` in a [promise()] while waiting for a \pkg{future} worker to become available.
#' @param ... extra parameters provided to [`future::future()`]
#' @param queue A queue that is used to schedule work to be done using [future::future()]. This queue defaults to [future_promise_queue()] and requires that the method `queue$schedule_work(fn)` exist. This method should take in a function that will execute the promised \pkg{future} work.
#' @return Unlike [future::future()], `future_promise()` returns a [promise()] object that will eventually resolve the \pkg{future} `expr`.
#' @examples
#' \donttest{# Relative start time
#' start <- Sys.time()
#' # Helper to force two `future` workers
#' with_two_workers <- function(expr) {
#'   if (!require("future")) {
#'     message("`future` not installed")
#'     return()
#'   }
#'   old_plan <- future::plan(future::multisession(workers = 2))
#'   on.exit({future::plan(old_plan)}, add = TRUE)
#'   start <<- Sys.time()
#'   force(expr)
#'   while (!later::loop_empty()) {Sys.sleep(0.1); later::run_now()}
#'   invisible()
#' }
#' # Print a status message. Ex: `"PID: XXX; 2.5s promise done"`
#' print_msg <- function(pid, msg) {
#'   message(
#'     "PID: ", pid, "; ",
#'     round(difftime(Sys.time(), start, units = "secs"), digits = 1), "s ",
#'     msg
#'   )
#' }
#'
#' # `"promise done"` will not appear until the main R session is unblocked,
#' # i.e. after the first four futures have finished.
#' # The important thing to note is that the first four times are roughly the same.
#' with_two_workers({
#'   promise_resolve(Sys.getpid()) %...>% print_msg("promise done")
#'   for (i in 1:6) future::future({Sys.sleep(1); Sys.getpid()}) %...>% print_msg("future done")
#' })
#' {
#'   #> PID: XXX; 2.5s promise done
#'   #> PID: YYY; 2.6s future done
#'   #> PID: ZZZ; 2.6s future done
#'   #> PID: YYY; 2.6s future done
#'   #> PID: ZZZ; 2.6s future done
#'   #> PID: YYY; 3.4s future done
#'   #> PID: ZZZ; 3.6s future done
#' }
#'
#' # `"promise done"` will appear almost immediately, before any future work has completed.
#' # The first two `"future done"` messages also appear earlier than in the example above.
#' with_two_workers({
#'   promise_resolve(Sys.getpid()) %...>% print_msg("promise done")
#'   for (i in 1:6) future_promise({Sys.sleep(1); Sys.getpid()}) %...>% print_msg("future done")
#' })
#' {
#'   #> PID: XXX; 0.2s promise done
#'   #> PID: YYY; 1.3s future done
#'   #> PID: ZZZ; 1.4s future done
#'   #> PID: YYY; 2.5s future done
#'   #> PID: ZZZ; 2.6s future done
#'   #> PID: YYY; 3.4s future done
#'   #> PID: ZZZ; 3.6s future done
#' }}
#' @export
future_promise <- function(
  expr = NULL,
  envir = parent.frame(),
  substitute = TRUE,
  globals = TRUE,
  packages = NULL,
  ...,
  queue = future_promise_queue()
) {
  # make sure queue has the right structure
  stopifnot(is.function(queue$schedule_work) && length(formals(queue$schedule_work)) >= 1)

  if (substitute) expr <- substitute(expr)

  # Force all arguments to prevent their values from changing before execution.
  # (This does NOT protect against values inside `envir` changing.)
  force(envir)
  force(substitute)
  force(globals)
  force(packages)
  force(list(...))

  ## Record globals
  gp <- future::getGlobalsAndPackages(expr, envir = envir, globals = globals)
  force(gp)

  promise(function(resolve, reject) {
    # add to queue
    queue$schedule_work(function() {
      ### Should the worker function be taken at creation time or submission time?
      ## The current implementation has the `$can_proceed()` method of `WorkQueue` be plan agnostic.
      ## Therefore, it will always ask the current plan if a worker is available.
      ## If so, then the _current_ plan should be used, not a plan that existed at initialization time.
      exec_future <- future::plan()

      # execute the future and return a promise so the scheduler knows exactly when it is done
      future_job <- exec_future(
        gp$expr,
        envir = envir,
        substitute = FALSE,
        globals = gp$globals,
        packages = unique(c(packages, gp$packages)),
        ...
      )

      # Resolve the outer promise with the future job (its eventual value will be chained)
      resolve(future_job)

      # Return a promising object that can be chained by the `queue` after executing this _work_
      future_job
    })
  })
}

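# A minimal usage sketch (not run), assuming a 2-worker multisession plan:
# six promised work items are scheduled without ever blocking the main R
# session, even though only two can run at a time.
if (FALSE) {
  library(promises)
  future::plan(future::multisession(workers = 2))
  start <- Sys.time()
  ps <- lapply(1:6, function(i) {
    future_promise({
      Sys.sleep(1)
      i
    })
  })
  promise_all(.list = ps) %...>% {
    message("all done after ", round(difftime(Sys.time(), start, units = "secs"), 1), "s")
  }
  # Drive the event loop until everything has resolved
  while (!later::loop_empty()) { later::run_now(); Sys.sleep(0.05) }
}
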
if (FALSE) {
# ConstDelay <- R6::R6Class("ConstDelay",
# inherit = Delay,
# private = list(
# const = 0.1,
# random = TRUE
# ),
# public = list(
# initialize = function(const = 0.1, random = TRUE) {
# stopifnot(length(const) == 1 && is.numeric(const) && const >= 0)
# private$const <- const
# private$random <- isTRUE(random)
# self
# },
# delay = function() {
# if (private$random) {
# runif(n = 1, max = private$const)
# } else {
# private$const
# }
# }
# )
# )
# LinearDelay <- R6::R6Class("LinearDelay",
# inherit = Delay,
# private = list(
# delta = 0.05,
# random = TRUE
# ),
# public = list(
# initialize = function(delta = 0.03, random = TRUE) {
# stopifnot(length(delta) == 1 && is.numeric(delta) && delta >= 0)
# private$delta <- delta
# private$random <- isTRUE(random)
# self
# },
# delay = function() {
# delta_delay <- private$delay_count * private$delta
# if (private$random) {
# runif(n = 1, max = delta_delay)
# } else {
# delta_delay
# }
# }
# )
# )
# dev_load <- pkgload::load_all
# ## test
# dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) };
# ## block workers mid job
# dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) }; lapply(1:2, function(i) { later::later(function() { message("*************** adding blockage", i); fj <- future::future({ Sys.sleep(4); message("*************** blockage done", i); i}); then(fj, function(x) { print(paste0("block - ", i))}); }, delay = 0.5 + i) }) -> ignore;
# ## block main worker mid job
# dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) }; lapply(1:4, function(i) { later::later(function() { message("*************** adding blockage", i); fj <- future::future({ Sys.sleep(4); message("*************** blockage done", i); i}); then(fj, function(x) { print(paste0("block - ", i))}); }, delay = 0.5 + i/4) }) -> ignore;
# ## block workers pre job
# dev_load(); print_i(); lapply(1:2, function(i) { message("*************** adding blockage", i); future::future({ Sys.sleep(4); message("*************** blockage done", i); i}) }) -> future_jobs; lapply(future_jobs, function(fj) { as.promise(fj) %...>% { print(.) } }); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) };
# ## block main worker workers pre job
# dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) }; lapply(1:4, function(i) { later::later(function() { message("*************** adding blockage", i); fj <- future::future({ Sys.sleep(4); message("*************** blockage done", i); i}); then(fj, function(x) { print(paste0("block - ", i))}); }, delay = 0.5 + i/4) }) -> ignore;
  future::plan(future::multisession(workers = 2))
  debug_msg_can_print <- TRUE
  print_i <- function(i = 0) { if (i <= 50) { print(i); later::later(function() { print_i(i + 1) }, delay = 0.1) } }
  slow_calc <- function(n) {
    Sys.sleep(n)
    "slow!"
  }
  n <- 2
  prom <- future_promise
  # prom <- future::future
  a1 <- prom({
    print(paste0("start 1 - ", Sys.time()))
    print(slow_calc(n))
  })
  a2 <- prom({
    print(paste0("start 2 - ", Sys.time()))
    print(slow_calc(n))
  })
  a3 <- prom({
    print(paste0("start 3 - ", Sys.time()))
    print(slow_calc(n))
  })
  a4 <- prom({
    print(paste0("start 4 - ", Sys.time()))
    print(slow_calc(n))
  })
  print("done assignment!")
  a1 %...>% { message("end 1 - ", format(Sys.time())) }
  a2 %...>% { message("end 2 - ", format(Sys.time())) }
  a3 %...>% { message("end 3 - ", format(Sys.time())) }
  a4 %...>% { message("end 4 - ", format(Sys.time())) }
}