File: TransientMulticoreParam-class.R

package info (click to toggle)
r-bioc-biocparallel 1.40.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,768 kB
  • sloc: cpp: 139; sh: 14; makefile: 8
file content (123 lines) | stat: -rw-r--r-- 2,739 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
.TransientMulticoreParam <-
    setRefClass(
        "TransientMulticoreParam",
        contains = "MulticoreParam"
    )

TransientMulticoreParam <-
    function(param)
{
    param <- as(param, "TransientMulticoreParam")
    bpstart(param)
}

.TRANSIENTMULTICOREPARAM_JOBNODE <- new.env(parent=emptyenv())
.TRANSIENTMULTICOREPARAM_RESULT <- new.env(parent=emptyenv())

setMethod(
    "bpstart", "TransientMulticoreParam",
    function(x, ...)
{
    parallel::mccollect(wait=TRUE)

    rm(
        list=ls(envir = .TRANSIENTMULTICOREPARAM_JOBNODE),
        envir = .TRANSIENTMULTICOREPARAM_JOBNODE
    )
    rm(
        list = ls(envir = .TRANSIENTMULTICOREPARAM_RESULT),
        envir = .TRANSIENTMULTICOREPARAM_RESULT
    )
    .bpstart_impl(x)
})

setMethod(
    "bpstop", "TransientMulticoreParam",
    function(x)
{
    .bpstop_impl(x)
})

setMethod(
    "bpbackend", "TransientMulticoreParam",
    function(x)
{
    x
})

setMethod(
    "length", "TransientMulticoreParam",
    function(x)
{
    bpnworkers(x)
})

##
## send / recv
##

setMethod(
    ".recv_all", "TransientMulticoreParam",
    function(backend)
{
    replicate(length(backend), .recv_any(backend), simplify=FALSE)
})

setMethod(
    ".send_to", "TransientMulticoreParam",
    function(backend, node, value)
{
    if (value$type == "EXEC") {
        job <- parallel::mcparallel(.bpworker_EXEC(value))
        id <- as.character(job$pid)
        .TRANSIENTMULTICOREPARAM_JOBNODE[[id]] <- node
    }
    TRUE
})

setMethod(
    ".recv_any", "TransientMulticoreParam",
    function(backend)
{
    .BUFF <- .TRANSIENTMULTICOREPARAM_RESULT # alias
    tryCatch({
        while (!length(.BUFF)) {
            result <- parallel::mccollect(wait = FALSE, timeout = 1)
            for (id in names(result))
                .BUFF[[id]] <- result[[id]]
        }
        id <- head(names(.BUFF), 1L)

        value <- .BUFF[[id]]
        rm(list = id, envir = .BUFF)
        node <- .TRANSIENTMULTICOREPARAM_JOBNODE[[id]]
        rm(list = id, envir = .TRANSIENTMULTICOREPARAM_JOBNODE)
        list(node = node, value = value)
    }, error  = function(e) {
        ## indicate error, but do not stop
        .error_worker_comm(e, "'.recv_any()' data failed")
    })
})

setMethod(
    ".send", "TransientMulticoreParam",
    function(worker, value)
{
    stop("'.send,TransientMulticoreParam-method' not implemented")
})

setMethod(
    ".recv", "TransientMulticoreParam",
    function(worker)
{
    stop("'.recv,TransientMulticoreParam-method' not implemented")
})

setMethod(
    ".close", "TransientMulticoreParam",
    function(worker)
{
    stop("'.close,TransientMulticoreParam-method' not implemented")
})

setMethod(".manager", "TransientMulticoreParam", .manager_ANY)