1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
from toolz import merge
from time import time
import dask
from dask import threaded, multiprocessing, local
from dask.compatibility import Iterator
from random import randint
import matplotlib.pyplot as plt
def noop(x):
    """Accept a single argument, ignore it, and return ``None``.

    Used as the task callable in the graphs built by ``trivial``,
    ``crosstalk`` and ``dense``.
    """
    return None
# Presumably the number of times each measurement should be repeated.
# NOTE(review): nothing in the visible script reads this value -- confirm
# whether it is still needed or should be wired into the timing loops.
nrepetitions = 1
def trivial(width, height):
    """Build a graph of ``width`` independent chains, ``height`` layers deep.

    Layer 0 maps ``('x', 0, i)`` to the integer ``i``.  Each later layer ``j``
    maps ``('x', j, i)`` to the task ``(noop, ('x', j - 1, i))``, so a key
    depends only on the key directly beneath it in the same column.

    Returns a ``(graph, keys)`` pair, where ``keys`` lists the keys of the
    last layer in column order.
    """
    graph = {}
    for col in range(width):
        graph['x', 0, col] = col

    for layer in range(1, height):
        below = layer - 1
        for col in range(width):
            graph['x', layer, col] = (noop, ('x', below, col))

    top = height - 1
    return graph, [('x', top, col) for col in range(width)]
def crosstalk(width, height, connections):
    """Build a layered graph whose tasks depend on random keys one layer down.

    Layer 0 maps ``('x', 0, i)`` to the integer ``i``.  In each later layer
    ``j``, ``('x', j, i)`` is the task ``(noop, deps)`` where ``deps`` is a
    list of ``connections`` keys drawn at random (with replacement) from
    layer ``j - 1``.

    Parameters
    ----------
    width : int
        Number of keys per layer.
    height : int
        Number of layers, including the data layer 0.
    connections : int
        Number of dependencies drawn for every task.

    Returns
    -------
    (dict, list)
        The graph and the list of keys in the last layer, in column order.
    """
    d = {('x', 0, i): i for i in range(width)}
    for j in range(1, height):
        # randint is inclusive on both ends, so the upper bound must be
        # width - 1; using ``width`` would reference a column that does not
        # exist in the previous layer.
        d.update({('x', j, i): (noop, [('x', j - 1, randint(0, width - 1))
                                       for _ in range(connections)])
                  for i in range(width)})
    return d, [('x', height - 1, i) for i in range(width)]
def dense(width, height):
    """Build a layered graph in which every task depends on the whole layer below.

    Layer 0 maps ``('x', 0, i)`` to the integer ``i``.  In each later layer
    ``j``, ``('x', j, i)`` is the task ``(noop, deps)`` where ``deps`` lists
    every key of layer ``j - 1`` in column order.

    Returns a ``(graph, keys)`` pair, where ``keys`` lists the keys of the
    last layer in column order.
    """
    graph = {('x', 0, col): col for col in range(width)}

    for layer in range(1, height):
        for col in range(width):
            # A fresh dependency list is built for every task.
            everything_below = [('x', layer - 1, src) for src in range(width)]
            graph['x', layer, col] = (noop, everything_below)

    return graph, [('x', height - 1, col) for col in range(width)]
import numpy as np
# Graph widths to benchmark: 10 log-spaced values from 10**0 to 10**4
# (truncated to int when the graph is built).
x = np.logspace(0, 4, 10)

# Maps each scheduler ``get`` function to an array of wall-clock durations in
# seconds, one entry per width in ``x``.
trivial_results = {}
for get in (dask.get, threaded.get, local.get_sync, multiprocessing.get):
    durations = []
    for width in x:
        graph, outputs = trivial(int(width), 5)
        # Only the scheduler call is timed; graph construction is excluded.
        t0 = time()
        get(graph, outputs)
        durations.append(time() - t0)
    trivial_results[get] = np.array(durations)
########
# Plot #
########
# Two log-log panels sharing an x-axis: total wall time on the left, wall time
# divided by the number of tasks on the right.
f, (left, right) = plt.subplots(nrows=1, ncols=2, sharex=True, figsize=(12, 5), squeeze=True)

# Every graph was built with a height of 5, so the size plotted on the x-axis
# is 5 keys per unit of width.
n_tasks = x * 5
for get, durations in trivial_results.items():
    left.loglog(n_tasks, durations, label=get.__module__)
    # Normalise by the same task count shown on the x-axis; dividing by the
    # width alone would overstate the per-task cost by a factor of 5.
    right.loglog(n_tasks, durations / n_tasks, label=get.__module__)

left.set_title('Cost for Entire graph')
right.set_title('Cost per task')
left.set_ylabel('Duration (s)')
right.set_ylabel('Duration (s)')
left.set_xlabel('Number of tasks')
right.set_xlabel('Number of tasks')
plt.legend()
plt.savefig('images/scaling-nodes.png')
#####################
# Crosstalk example #
#####################
# Dependencies per task to benchmark: 10 evenly spaced values from 1 to 100
# (truncated to int when the graph is built).
x = np.linspace(1, 100, 10)

# Maps each scheduler ``get`` function to an array of wall-clock durations in
# seconds, one entry per value in ``x``.
crosstalk_results = {}
for get in (threaded.get, local.get_sync):
    durations = []
    for n_edges in x:
        # Fixed 1000 x 5 graph; only the fan-in per task varies.
        graph, outputs = crosstalk(1000, 5, int(n_edges))
        t0 = time()
        get(graph, outputs)
        durations.append(time() - t0)
    crosstalk_results[get] = np.array(durations)
########
# Plot #
########
# Left panel: total wall time on linear axes.  Right panel: wall time per
# edge with a logarithmic y-axis.
f, (left, right) = plt.subplots(nrows=1, ncols=2, sharex=True, figsize=(12, 5), squeeze=True)

for get, durations in crosstalk_results.items():
    left.plot(x, durations, label=get.__module__)
    # 5000 is the 1000 x 5 graph size used for the measurements; dividing by
    # x as well turns the figure into a per-edge cost.
    right.semilogy(x, durations / 5000. / x, label=get.__module__)

for axis, title in ((left, 'Cost for Entire graph'), (right, 'Cost per edge')):
    axis.set_title(title)
    axis.set_ylabel('Duration (s)')
    axis.set_xlabel('Number of edges per task')

plt.legend()
plt.savefig('images/scaling-edges.png')
|