File: bpstart-methods.R

package info (click to toggle)
r-bioc-biocparallel 1.40.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,768 kB
  • sloc: cpp: 139; sh: 14; makefile: 8
file content (94 lines) | stat: -rw-r--r-- 2,474 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
### =========================================================================
### ClusterManager object: ensures started clusters are stopped
### -------------------------------------------------------------------------

.ClusterManager <- local({
    ## package-global registry of backends; use to avoid closing
    ## socket connections of unreferenced backends during garbage
    ## collection -- bpstart(MulticoreParam(1)); gc(); gc()
    uid <- 0
    env <- environment()
    list(add = function(cluster) {
        uid <<- uid + 1L
        cuid <- as.character(uid)
        env[[cuid]] <- cluster          # protection
        cuid
    }, drop = function(cuid) {
        if (length(cuid) && cuid %in% names(env))
            rm(list=cuid, envir=env)
        invisible(NULL)
    }, get = function(cuid) {
        env[[cuid]]
    }, ls = function() {
        cuid <- setdiff(ls(env), c("uid", "env"))
        cuid[order(as.integer(cuid))]
    })
})

### =========================================================================
### bpstart() methods
### -------------------------------------------------------------------------

setMethod("bpstart", "ANY", function(x, ...) invisible(x))

setMethod("bpstart", "missing",
    function(x, ...)
{
    x <- registered()[[1]]
    bpstart(x)
})

##
## .bpstart_impl: common functionality after bpisup()
##

.bpstart_error_handler <-
    function(x, response, id)
{
    value <- lapply(response, function(elt) elt[["value"]][["value"]])
    if (!all(bpok(value))) {
        on.exit(try(bpstop(x)))
        stop(
            "\nbpstart() ", id, " error:\n",
            conditionMessage(.error_bplist(value))
        )
    }
}

.bpstart_set_rng_stream <-
    function(x)
{
    ## initialize the random number stream; increment the stream only
    ## in bpstart_impl
    .RNGstream(x) <- .rng_init_stream(bpRNGseed(x))

    invisible(.RNGstream(x))
}


.bpstart_set_finalizer <-
    function(x)
{
    if (length(x$.uid) == 0L) {
        finalizer_env <- as.environment(list(self=x$.self))
        reg.finalizer(
            finalizer_env, function(e) bpstop(e[["self"]]), onexit=TRUE
        )
        x$.finalizer_env <- finalizer_env
    }
    x$.uid <- .ClusterManager$add(bpbackend(x))

    invisible(x)
}

.bpstart_impl <-
    function(x)
{
    ## common actions once bpisup(backend)

    ## initialize the random number stream
    .bpstart_set_rng_stream(x)

    ## clean up when x left open
    .bpstart_set_finalizer(x)
}