File: SnowParam-utils.R

package info (click to toggle)
r-bioc-biocparallel 1.40.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,768 kB
  • sloc: cpp: 139; sh: 14; makefile: 8
file content (117 lines) | stat: -rw-r--r-- 3,089 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
.connect_timeout <-
    function()
{
    timeout <- getOption("timeout")
    timeout_is_valid <-
        length(timeout) == 1L && !is.na(timeout) &&
        timeout > 0L
    if (!timeout_is_valid)
        stop("'getOption(\"timeout\")' must be positive integer(1)")
    timeout
}


### - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
### snow::MPI
###

bprunMPIworker <- function() {
    comm <- 1
    intercomm <- 2
    Rmpi::mpi.comm.get.parent(intercomm)
    Rmpi::mpi.intercomm.merge(intercomm,1,comm)
    Rmpi::mpi.comm.set.errhandler(comm)
    Rmpi::mpi.comm.disconnect(intercomm)

    .bpworker_impl(snow::makeMPImaster(comm))

    Rmpi::mpi.comm.disconnect(comm)
    Rmpi::mpi.quit()
}

### - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
### parallel::FORK
###

.bpfork <- function (nnodes, host, port)
{
    nnodes <- as.integer(nnodes)
    if (is.na(nnodes) || nnodes < 1L)
        stop("'nnodes' must be >= 1")

    if (length(host) != 1L || is.na(host) || !is.character(host))
        stop("'host' must be character(1)")
    if (length(port) != 1L || is.na(port) || !is.integer(port))
        stop("'port' must be integer(1)")

    connect_timeout <- .connect_timeout()
    idle_timeout <- IDLE_TIMEOUT

    cl <- vector("list", nnodes)
    for (rank in seq_along(cl)) {
        .bpforkChild(host, port, rank, connect_timeout, idle_timeout)
        cl[[rank]] <- .bpforkConnect(
            host, port, rank, connect_timeout, idle_timeout
        )
    }

    class(cl) <- c("SOCKcluster", "cluster")
    cl
}

.bpforkChild <-
    function(host, port, rank, connect_timeout, idle_timeout)
{
    parallel::mcparallel({
        con <- NULL
        suppressWarnings({
            while (is.null(con)) {
                con <- tryCatch({
                    socketConnection(
                        host, port, FALSE, TRUE, "a+b",
                        timeout = connect_timeout
                    )
                }, error=function(e) {})
            }
            socketTimeout(con, idle_timeout)
        })
        node <- structure(list(con = con), class = "SOCK0node")
        .bpworker_impl(node)
    }, detached=TRUE)
}

.bpforkConnect <-
    function(host, port, rank, connect_timeout, idle_timeout)
{
    idle_timeout <- IDLE_TIMEOUT
    con <- socketConnection(
        host, port, TRUE, TRUE, "a+b", timeout = connect_timeout
    )
    socketTimeout(con, idle_timeout)
    structure(list(con = con, host = host, rank = rank),
              class = c("forknode", "SOCK0node"))
}

### - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
### EXEC command cache
###

## read/write the static value
.load_task_static <-
    function(value)
{
    static_data <- .task_const(value)
    if (is.null(static_data)) {
        static_data <- options("BIOCPARALLEL_SNOW_STATIC")[[1]]
        .task_remake(value, static_data)
    } else {
        options(BIOCPARALLEL_SNOW_STATIC = static_data)
        value
    }
}

.clean_task_static <-
    function()
{
    options(BIOCPARALLEL_SNOW_STATIC = NULL)
}