File: websocket.py

package info (click to toggle)
dask.distributed 2022.12.1+ds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (70 lines) | stat: -rw-r--r-- 2,435 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from __future__ import annotations

from dask.utils import key_split

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.diagnostics.task_stream import colors


class WebsocketPlugin(SchedulerPlugin):
    """Scheduler plugin that relays scheduler events through ``socket.send``.

    Every hook forwards one message consisting of an event name and a dict
    payload to the socket object given at construction time.
    """

    name = "websocket"

    def __init__(self, socket, scheduler):
        """Remember the outgoing socket and the scheduler to consult.

        Parameters
        ----------
        socket
            Object whose ``send(event_name, payload)`` method is called for
            every forwarded event.
        scheduler
            Object whose ``tasks`` container is checked in ``transition``.
        """
        self.socket = socket
        self.scheduler = scheduler

    def restart(self, scheduler, **kwargs):
        """Run when the scheduler restarts itself"""
        self.socket.send("restart", {})

    def add_worker(self, scheduler=None, worker=None, **kwargs):
        """Run when a new worker enters the cluster"""
        self.socket.send("add_worker", {"worker": worker})

    def remove_worker(self, scheduler=None, worker=None, **kwargs):
        """Run when a worker leaves the cluster"""
        self.socket.send("remove_worker", {"worker": worker})

    def add_client(self, scheduler=None, client=None, **kwargs):
        """Run when a new client connects"""
        self.socket.send("add_client", {"client": client})

    def remove_client(self, scheduler=None, client=None, **kwargs):
        """Run when a client disconnects"""
        self.socket.send("remove_client", {"client": client})

    def update_graph(self, scheduler, client=None, **kwargs):
        """Run when a new graph / tasks enter the scheduler"""
        self.socket.send("update_graph", {"client": client})

    def transition(self, key, start, finish, *args, **kwargs):
        """Run whenever a task changes state

        One ``"transition"`` message is sent per entry of
        ``kwargs["startstops"]``. Nothing is sent when ``key`` is not in
        ``self.scheduler.tasks`` or when ``startstops`` is missing, ``None``
        or empty.

        Parameters
        ----------
        key : string
        start : string
            Start state of the transition.
            One of released, waiting, processing, memory, error.
        finish : string
            Final state of the transition.
        *args, **kwargs : More options passed when transitioning
            This may include worker ID, compute time, etc.
        """
        if key not in self.scheduler.tasks:
            return

        # Must be set before any colour callable runs: it receives ``kwargs``.
        kwargs["key"] = key

        # A falsy check (rather than a ``[]`` default) also covers an explicit
        # ``startstops=None``, which would otherwise fail in the loop below.
        startstops = kwargs.get("startstops")
        if not startstops:
            return

        name = key_split(key)  # loop-invariant
        for startstop in startstops:
            color = colors[startstop["action"]]
            # ``colors`` entries are either a colour string or a callable that
            # derives the colour from the message; isinstance also accepts
            # str subclasses, unlike an exact ``type(...) is str`` test.
            if not isinstance(color, str):
                color = color(kwargs)
            # Later entries win: kwargs override the computed fields and the
            # startstop record overrides kwargs.
            data = {
                "key": key,
                "name": name,
                "color": color,
                **kwargs,
                **startstop,
            }
            self.socket.send("transition", data)