1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
from __future__ import annotations
from dask.utils import key_split
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.diagnostics.task_stream import colors
class WebsocketPlugin(SchedulerPlugin):
name = "websocket"
def __init__(self, socket, scheduler):
self.socket = socket
self.scheduler = scheduler
def restart(self, scheduler, **kwargs):
"""Run when the scheduler restarts itself"""
self.socket.send("restart", {})
def add_worker(self, scheduler=None, worker=None, **kwargs):
"""Run when a new worker enters the cluster"""
self.socket.send("add_worker", {"worker": worker})
def remove_worker(self, scheduler=None, worker=None, **kwargs):
"""Run when a worker leaves the cluster"""
self.socket.send("remove_worker", {"worker": worker})
def add_client(self, scheduler=None, client=None, **kwargs):
"""Run when a new client connects"""
self.socket.send("add_client", {"client": client})
def remove_client(self, scheduler=None, client=None, **kwargs):
"""Run when a client disconnects"""
self.socket.send("remove_client", {"client": client})
def update_graph(self, scheduler, client=None, **kwargs):
"""Run when a new graph / tasks enter the scheduler"""
self.socket.send("update_graph", {"client": client})
def transition(self, key, start, finish, *args, **kwargs):
"""Run whenever a task changes state
Parameters
----------
key : string
start : string
Start state of the transition.
One of released, waiting, processing, memory, error.
finish : string
Final state of the transition.
*args, **kwargs : More options passed when transitioning
This may include worker ID, compute time, etc.
"""
if key not in self.scheduler.tasks:
return
kwargs["key"] = key
startstops = kwargs.get("startstops", [])
for startstop in startstops:
color = colors[startstop["action"]]
if type(color) is not str:
color = color(kwargs)
data = {
"key": key,
"name": key_split(key),
"color": color,
**kwargs,
**startstop,
}
self.socket.send("transition", data)
|