File: future-3-topologies.md.rsp

<%@meta language="R-vignette" content="--------------------------------
%\VignetteIndexEntry{A Future for R: Future Topologies}
%\VignetteAuthor{Henrik Bengtsson}
%\VignetteKeyword{R}
%\VignetteKeyword{package}
%\VignetteKeyword{vignette}
%\VignetteKeyword{future}
%\VignetteKeyword{promise}
%\VignetteEngine{R.rsp::rsp}
%\VignetteTangle{FALSE}
--------------------------------------------------------------------"%>
<%
library("R.utils")
`%<-%` <- future::`%<-%`
options("withCapture/newline"=FALSE)
%>

# <%@meta name="title"%>

Futures can be nested in R such that one future creates another set of futures and so on.  This may, for instance, occur within nested for loops, e.g.
```r
library("future")
library("listenv")
x <- listenv()
for (ii in 1:3) {
  x[[ii]] %<-% {
    y <- listenv()
    for (jj in 1:3) {
      y[[jj]] %<-% { ii + jj / 10 }
    }
    y
  }
}
unlist(x)
## [1] 1.1 1.2 1.3 2.1 2.2 2.3 3.1 3.2 3.3
```
The default is to use synchronous futures unless otherwise specified, which is also true for nested futures.  If we, for instance, specify `plan(multiprocess)`, the first layer of futures (`x[[ii]] %<-% { expr }`) will be processed asynchronously in background R processes, and the futures in the second layer (`y[[jj]] %<-% { expr }`) will be processed synchronously within each of those background R processes.  If we wish to be explicit about this, we can specify `plan(list(multiprocess, sequential))`.
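
One way to see this layering is to record process IDs at each level.  The following is a minimal sketch, not part of the original example; it assumes that `multisession` (background R sessions, available on all platforms) is used for the outer layer in place of `multiprocess`, and it fixes the number of workers to two only to keep the example small.  The names `main_pid`, `inner_pid`, and `pids` are made up for illustration.
```r
library("future")

## Outer layer: two background R sessions; inner layer: sequential.
## 'multisession' with 'workers = 2' is an assumption made for this sketch.
plan(list(tweak(multisession, workers = 2), sequential))

main_pid <- Sys.getpid()

pids %<-% {
  ## Inner future: resolved sequentially inside the background session
  inner_pid %<-% Sys.getpid()
  c(outer = Sys.getpid(), inner = inner_pid)
}

## The outer future runs in a process other than the main R session ...
print(pids[["outer"]] != main_pid)
## ... whereas the inner future runs in the same process as the outer one
print(pids[["outer"]] == pids[["inner"]])

## Reset to the default and shut down the background sessions
plan(sequential)
```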


## Example: High-Throughput Sequencing
Consider a high-throughput sequencing (HT-Seq) project with 50 human DNA samples where we have one FASTQ file per sample containing the raw sequence reads as they come out of the sequencing machine.  With this data, we wish to align each FASTQ to a reference genome such that we generate 24 individual BAM files per sample - one per chromosome.

Here is the layout of what such an analysis could look like in R using futures.
```r
library("future")
library("listenv")
htseq_align <- function(fq, chr) { chr }

fqs <- dir(pattern = "[.]fastq$")

bams <- listenv()
for (ss in seq_along(fqs)) {
  fq <- fqs[ss]
  bams[[ss]] %<-% {
    bams_ss <- listenv()
    for (cc in 1:24) {
      bams_ss[[cc]] %<-% htseq_align(fq, chr = cc)
    }
    as.list(bams_ss)
  }
}
bams <- as.list(bams)
```
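
The layout above can be tried without any FASTQ files at hand.  The sketch below is only an illustration: the file names in `fqs` are hypothetical, and `htseq_align()` is replaced by a stand-in that returns the name of the BAM file it would have produced instead of running an aligner.
```r
library("future")
library("listenv")
plan(sequential)

## Stand-in for a real aligner (hypothetical): returns an output file name
htseq_align <- function(fq, chr) {
  sprintf("%s_chr%02d.bam", sub("[.]fastq$", "", fq), chr)
}

## Hypothetical input files; the original uses dir(pattern = "[.]fastq$")
fqs <- c("sampleA.fastq", "sampleB.fastq")

bams <- listenv()
for (ss in seq_along(fqs)) {
  fq <- fqs[ss]
  bams[[ss]] %<-% {
    bams_ss <- listenv()
    for (cc in 1:24) {
      bams_ss[[cc]] %<-% htseq_align(fq, chr = cc)
    }
    as.list(bams_ss)
  }
}
bams <- as.list(bams)

## One list element per sample, each holding 24 per-chromosome results
str(bams, max.level = 1)
```
Because only the `plan()` call decides how the futures are resolved, the same code can later be run with any of the topologies discussed in this vignette.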

The default is to use synchronous futures, so without further specifications, the above will process each sample and each chromosome sequentially.  Next, we will consider what can be done with the following two computer setups:

* A single machine with 8 cores
* A compute cluster with 3 machines each with 16 cores

### One multi-core machine
With a single machine of 8 cores, we could choose to process multiple samples at the same time while processing chromosomes sequentially.  In other words, we would like to evaluate the outer layer of futures using multiprocess futures and the inner ones as sequential futures.  This can be specified as:
```r
plan(list(multiprocess, sequential))
```
The internals for processing multiprocess futures query `availableCores()` to infer how many cores can be used simultaneously, so there is no need to explicitly specify that there are 8 cores available.
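
To check what this query returns on a given machine, `availableCores()` can also be called directly.  This is a small sketch; the value depends on the machine and on any settings that restrict the number of cores, so no output is shown.  The variable names `n` and `n_system` are made up for illustration.
```r
library("future")

## Number of cores that futures are allowed to use in this R session
n <- availableCores()
print(n)

## Number of cores reported by the operating system alone
n_system <- availableCores(methods = "system")
print(n_system)
```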

_Comment_: Since synchronous is the default future, we could skip trailing sequential futures in the setup, e.g. `plan(list(multiprocess))` or just `plan(multiprocess)`.  However, it does not hurt to be explicit.

If we instead would like to process the samples sequentially and the chromosomes in parallel, we can use:
```r
plan(list(sequential, multiprocess))
```

#### Built-in protection against recursive parallelism

Above we have processed either the outer or the inner set of futures in parallel.  What if we want to process both layers in parallel?  It's tempting to use:
```r
plan(list(multiprocess, multiprocess))
```
Although this does not give an error, we will find that the inner layer of futures will be processed sequentially, just as if we had used `plan(list(multiprocess, sequential))`.  This behavior is due to the built-in protection against nested parallelism.  If both layers ran in parallel, each using the 8 cores available on the machine, we would be running 8 * 8 = 64 parallel processes, which would surely overload our computer.  What happens internally is that for the outer layer, `availableCores()` equals eight (8), whereas for the inner layer it equals one (1).
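
The protection can be observed by calling `availableCores()` from within an outer-layer future.  The sketch below makes two assumptions that are not part of the original text: it uses `multisession` instead of `multiprocess`, and it limits the outer layer to two workers to keep the example light.  The names `cores_outer` and `cores_inner` are made up for illustration.
```r
library("future")

## Both layers request parallel processing
plan(list(tweak(multisession, workers = 2), multisession))

## As seen by the main R session (machine dependent)
cores_outer <- availableCores()

## As seen from inside an outer-layer future, that is,
## what the inner layer has to work with
cores_inner %<-% availableCores()

print(cores_outer)
print(cores_inner)

## Reset to the default and shut down the background sessions
plan(sequential)
```
If the protection works as described above, `cores_inner` should be one regardless of how many cores the machine has.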

Now, we could imagine that we process the outer layer with, say, two parallel futures, and then the inner layer with four parallel futures.  In that case, we would end up running on at most eight cores (= 2 * 4).  This can be achieved by forcing a fixed number of workers at each layer (not recommended):
```r
plan(list(tweak(multiprocess, workers = 2), tweak(multiprocess, workers = 4)))
```



### An ad-hoc compute cluster
With a compute cluster of 3 machines each with 16 cores, we can run up to 48 alignment processes in parallel.  A natural setup is to have each machine process one sample at a time, with the chromosomes of that sample aligned in parallel.  We could specify this as:
```r
nodes <- c("n1", "n2", "n3")
plan(list(tweak(cluster, workers = nodes), multiprocess))
```
_Comment:_ Multiprocess futures are agile to their environment, that is, they will query the machine they are running on to find out how many parallel processes it can run at the same time.

One possible downside to the above setup is that we might not utilize all available cores all the time.  This is because the alignment of the shorter chromosomes will finish sooner than the longer ones, which means that we might at the end of each sample have only a few alignment processes running on each machine, leaving the remaining cores idle.  An alternative is then to use the following setup:
```r
nodes <- rep(c("n1", "n2", "n3"), each = 8)
plan(list(tweak(cluster, workers = nodes), multiprocess))
```
This will cause up to 24 (= 3*8) samples to be processed in parallel, each processing two chromosomes at the same time.
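
The arithmetic behind these numbers can be checked in plain R.  This is only a back-of-the-envelope sketch; it assumes that the 16 cores of each machine are shared evenly among the 8 workers running on it, and the variable names other than `nodes` are made up for illustration.
```r
## Eight workers on each of the three machines
nodes <- rep(c("n1", "n2", "n3"), each = 8)

## Total number of workers, that is, samples processed at the same time
n_workers <- length(nodes)
print(n_workers)
## [1] 24

## Number of workers per machine
print(table(nodes))

## Cores left for each worker, assuming an even split of 16 cores
cores_per_machine <- 16
cores_per_worker <- cores_per_machine / 8
print(cores_per_worker)
## [1] 2
```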


## Example: A Remote Compute Cluster
Imagine we have access to a remote compute cluster, with login node `remote.server.org`, and that the cluster has three nodes `n1`, `n2`, and `n3`.  Also, let us assume we have already set up the cluster such that we can log in over SSH using public key authentication, i.e. when we do `ssh remote.server.org` authentication is done automatically.

With the above setup, we can use nested futures in our local R session to evaluate R expressions on the remote compute cluster and its three nodes.  Here is a proof of concept illustrating how the different nested futures are evaluated on different machines.

```r
library("future")
library("listenv")

## Set up access to remote login node
login <- tweak(remote, workers = "remote.server.org")
plan(login)

## Set up cluster nodes on login node
nodes %<-% { .keepme <- parallel::makeCluster(c("n1", "n2", "n3")) }

## Specify future topology
## login node -> { cluster nodes } -> { multiple cores }
plan(list(
  login,
  tweak(cluster, workers = nodes),
  multiprocess
))


## (a) This will be evaluated on the cluster login computer
x %<-% {
  thost <- Sys.info()[["nodename"]]
  tpid <- Sys.getpid()
  y <- listenv()
  for (task in 1:4) {
    ## (b) This will be evaluated on a compute node on the cluster
    y[[task]] %<-% {
      mhost <- Sys.info()[["nodename"]]
      mpid <- Sys.getpid()
      z <- listenv()
      for (jj in 1:2) {
        ## (c) These will be evaluated in separate processes on the same compute node
        z[[jj]] %<-% data.frame(task = task,
                                top.host = thost, top.pid = tpid,
                                mid.host = mhost, mid.pid = mpid,
                                host = Sys.info()[["nodename"]],
                                pid = Sys.getpid())
      }
      Reduce(rbind, z)
    }
  }
  Reduce(rbind, y)
}

print(x)
##   task top.host top.pid mid.host mid.pid host    pid
## 1    1    login  391547       n1  391878   n1 393943
## 2    1    login  391547       n1  391878   n1 393951
## 3    2    login  391547       n2  392204   n2 393971
## 4    2    login  391547       n2  392204   n2 393978
## 5    3    login  391547       n3  392527   n3 394040
## 6    3    login  391547       n3  392527   n3 394048
## 7    4    login  391547       n1  391878   n1 393959
## 8    4    login  391547       n1  391878   n1 393966
```

Try the above `x %<-% { ... }` future with, say, `plan(list(sequential, multiprocess))` and see what the output will be.



## Example: Adjust the Number of Workers for Each Cluster Node

When using
```r
nodes <- c("n1", "n2", "n3")
plan(list(tweak(cluster, workers = nodes), multiprocess))
```
the number of workers used on each of the nodes (`n1`, `n2`, and `n3`) is
given by the value of `availableCores()` on each of those nodes.  In turn,
`availableCores()` typically defaults to the number of cores on those nodes.
Now, imagine you want to use only 50% of these cores.  This can be done by
tweaking the `multiprocess` plan by passing a function to `workers`:
```r
halfCores <- function() { max(1, round(0.5 * availableCores())) }
plan(list(
  tweak(cluster, workers = nodes),
  tweak(multiprocess, workers = halfCores)
))
```
With this, each node will use at most 50% of the cores available.
For instance, if `n1` and `n2` have eight cores, and `n3` has 32 cores,
then the nodes will use four, four, and 16 cores, respectively.
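
These numbers can be verified without access to such nodes by applying the same formula to the core counts directly.  This is a sketch: the helper `half()` and the vector `cores` are made up for illustration and take the number of cores as input instead of calling `availableCores()` on each node.
```r
## Same rule as halfCores(), but with the core count passed as an argument
half <- function(ncores) max(1, round(0.5 * ncores))

## Core counts from the example in the text
cores <- c(n1 = 8, n2 = 8, n3 = 32)

used <- sapply(cores, half)
print(used)
## n1 n2 n3 
##  4  4 16 

## max(1, ...) guarantees at least one worker, also on a single-core node
print(half(1))
## [1] 1
```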

Another example is:
```r
customWorkers <- function() {
  switch(Sys.info()[["nodename"]],
    "n1" = 2L,
    "n2" = 3L,
    ## default:
    availableCores()
  )
}
plan(list(
  tweak(cluster, workers = nodes),
  tweak(multiprocess, workers = customWorkers)
))
```
In this case, node `n1` will always use two cores, `n2` three cores,
and `n3` will respect what `availableCores()` returns.
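
The `switch()` call relies on its last, unnamed argument acting as the fallback for any node name that is not listed.  The sketch below illustrates this with a hypothetical helper, `workers_for()`, that takes the node name and a fallback value as arguments, so that it can be tried on any machine; the fallback of 16 is an arbitrary stand-in for what `availableCores()` would return.
```r
## Same logic as customWorkers(), with node name and fallback as arguments
workers_for <- function(host, fallback = 16L) {
  switch(host,
    "n1" = 2L,
    "n2" = 3L,
    ## default: used for any other node name
    fallback
  )
}

res <- vapply(c("n1", "n2", "n3"), workers_for, integer(1))
print(res)
## n1 n2 n3 
##  2  3 16 
```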


[listenv]: https://cran.r-project.org/package=listenv
[globals]: https://cran.r-project.org/package=globals
[Futures in R: Common issues with solutions]: future-issues.html

---
Copyright Henrik Bengtsson, 2015-2019