1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
from __future__ import annotations
import asyncio
import operator
from distributed import wait
from distributed.diagnostics import GraphLayout
from distributed.utils_test import gen_cluster, inc
@gen_cluster(client=True)
async def test_basic(c, s, a, b):
    """Check the layout recorded by a plugin registered before work is submitted.

    Five ``inc`` tasks feed one ``sum`` task, so six keys are expected in
    both the ``x`` and ``y`` mappings of the layout.
    """
    layout = GraphLayout(s)
    s.add_plugin(layout)

    inputs = c.map(inc, range(5))
    result = c.submit(sum, inputs)
    await result

    # One coordinate per task: five inputs plus the reduction.
    assert len(layout.x) == len(layout.y) == 6

    # Every input is placed at x == 0; the task consuming them is at x == 1.
    for fut in inputs:
        assert layout.x[fut.key] == 0
    assert layout.x[result.key] == 1

    # The reduction's y must lie strictly between the extreme y values.
    y_values = layout.y.values()
    assert min(y_values) < layout.y[result.key] < max(y_values)
@gen_cluster(client=True)
async def test_construct_after_call(c, s, a, b):
    """Check the layout when the plugin is created after the work has finished.

    The same six-task graph as in the basic case is computed first; the
    ``GraphLayout`` is only constructed and registered afterwards, and is
    expected to expose the same coordinates.
    """
    inputs = c.map(inc, range(5))
    result = c.submit(sum, inputs)
    await result

    # The plugin is attached only once the result is already available.
    layout = GraphLayout(s)
    s.add_plugin(layout)

    xs, ys = layout.x, layout.y
    assert len(xs) == len(ys) == 6

    # Inputs at x == 0, the reduction at x == 1.
    for fut in inputs:
        assert xs[fut.key] == 0
    assert xs[result.key] == 1

    # The reduction's y is strictly inside the range of all y values.
    result_y = ys[result.key]
    assert min(ys.values()) < result_y < max(ys.values())
@gen_cluster(client=True)
async def test_states(c, s, a, b):
    """Wait until the plugin has reported exactly the four expected states.

    The test passes as soon as the set of states found in
    ``state_updates`` equals ``waiting``, ``processing``, ``memory`` and
    ``released``. This body has no timeout of its own.
    """
    layout = GraphLayout(s)
    s.add_plugin(layout)

    # Deliberately a single expression: no future is bound to a local name.
    # NOTE(review): presumably this is what allows the tasks to reach the
    # "released" state afterwards -- confirm before naming the futures.
    await c.submit(sum, c.map(inc, range(5)))

    expected = {"waiting", "processing", "memory", "released"}

    # Poll until the observed states match the expected set exactly.
    while {state for _, state in layout.state_updates} != expected:
        await asyncio.sleep(0.01)
@gen_cluster(client=True)
async def test_release_tasks(c, s, a, b):
    """Check the visibility updates recorded after the ``sum`` task is dropped.

    Once the reduction's key has disappeared from the scheduler, the layout
    is expected to hold one entry in ``visible_updates`` and five entries in
    ``visible_edge_updates``.
    """
    layout = GraphLayout(s)
    s.add_plugin(layout)

    inputs = c.map(inc, range(5))
    result = c.submit(sum, inputs)
    await result

    # Remember the key, then drop the only local reference to the future.
    # ``inputs`` stays bound, so only the reduction loses its reference here.
    result_key = result.key
    del result

    # Poll until the scheduler no longer tracks the reduction's key.
    while result_key in s.tasks:
        await asyncio.sleep(0.01)

    assert len(layout.visible_updates) == 1
    assert len(layout.visible_edge_updates) == 5
@gen_cluster(client=True)
async def test_forget(c, s, a, b):
    """Check that the layout is empty once the scheduler has no tasks left.

    Two chained layers of ten ``inc`` tasks are computed, every local
    reference to them is dropped, and after ``s.tasks`` becomes empty all
    of the layout's bookkeeping containers must be empty too.
    """
    layout = GraphLayout(s)
    s.add_plugin(layout)

    layer = c.map(inc, range(10))
    # Rebinding the same name leaves no local reference to the first layer.
    layer = c.map(inc, layer)
    await wait(layer)
    del layer

    # Poll until the scheduler holds no tasks at all.
    while s.tasks:
        await asyncio.sleep(0.01)

    for attr in ("x", "y", "index", "index_edge", "collision"):
        assert not getattr(layout, attr)
@gen_cluster(client=True)
async def test_unique_positions(c, s, a, b):
    """Check that no two tasks are given the same ``(x, y)`` position.

    One ``inc`` task fans out to five ``operator.add`` tasks that all depend
    on it; every key in the layout must map to a distinct coordinate pair.
    """
    layout = GraphLayout(s)
    s.add_plugin(layout)

    base = c.submit(inc, 1)
    dependents = [c.submit(operator.add, base, offset) for offset in range(5)]
    await wait(dependents)

    # A set drops duplicates, so equal sizes mean all positions are unique.
    coords = [(layout.x[key], layout.y[key]) for key in layout.x]
    assert len(set(coords)) == len(coords)
|