File: _stories.py

package info (click to toggle)
dask.distributed 2024.12.1+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,588 kB
  • sloc: python: 96,954; javascript: 1,549; sh: 390; makefile: 220
file content (62 lines) | stat: -rw-r--r-- 1,459 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from __future__ import annotations

from collections.abc import Collection, Iterable
from typing import TYPE_CHECKING

from dask.typing import Key

if TYPE_CHECKING:
    # Circular import
    from distributed.scheduler import Transition


def scheduler_story(
    keys_or_stimuli: set[Key | str], transition_log: Iterable[Transition]
) -> list[Transition]:
    """Select the scheduler transitions that involve any of the given keys
    or stimuli.

    A transition is kept when its first element (index 0) is a member of
    ``keys_or_stimuli``, or when its fourth element (index 3) has at least
    one item in common with ``keys_or_stimuli``.

    Parameters
    ----------
    keys_or_stimuli : set[Key | str]
        Task keys or stimulus_id's to look for
    transition_log : iterable
        The scheduler transition log

    Returns
    -------
    story : list[Transition]
        The matching transitions, in the order in which they appear in
        ``transition_log``
    """
    # NOTE(review): only indices 0 and 3 of each transition are inspected;
    # confirm against the Transition definition in distributed.scheduler that
    # these are the fields meant to be matched.
    story = []
    for transition in transition_log:
        if transition[0] in keys_or_stimuli:
            # The leading element itself is one of the requested items.
            story.append(transition)
        elif keys_or_stimuli.intersection(transition[3]):
            # Some item of the element at index 3 is one of the requested items.
            story.append(transition)
    return story


def worker_story(keys_or_stimuli: Collection[Key | str], log: Iterable[tuple]) -> list:
    """Select the worker log entries that mention any of the given keys or
    stimuli.

    An entry is kept when one of ``keys_or_stimuli`` is an element of the
    entry itself, or an element of a tuple, list or set found directly
    inside the entry. Deeper levels of nesting, and containers of other
    types (e.g. dicts), are not searched.

    Parameters
    ----------
    keys_or_stimuli : Collection[Key | str]
        Task keys or stimulus_id's to look for
    log : iterable
        The worker log

    Returns
    -------
    story : list
        The matching log entries, in the order in which they appear in
        ``log``
    """
    searchable = (tuple, list, set)

    def _mentions_any(entry) -> bool:
        # First pass: is one of the requested items a direct element of the entry?
        for wanted in keys_or_stimuli:
            if wanted in entry:
                return True
        # Second pass: look one level down, inside nested tuples/lists/sets.
        for wanted in keys_or_stimuli:
            for element in entry:
                if isinstance(element, searchable) and wanted in element:
                    return True
        return False

    return list(filter(_mentions_any, log))