1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
# This file is a part of Julia. License is MIT: https://julialang.org/license
using Random
# Under the "master_worker" topology the test expects worker-to-worker calls to
# fail: a worker asking a second worker for its id must raise a RemoteException.
pids = addprocs_with_testenv(4; topology="master_worker")
let first_pid = pids[1], second_pid = pids[2]
    ask_second = () -> remotecall_fetch(myid, second_pid)
    @test_throws RemoteException remotecall_fetch(ask_second, first_pid)
end
# Check that every worker agrees with the calling process on `nprocs()`,
# `nworkers()` and the sorted list of `workers()`.
function test_worker_counts()
    # Snapshot the caller's view once; it is shipped to each worker for comparison.
    master_view = (nprocs(), nworkers(), sort(workers()))
    for pid in workers()
        @test (true, true, true) == remotecall_fetch(pid, master_view...) do n_procs, n_workers, pid_list
            (n_procs == nprocs(), n_workers == nworkers(), pid_list == sort(workers()))
        end
    end
end
# Remove the worker processes one at a time. After each removal, re-check (via
# `test_worker_counts`) that the remaining processes agree on the counts.
# Once no separate workers remain, `nworkers()` counts the master itself and
# equals `nprocs()`. The loop stops at that point, so it never attempts to remove
# process 1. Called with no workers, it does nothing.
function remove_workers_and_test()
    while nworkers() != nprocs()
        rmprocs(first(workers()))
        test_worker_counts()
    end
end
# Tear down the four master_worker-topology workers one at a time, checking
# after each removal that the remaining processes agree on the counts.
remove_workers_and_test()
# Custom topology: connect workers with even launch idents to other even-ident workers, and odd to odd.
# Cluster manager for the custom-topology test. `launch` starts `np` workers with
# idents 1:np and asks each one to connect only to workers of the same ident parity.
mutable struct TopoTestManager <: ClusterManager
    np::Int   # number of workers to launch; concrete type avoids an abstractly typed field
end
# Launch `manager.np` worker processes for the custom-topology test.
# Each worker gets an ident in 1:np. Its `connect_idents` lists every larger
# ident of the same parity (ident+2, ident+4, ...), so workers whose idents
# share parity end up connected to one another.
# Each WorkerConfig is pushed onto `launched`; `c` is notified once all are started.
function launch(manager::TopoTestManager, params::Dict, launched::Array, c::Condition)
    workdir = params[:dir]
    julia_exe = params[:exename]
    julia_flags = params[:exeflags]

    bind_addr = Distributed.LPROC.bind_addr
    worker_cmd = pipeline(detach(setenv(`$julia_exe $julia_flags --bind-to $bind_addr --worker`, dir=workdir)))

    for ident in 1:manager.np
        proc = open(worker_cmd, "r+")
        # `--worker` was given no cookie argument, so the worker reads the
        # cluster cookie from its stdin.
        Distributed.write_cookie(proc)

        config = WorkerConfig()
        config.process = proc
        config.io = proc.out
        config.ident = ident
        config.connect_idents = collect(ident+2:2:manager.np)
        push!(launched, config)
    end

    notify(c)
end
# Worker pid => ident it was launched with; filled in by `manage` on registration.
# Both keys and values are integers, so the Dict is concretely typed.
const map_pid_ident = Dict{Int,Int}()

# Cluster-manager callback for TopoTestManager.
# - :register records pid => ident, so the tests know which workers should be
#   connected.
# - :interrupt sends signal 2 (SIGINT) to the worker process.
# Other ops are ignored.
function manage(manager::TopoTestManager, id::Integer, config::WorkerConfig, op::Symbol)
    if op == :register
        map_pid_ident[id] = config.ident
    elseif op == :interrupt
        kill(config.process, 2)
    end
end
addprocs_with_testenv(TopoTestManager(8); topology="custom")

# `manage(:register)` may not have recorded every worker yet when `addprocs`
# returns; yield until each worker has an ident in `map_pid_ident`.
while any(pid -> get(map_pid_ident, pid, 0) == 0, workers())
    yield()
end
# Workers whose launch idents share parity are connected and can call each other.
# A call between workers of different parity is expected to fail.
let
    for src in workers(), dst in workers()
        same_parity = iseven(map_pid_ident[src]) == iseven(map_pid_ident[dst])
        if same_parity
            @test dst == remotecall_fetch(p->remotecall_fetch(myid, p), src, dst)
        else
            @test_throws RemoteException remotecall_fetch(p->remotecall_fetch(myid, p), src, dst)
        end
    end
end
# Tear down the custom-topology workers one at a time, re-checking the counts
# after each removal.
remove_workers_and_test()
# test `lazy` connection setup
# Define `count_connected_workers()` on every process.
# On the process where it runs, it counts entries of the local process group that
# are `Distributed.Worker`s with an open read stream. The tests use this count as
# the number of established connections.
function def_count_conn()
    @everywhere function count_connected_workers()
        is_live(w) = w isa Distributed.Worker && isdefined(w, :r_stream) && isopen(w.r_stream)
        count(is_live, Distributed.PGRP.workers)
    end
end
# Start 8 workers with the default (lazy) connection setup and define
# `count_connected_workers` on all processes.
addprocs_with_testenv(8)
def_count_conn()

# Pick 10 random pairs of distinct workers, no pair repeated in either orientation.
wl = workers()
combinations = Tuple{Int,Int}[]
while length(combinations) < 10
    from, to = rand(wl), rand(wl)
    if from != to && (from, to) ∉ combinations && (to, from) ∉ combinations
        push!(combinations, (from, to))
    end
end
# Initially only master-worker connections ought to be set up: the 8 workers
# together should report 8 live connections.
expected_num_conns = 8
let num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
    @test num_conns == expected_num_conns
end

# Each worker-to-worker call below is expected to open one new connection,
# which is counted once on `from` and once on `to`.
for (from, to) in combinations
    remotecall_wait(topid->remotecall_fetch(myid, topid), from, to)
    global expected_num_conns += 2    # one connection endpoint on both from and to
    let num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
        @test num_conns == expected_num_conns
    end
end
# With lazy=false, every connection ought to exist right after `addprocs`:
# each of the 8 workers counts the master plus its 7 peers, 8 * 8 in total.
rmprocs(workers())
addprocs_with_testenv(8; lazy=false)
def_count_conn()
@test sum(asyncmap(pid -> remotecall_fetch(count_connected_workers, pid), workers())) == 8 * 8

# Adding workers with a `lazy` setting different from the existing ones is rejected.
@test_throws ArgumentError addprocs_with_testenv(1; lazy=true)
|