File: future_promise.R

Package: r-cran-promises 1.2.0.1+dfsg-1 (suite: bookworm)

debug_msg_can_print <- FALSE
debug_msg <- function(...) {
  if (debug_msg_can_print) {
    message(...)
  }
}

assert_work_queue_pkgs <- local({
  val <- NULL
  function() {
    if (!is.null(val)) return()
    for (pkg in list(
      list(name = "future", version = "1.21.0"),
      list(name = "fastmap", version = "1.1.0")
    )) {
      if (!is_available(pkg$name, pkg$version)) {
        stop("Package `", pkg$name, "` (", pkg$version, ") needs to be installed")
      }
    }
    val <<- TRUE
    return()
  }
})
future_worker_is_free <- function() {
  future::nbrOfFreeWorkers() > 0
}


Delay <- R6::R6Class("Delay",
  private = list(
    delay_count = 0
  ),
  public = list(
    is_reset = function() {
      private$delay_count == 0
    },
    reset = function() {
      private$delay_count <- 0
    },

    increase = function() {
      private$delay_count <- private$delay_count + 1
    },

    delay = function() {
      stop("$delay() not implemented")
    }
  ), active = list(
    count = function() {
      private$delay_count
    }
  )
)

ExpoDelay <- R6::R6Class("ExpoDelay",
  inherit = Delay,
  private = list(
    base = 1 / 100,
    min_seconds = 0.01,
    max_seconds = 2
  ),
  public = list(
    initialize = function(
      base = 1 / 100,
      min_seconds = 0.01,
      max_seconds = 2
    ) {
      stopifnot(length(base) == 1 && is.numeric(base) && base >= 0)
      stopifnot(length(min_seconds) == 1 && is.numeric(min_seconds) && min_seconds >= 0)
      stopifnot(length(max_seconds) == 1 && is.numeric(max_seconds) && max_seconds >= 0)

      private$base <- base
      private$max_seconds <- max_seconds
      private$min_seconds <- min_seconds

      self
    },

    # return the number of seconds to wait until the next attempt
    # backs off by a random amount to avoid extra work on long poll times
    delay = function() {
      # calculate expo backoff value
      expo_val <- private$base * ((2 ^ private$delay_count) - 1)
      # draw a random wait time, capped at `max_seconds`
      random_val <- runif(n = 1, max = min(private$max_seconds, expo_val))
      # apply the `min_seconds` floor as a second step (instead of passing `min =` to `runif()`)
      # to avoid calls like `runif(1, min = 5, max = 4)`, which produce `NaN`
      max(private$min_seconds, random_val)
    }
  )
)
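
# A minimal sketch of how `ExpoDelay` backs off, wrapped in `if (FALSE)` so it
# never runs at load time. It assumes the defaults above (base = 1/100,
# min 0.01s, max 2s); the printed values are random draws, so they will differ.
if (FALSE) {
  d <- ExpoDelay$new()
  for (i in 1:8) {
    d$increase()
    # upper bound is min(max_seconds, base * (2^count - 1)); the draw is uniform below it
    message("attempt ", d$count, ": wait ~", signif(d$delay(), 2), " seconds")
  }
  d$reset()
}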


# Situations
# √ No future workers are busy. All future calls are `future_promise()` calls.
#  * Accomplished with the follow-up promise on each future job, which calls `$attempt_work()`
# √ All future workers are busy with other tasks (but will become available).
#  * Requires the delayed retry
# √ While processing the first batch, existing future workers are taken over by other work
#  * Requires the delayed retry

# FIFO queue of workers
#' Future promise work queue
#'
#' `r lifecycle::badge('experimental')`
#'
#' An \pkg{R6} class to help with scheduling work to be completed. `WorkQueue` will only execute work if `can_proceed()` returns `TRUE`. For the \pkg{future} use case, `can_proceed()` defaults to checking `future::nbrOfFreeWorkers() > 0`, which prevents work from being executed when no \pkg{future} worker is available.
#'
#' `WorkQueue` will try to start new work whenever a prior work item finishes. However, if `can_proceed()` returns `FALSE` (no \pkg{future} workers are available) and there is still work in the queue, the next attempt is scheduled a random amount of time later using exponential backoff. The backoff is capped (at 2 seconds with the default delay settings) to prevent unnecessarily large wait times.
#'
#' Each time `WorkQueue` tries to start more work, it keeps starting items until `can_proceed()` returns `FALSE` or there is no more work in the `queue`.
#'
#' @section Global event loop:
#'
#' The global loop is used by default because the internal `WorkQueue` "delayed check" uses a single delayed check for the whole queue, rather than having each item in the queue schedule its own attempt.
#' This behavior might change in the future, but we are not exactly sure how at this point.
#'
#' If a private `later` loop wants to become synchronous by running until all jobs are completed but is waiting on a `future_promise()`, the private loop will not complete unless the global loop is allowed to move forward.
#'
#' However, a user-defined `WorkQueue` that uses a private loop may work; such a queue can be provided directly via `future_promise(queue = custom_queue)`. Having a concrete example (or need) will help us understand the problem better. If you have an example, please reach out.
#'
#' @seealso [future_promise_queue()] which returns a `WorkQueue` which is cached per R session.
#' @keywords internal
WorkQueue <- R6::R6Class("WorkQueue",

  # TODO - private loop proposal:
  # The queued data would actually be a list of queues whose _key_ matches
  # the loop ID. This would require that `schedule_work()` take in `loop` and have each loop have its own queue.
  # The scheduled work in each queue would contain the `work` function and `submission_time`.
  # Once `can_proceed_fn()` returns TRUE, the queue with the earliest `submission_time` should be processed.
  # This concept is similar to a merge sort when trying to merge two pre-sorted lists.
  # Check time: O(k), k = number of later loops ever registered. (This could become big!)
  # Maybe, if `ID != 0`, the queue is removed if the number of elements goes to 0.
  # Check time: O(kk), kk <= k, kk = number of _active_ later loops.
  #
  # Thought process: let's say chromote used the global WorkQueue
  # and wanted to have its local loop synchronize (i.e. `while (!later::loop_empty(local_loop)) later::run_now(local_loop)`).
  # [ ] In `do_work()`, Get loop from `later::current_loop()`
  #   * If is global loop, get item with earliest submission time
  #   * If is private loop (ID != 0), get first item in private queue
  # [ ] Validate that a work item finishing in loop X has a promise created using loop X
  # [ ] Would each private queue need its own delay check?
  #
  #
  # Implementation question:
  # * Would it be better to have a larger WorkQueue class that managed one WorkQueue per loop?
  #    * This would allow each loop to have its own delay check, delay counter, and loop
  private = list(
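    # NOTE: the string values below appear to be placeholders that document the
    # defaults; the actual objects are assigned in `initialize()`.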
    queue = "fastmap::fastqueue()",
    can_proceed_fn = "future_worker_is_free()",
    # only _really_ used for delay checking
    loop = "later::global_loop()",
    delay = "ExpoDelay$new()",

    # Used as a semaphore to make sure only 1 call of `attempt_work()` is delayed
    cancel_delayed_attempt_work = NULL,

    # Increment delay. Used with ExpoDelay
    increase_delay = function() {
      debug_msg("increase_delay()...", private$delay$count)
      # Increment delay and try again later
      private$delay$increase()
    },

    # Reset delay and cancel any delayed `attempt_work()` calls
    reset_delay = function() {
      debug_msg("reset_delay()")
      private$delay$reset()

      # disable any delayed executions of `attempt_work`
      if (is.function(private$cancel_delayed_attempt_work)) {
        private$cancel_delayed_attempt_work()
      }
      private$cancel_delayed_attempt_work <- NULL
    },

    # Returns a logical indicating whether work is allowed to begin
    can_proceed = function() {
      isTRUE(private$can_proceed_fn())
    },

    # Attempt as much work as possible.
    # If no workers are available and the queue still has elements:
    #   If a delayed check has already been registered, return.
    #   Else, check again after some delay.
    attempt_work = function(can_delay = FALSE) {
      debug_msg('attempt_work()')
      # If nothing to start, return early
      if (private$queue$size() == 0) return()

      # If we are not waiting on someone else, we can do work now
      while ((private$queue$size() > 0) && private$can_proceed()) {
        # Do work right away
        private$reset_delay()
        private$do_work()
      }

      # If there are still items to be processed, but we can not proceed...
      if (private$queue$size() > 0 && ! private$can_proceed()) {
        # If we are allowed to delay (default FALSE), or nothing is currently delaying
        if (can_delay || is.null(private$cancel_delayed_attempt_work)) {

          # Try again later
          private$increase_delay()
          private$cancel_delayed_attempt_work <-
            later::later(
              loop = private$loop,
              delay = private$delay$delay(),
              function() {
                private$attempt_work(can_delay = TRUE)
              }
            )
        }
      }
    },

    # Actually process an item in the queue
    do_work = function() {
      debug_msg("do_work()")

      # Get first item in queue
      work_fn <- private$queue$remove()

      # Safety check...
      # If nothing is returned, no work to be done. Return early
      if (!is.function(work_fn)) return()

      # Do scheduled work
      debug_msg("execute work")
      future_job <- work_fn()

      # Try to attempt work immediately after the future job has finished
      finally(future_job, function() {
        debug_msg("finished work. queue size: ", private$queue$size())
        private$attempt_work()
      })

      return()
    }
  ),
  public = list(
    #' @description Create a new `WorkQueue`
    #' @param can_proceed Function that should return a logical value. If `TRUE` is returned, then the next scheduled work will be executed. By default, this function checks if \code{\link[future:nbrOfWorkers]{future::nbrOfFreeWorkers()} > 0}
    #' @param queue Queue object to use to store the scheduled work. By default, this is a "First In, First Out" queue using [fastmap::fastqueue()]. If using your own queue, it should have the methods `$add(x)`, `$remove()`, `$size()`.
    #' @param loop \pkg{later} loop to use for calculating the next delayed check. Defaults to [later::global_loop()].
    initialize = function(
      # defaults to a future::plan agnostic function
      can_proceed = future_worker_is_free,
      queue = fastmap::fastqueue(), # FIFO
      loop = later::global_loop()
    ) {
      stopifnot(is.function(can_proceed))
      stopifnot(
        is.function(queue$add) &&
        is.function(queue$remove) &&
        is.function(queue$size)
      )
      stopifnot(inherits(loop, "event_loop"))
      delay <- ExpoDelay$new()
      stopifnot(inherits(delay, "Delay"))

      private$can_proceed_fn <- can_proceed
      private$queue <- queue
      private$loop <- loop
      private$delay <- delay

      # make sure delay is reset
      private$reset_delay()

      self
    },

    # add to schedule only
    #' @description Schedule work
    #' @param fn function to execute when `can_proceed()` returns `TRUE`.
    schedule_work = function(fn) {
      debug_msg("schedule_work()")
      stopifnot(is.function(fn))
      private$queue$add(fn)

      private$attempt_work()

      invisible(self)
    }
  )
)
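
# A minimal, hedged usage sketch of `WorkQueue`, kept inside `if (FALSE)` so it
# does not run when the file is sourced. The scheduled functions here return
# already-resolved promises; in real use they return the `future` job so the
# queue's `finally()` handler fires when the worker frees up.
if (FALSE) {
  q <- WorkQueue$new(
    can_proceed = function() TRUE,   # always allowed to start work immediately
    queue = fastmap::fastqueue(),    # FIFO storage (the default)
    loop = later::global_loop()
  )
  q$schedule_work(function() promise_resolve("first piece of work"))
  q$schedule_work(function() promise_resolve("second piece of work"))
}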


#' @describeIn future_promise Default `future_promise()` work queue to use. This function returns a [WorkQueue] that is cached per R session.
#' @seealso [`WorkQueue`]
#' @export
future_promise_queue <- local({
  future_promise_queue_ <- NULL
  function() {
    if (is.null(future_promise_queue_)) {
      assert_work_queue_pkgs()
      future_promise_queue_ <<- WorkQueue$new()
    }
    future_promise_queue_
  }
})
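
# Quick check (not run at load time): `future_promise_queue()` builds its
# `WorkQueue` lazily on first use and then returns the same cached object for
# the rest of the R session.
if (FALSE) {
  q1 <- future_promise_queue()
  q2 <- future_promise_queue()
  identical(q1, q2)  # TRUE
}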


#' \pkg{future} promise
#'
#' `r lifecycle::badge('experimental')`
#'
#' When submitting \pkg{future} work, \pkg{future} (by design) will block the main R session until a worker becomes available.
#' This occurs when there is more submitted \pkg{future} work than there are available \pkg{future} workers.
#' To counter this situation, we can create a promise to execute work using future (using `future_promise()`) and only begin the work if a \pkg{future} worker is available.
#'
#' Using `future_promise()` is recommended whenever a continuous runtime is used, such as with \pkg{plumber} or \pkg{shiny}.
#'
#' For more details and examples, please see the [`vignette("future_promise", "promises")`](https://rstudio.github.io/promises/articles/future_promise.html) vignette.
#' @describeIn future_promise Creates a [promise()] that will execute the `expr` using [future::future()].
#' @inheritParams future::future
#' @param expr An R expression. While the `expr` is eventually sent to [`future::future()`], please use the same precautions that you would use with regular `promises::promise()` expressions. `future_promise()` may have to hold the `expr` in a [promise()] while waiting for a \pkg{future} worker to become available.
#' @param ... extra parameters provided to [`future::future()`]
#' @param queue A queue that is used to schedule work to be done using [future::future()].  This queue defaults to [future_promise_queue()] and requires that method `queue$schedule_work(fn)` exist.  This method should take in a function that will execute the promised \pkg{future} work.
#' @return Unlike [`future::future()`], `future_promise()` returns a [promise()] object that will eventually resolve the \pkg{future} `expr`.
#' @examples
#' \donttest{# Relative start time
#' start <- Sys.time()
#' # Helper to force two `future` workers
#' with_two_workers <- function(expr) {
#'   if (!require("future")) {
#'     message("`future` not installed")
#'     return()
#'   }
#'   old_plan <- future::plan(future::multisession(workers = 2))
#'   on.exit({future::plan(old_plan)}, add = TRUE)
#'   start <<- Sys.time()
#'   force(expr)
#'   while(!later::loop_empty()) {Sys.sleep(0.1); later::run_now()}
#'   invisible()
#' }
#' # Print a status message. Ex: `"PID: XXX; 2.5s promise done"`
#' print_msg <- function(pid, msg) {
#'   message(
#'     "PID: ", pid, "; ",
#'     round(difftime(Sys.time(), start, units = "secs"), digits = 1), "s " ,
#'     msg
#'   )
#' }
#'
#' # `"promise done"` will appear after four workers are done and the main R session is not blocked
#' # The important thing to note is the first four times will be roughly the same
#' with_two_workers({
#'   promise_resolve(Sys.getpid()) %...>% print_msg("promise done")
#'   for (i in 1:6) future::future({Sys.sleep(1); Sys.getpid()}) %...>% print_msg("future done")
#' })
#' {
#' #> PID: XXX; 2.5s promise done
#' #> PID: YYY; 2.6s future done
#' #> PID: ZZZ; 2.6s future done
#' #> PID: YYY; 2.6s future done
#' #> PID: ZZZ; 2.6s future done
#' #> PID: YYY; 3.4s future done
#' #> PID: ZZZ; 3.6s future done
#' }
#'
#' # `"promise done"` will almost immediately, before any workers have completed
#' # The first two `"future done"` comments appear earlier the example above
#' with_two_workers({
#'   promise_resolve(Sys.getpid()) %...>% print_msg("promise")
#'   for (i in 1:6) future_promise({Sys.sleep(1); Sys.getpid()}) %...>% print_msg("future done")
#' })
#' {
#' #> PID: XXX; 0.2s promise done
#' #> PID: YYY; 1.3s future done
#' #> PID: ZZZ; 1.4s future done
#' #> PID: YYY; 2.5s future done
#' #> PID: ZZZ; 2.6s future done
#' #> PID: YYY; 3.4s future done
#' #> PID: ZZZ; 3.6s future done
#' }}
#' @export
future_promise <- function(
  expr = NULL,
  envir = parent.frame(),
  substitute = TRUE,
  globals = TRUE,
  packages = NULL,
  ...,
  queue = future_promise_queue()
) {

  # make sure queue is the right structure
  stopifnot(is.function(queue$schedule_work) && length(formals(queue$schedule_work)) >= 1)

  if (substitute) expr <- substitute(expr)

  # Force all arguments so their values cannot change before execution
  # (this does NOT protect against values inside environments changing)
  force(envir)
  force(substitute)
  force(globals)
  force(packages)
  force(list(...))

  ## Record globals
  gp <- future::getGlobalsAndPackages(expr, envir = envir, globals = globals)
  force(gp)

  promise(function(resolve, reject) {
    # add to queue
    queue$schedule_work(function() {
      ### Should the worker function (future strategy) be captured at creation time or submission time?
      ## The current implementation makes the `$can_proceed()` method of `WorkQueue` plan agnostic;
      ## it will always ask the current plan whether a worker is available.
      ## If one is, then the _current_ plan should be used, not a plan that existed at initialization time.
      exec_future <- future::plan()

      # execute the future and return a promise so the queue knows exactly when it is done
      future_job <- exec_future(
        gp$expr,
        envir = envir,
        substitute = FALSE,
        globals = gp$globals,
        packages = unique(c(packages, gp$packages)),
        ...
      )

      # Resolve the outer promise with the future job
      resolve(future_job)
      # Return a promise-like object that the `queue` can chain onto after executing this _work_
      future_job
    })
  })
}
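
# Hedged sketch of the `queue` argument: any object that provides a
# `$schedule_work(fn)` method can be passed. Here we build a second `WorkQueue`
# (independent of the session-cached `future_promise_queue()`) and hand it to
# `future_promise()`; wrapped in `if (FALSE)` so it is not run on load.
if (FALSE) {
  future::plan(future::multisession(workers = 2))
  my_queue <- WorkQueue$new()  # uses the default `future_worker_is_free` check
  p <- future_promise({ Sys.sleep(1); Sys.getpid() }, queue = my_queue)
  p %...>% print()
}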



if (FALSE) {

  # ConstDelay <- R6::R6Class("ConstDelay",
  #   inherit = Delay,
  #   private = list(
  #     const = 0.1,
  #     random = TRUE
  #   ),
  #   public = list(
  #     initialize = function(const = 0.1, random = TRUE) {
  #       stopifnot(length(const) == 1 && is.numeric(const) && const >= 0)
  #       private$const <- const
  #       private$random <- isTRUE(random)

  #       self
  #     },
  #     delay = function() {
  #       if (private$random) {
  #         runif(n = 1, max = private$const)
  #       } else {
  #         private$const
  #       }
  #     }
  #   )
  # )
  # LinearDelay <- R6::R6Class("LinearDelay",
  #   inherit = Delay,
  #   private = list(
  #     delta = 0.05,
  #     random = TRUE
  #   ),
  #   public = list(
  #     initialize = function(delta = 0.03, random = TRUE) {
  #       stopifnot(length(delta) == 1 && is.numeric(delta) && delta >= 0)
  #       private$delta <- delta
  #       private$random <- isTRUE(random)

  #       self
  #     },

  #     delay = function() {
  #       delta_delay <- private$delay_count * private$delta
  #       if (private$random) {
  #         runif(n = 1, max = delta_delay)
  #       } else {
  #         delta_delay
  #       }
  #     }
  #   )
  # )


  # dev_load <- pkgload::load_all

  # ## test
  # dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) };

  # ## block workers mid job
  # dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) }; lapply(1:2, function(i) { later::later(function() { message("*************** adding blockage", i); fj <- future::future({ Sys.sleep(4); message("*************** blockage done", i); i}); then(fj, function(x) { print(paste0("block - ", i))}); }, delay = 0.5 + i) }) -> ignore;

  # ## block main worker mid job
  # dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) }; lapply(1:4, function(i) { later::later(function() { message("*************** adding blockage", i); fj <- future::future({ Sys.sleep(4); message("*************** blockage done", i); i}); then(fj, function(x) { print(paste0("block - ", i))}); }, delay = 0.5 + i/4) }) -> ignore;


  # ## block workers pre job
  # dev_load(); print_i(); lapply(1:2, function(i) { message("*************** adding blockage", i); future::future({ Sys.sleep(4); message("*************** blockage done", i); i}) }) -> future_jobs; lapply(future_jobs, function(fj) { as.promise(fj) %...>% { print(.) } }); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) };

  # ## block main worker workers pre job
  # dev_load(); print_i(); start <- Sys.time(); promise_all(.list = lapply(1:10, function(x) { future_promise({ Sys.sleep(1); print(paste0(x)) })})) %...>% { print(Sys.time() - start) }; lapply(1:4, function(i) { later::later(function() { message("*************** adding blockage", i); fj <- future::future({ Sys.sleep(4); message("*************** blockage done", i); i}); then(fj, function(x) { print(paste0("block - ", i))}); }, delay = 0.5 + i/4) }) -> ignore;

  future::plan(future::multisession(workers = 2))

  debug_msg_can_print <- TRUE

  print_i <- function(i = 0) { if (i <= 50) { print(i); later::later(function() { print_i(i + 1) }, delay = 0.1) } }

  slow_calc <- function(n) {
    Sys.sleep(n)
    "slow!"
  }
  n <- 2
  prom <- future_promise
  # prom <- future::future
  a1 <- prom({
    print(paste0("start 1 - ", Sys.time()))
    print(slow_calc(n))
  })
  a2 <- prom({
    print(paste0("start 2 - ", Sys.time()))
    print(slow_calc(n))
  })
  a3 <- prom({
    print(paste0("start 3 - ", Sys.time()))
    print(slow_calc(n))
  })
  a4 <- prom({
    print(paste0("start 4 - ", Sys.time()))
    print(slow_calc(n))
  })

  print("done assignement!")

  a1 %...>% { message("end 1 - ", format(Sys.time())) }
  a2 %...>% { message("end 2 - ", format(Sys.time())) }
  a3 %...>% { message("end 3 - ", format(Sys.time())) }
  a4 %...>% { message("end 4 - ", format(Sys.time())) }

}