File: scheduling.py

package info (click to toggle)
dask 2024.12.1%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 20,024 kB
  • sloc: python: 105,182; javascript: 1,917; makefile: 159; sh: 88
file content (128 lines) | stat: -rw-r--r-- 3,290 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from __future__ import annotations

from random import randint
from time import time

import matplotlib.pyplot as plt

import dask
from dask import local, multiprocessing, threaded


def noop(x):
    """Do-nothing task: accept one argument, ignore it and return ``None``."""
    return None


# NOTE(review): no code visible in this file reads ``nrepetitions`` -- every
# timing below is taken from a single run. Confirm whether it is still needed.
nrepetitions = 1


def trivial(width, height):
    """Build ``width`` independent chains of ``noop`` tasks, ``height`` layers deep.

    Layer 0 maps each key ``("x", 0, i)`` to the literal ``i``. Every key in a
    later layer ``j`` is a ``noop`` task whose only argument is the key directly
    below it, ``("x", j - 1, i)``, so no chain ever depends on another chain.

    Returns
    -------
    tuple
        ``(graph, keys)`` where ``graph`` is the dict described above and
        ``keys`` is the list of top-layer keys ``("x", height - 1, i)``.
    """
    graph = {}
    # Bottom layer: plain data, no task to run.
    for col in range(width):
        graph[("x", 0, col)] = col
    # Upper layers: one task per column, depending on the same column below.
    for layer in range(1, height):
        for col in range(width):
            graph[("x", layer, col)] = (noop, ("x", layer - 1, col))
    outputs = [("x", height - 1, col) for col in range(width)]
    return graph, outputs


def crosstalk(width, height, connections):
    """Natural looking dask with some inter-connections

    Layer 0 maps each key ``("x", 0, i)`` to the literal ``i``. Every key in a
    later layer ``j`` is a ``noop`` task whose argument is a list of
    ``connections`` keys drawn at random from layer ``j - 1``. The draws are
    independent, so the same key may appear more than once in a list.

    Parameters
    ----------
    width : int
        Number of keys in each layer.
    height : int
        Number of layers, including the data layer 0.
    connections : int
        Number of randomly chosen dependencies per task.

    Returns
    -------
    tuple
        ``(graph, keys)`` where ``keys`` is the list of top-layer keys
        ``("x", height - 1, i)``.
    """
    d = {("x", 0, i): i for i in range(width)}
    for j in range(1, height):
        d.update(
            {
                ("x", j, i): (
                    noop,
                    # ``randint`` includes BOTH endpoints, so the upper bound
                    # has to be ``width - 1``; using ``width`` would
                    # occasionally name a key that is not in the graph.
                    [
                        ("x", j - 1, randint(0, width - 1))
                        for _ in range(connections)
                    ],
                )
                for i in range(width)
            }
        )
    return d, [("x", height - 1, i) for i in range(width)]


def dense(width, height):
    """Build a graph in which every task depends on the whole layer below it.

    Layer 0 maps each key ``("x", 0, i)`` to the literal ``i``. Every key in a
    later layer ``j`` is a ``noop`` task whose argument is the list of all
    ``width`` keys of layer ``j - 1``.

    Returns
    -------
    tuple
        ``(graph, keys)`` where ``keys`` is the list of top-layer keys
        ``("x", height - 1, i)``.
    """
    graph = {("x", 0, col): col for col in range(width)}
    for layer in range(1, height):
        below = layer - 1
        for col in range(width):
            # A fresh list per task, naming every key of the previous layer.
            deps = [("x", below, k) for k in range(width)]
            graph[("x", layer, col)] = (noop, deps)
    top = height - 1
    return graph, [("x", top, col) for col in range(width)]


import numpy as np

# Trivial example: time each scheduler on embarrassingly parallel graphs of
# height 5 whose width takes 10 log-spaced values between 1 and 10**4.
x = np.logspace(0, 4, 10)
trivial_results = {}
for get in (dask.get, threaded.get, local.get_sync, multiprocessing.get):
    durations = []
    for width in x:
        # ``logspace`` yields floats; the graph builder needs an integer width.
        graph, outputs = trivial(int(width), 5)
        started = time()
        get(graph, outputs)
        finished = time()
        durations.append(finished - started)
    # One array of elapsed seconds per scheduler, aligned with ``x``.
    trivial_results[get] = np.array(durations)


########
# Plot #
########

# Two side-by-side log-log panels sharing the x-axis: total run time on the
# left, run time divided by the number of tasks on the right.
f, (left, right) = plt.subplots(
    nrows=1, ncols=2, sharex=True, figsize=(12, 5), squeeze=True
)

# The graphs were built with height 5, which this script counts as 5 * x tasks.
n_tasks = x * 5

for get in trivial_results:
    left.loglog(n_tasks, trivial_results[get], label=get.__module__)
    # Normalise by the same task count that is shown on the x-axis; dividing
    # by ``x`` alone would overstate the per-task cost by a factor of 5.
    right.loglog(n_tasks, trivial_results[get] / n_tasks, label=get.__module__)

left.set_title("Cost for Entire graph")
right.set_title("Cost per task")
left.set_ylabel("Duration (s)")
right.set_ylabel("Duration (s)")
left.set_xlabel("Number of tasks")
right.set_xlabel("Number of tasks")

# pyplot-level legend: drawn on pyplot's current axes.
plt.legend()
plt.savefig("images/scaling-nodes.png")

#####################
# Crosstalk example #
#####################

# Sweep the number of dependencies per task over 10 evenly spaced values from
# 1 to 100 on a fixed 1000-wide, 5-high graph.
x = np.linspace(1, 100, 10)
crosstalk_results = {}
for get in [threaded.get, local.get_sync]:  # type: ignore[assignment]
    durations = []
    for n_edges in x:
        # ``linspace`` yields floats; the edge count must be an integer.
        graph, outputs = crosstalk(1000, 5, int(n_edges))
        started = time()
        get(graph, outputs)
        finished = time()
        durations.append(finished - started)
    # One array of elapsed seconds per scheduler, aligned with ``x``.
    crosstalk_results[get] = np.array(durations)

########
# Plot #
########

# Two side-by-side panels sharing the x-axis: total run time on the left
# (linear axes), run time per edge on the right (log y-axis).
f, (left, right) = plt.subplots(
    nrows=1, ncols=2, sharex=True, figsize=(12, 5), squeeze=True
)

for get, durations in crosstalk_results.items():
    scheduler = get.__module__
    left.plot(x, durations, label=scheduler)
    # 5000 = 1000 (width) * 5 (height) used when the graphs were built; dividing
    # by that and then by the edges per task gives the duration per edge.
    right.semilogy(x, durations / 5000.0 / x, label=scheduler)

# Left panel decorations.
left.set_title("Cost for Entire graph")
left.set_ylabel("Duration (s)")
left.set_xlabel("Number of edges per task")

# Right panel decorations.
right.set_title("Cost per edge")
right.set_ylabel("Duration (s)")
right.set_xlabel("Number of edges per task")

plt.legend()
plt.savefig("images/scaling-edges.png")