#!/usr/bin/env python

# Copyright (C) 2023- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# This example shows TaskVine executing a manually constructed dask graph.
# See vine_example_dask_delayed.py for an example where the graph
# is constructed by dask.

import ndcctools.taskvine as vine
from ndcctools.taskvine.compat import DaskVine

import argparse
import getpass
import sys
import traceback

from operator import add  # use add function in the example graph

dsk_graph = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),
    "w": (sum, ["x", "y", "z"]),
    "v": [(sum, ["w", "z"]), 2],
    "t": (sum, "v"),
}

expected_result = 11
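

# A minimal sketch of how the graph above evaluates, useful for checking
# expected_result by hand without a manager or workers. This is NOT the
# scheduler used by dask or TaskVine: local_get is a hypothetical helper,
# introduced here for illustration only, and it assumes the graph uses
# nothing beyond the forms in this example (literals, string keys, lists,
# and task tuples whose first element is callable). It does no caching,
# so shared dependencies are simply recomputed.
def local_get(graph, key):
    def resolve(expr):
        # A task is a tuple whose first element is callable: call it
        # on its resolved arguments.
        if isinstance(expr, tuple) and expr and callable(expr[0]):
            return expr[0](*(resolve(arg) for arg in expr[1:]))
        # Lists are resolved element by element.
        if isinstance(expr, list):
            return [resolve(item) for item in expr]
        # A string that names a graph key stands for that key's value.
        if isinstance(expr, str) and expr in graph:
            return resolve(graph[expr])
        # Anything else is treated as a literal.
        return expr

    return resolve(graph[key])


# Under these assumptions, local_get(dsk_graph, "t") walks
# z = 1 + 2, w = 1 + 2 + z, v = [w + z, 2], t = sum(v),
# which should agree with expected_result.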


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="vine_example_dask_graph.py",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="""This example shows TaskVine executing a manually constructed dask graph.
See vine_example_dask_delayed.py for an example where the graph
is constructed by dask.""")
    parser.add_argument(
        "--name",
        nargs="?",
        type=str,
        help="name to assign to the manager.",
        default=f"vine-dask-graph-{getpass.getuser()}",
    )
    parser.add_argument(
        "--port",
        nargs="?",
        type=int,
        help="port for the manager to listen for connections. If 0, pick any available.",
        default=9123,
    )
    parser.add_argument(
        "--disable-peer-transfers",
        action="store_true",
        help="disable transfers among workers.",
        default=False,
    )

    args = parser.parse_args()

    m = DaskVine(port=args.port, ssl=True)
    m.set_name(args.name)
    print(f"Listening for workers at port: {m.port}")

    if args.disable_peer_transfers:
        m.disable_peer_transfers()

    # checkpoint at even levels when nodes have at least one dependency
    def checkpoint(dag, key):
        if dag.depth_of(key) % 2 == 0 and len(dag.get_dependencies(key)) > 0:
            print(f"checkpoint for {key}")
            return True
        return False

    f = vine.Factory(manager=m)
    f.cores = 4
    f.disk = 2000
    f.memory = 2000
    f.max_workers = 1
    f.min_workers = 1

    with f:
        desired_keys = ["t", "v"]
        desired_keys = list(dsk_graph.keys())
        print(f"dask graph example is:\n{dsk_graph}")
        print(f"desired keys are {desired_keys}")

        try:
            results = m.get(
                dsk_graph,
                desired_keys,
                lazy_transfers=True,
                checkpoint_fn=checkpoint,
                resources={"cores": 1},
            )
            print({k: v for k, v in zip(desired_keys, results)})
        except Exception:
            traceback.print_exc()

        print("Terminating workers...", end="")

    print("done!")
    sys.exit(0)

# vim: set sts=4 sw=4 ts=4 expandtab ft=python: