Prioritizing Work
=================

When there is more work than workers, Dask has to decide which tasks to
prioritize over others.  Dask can determine these priorities automatically to
optimize performance, or a user can specify priorities manually according to
their needs.

Dask uses the following priorities, in order:

1.  **User priorities**: A user-defined priority is provided by the ``priority=`` keyword argument
    to functions like ``compute()``, ``persist()``, ``submit()``, or ``map()``.
    Tasks with higher priorities run before tasks with lower priorities; the
    default priority is zero.

    .. code-block:: python

       future = client.submit(func, *args, priority=10)  # high priority task
       future = client.submit(func, *args, priority=-10)  # low priority task

       df = df.persist(priority=10)  # high priority computation

    Priorities can also be specified using the ``dask.annotate`` context manager:

    .. code-block:: python

       import dask

       with dask.annotate(priority=10):
           future = client.submit(func, *args)  # high priority task
       with dask.annotate(priority=-10):
           future = client.submit(func, *args)  # low priority task

       with dask.annotate(priority=10):
           df = df.persist()  # high priority computation


2.  **First in, first out chronologically**: Dask prefers computations that
    were submitted early.  Because users can submit computations
    asynchronously, several different computations may be running on the
    workers at the same time.  Generally, Dask prefers the groups of tasks that
    were submitted first.

    As a nuance, tasks that are submitted within a short window of each other
    are treated as if they had been submitted at the same time.

    .. code-block:: python

       x = x.persist()  # submitted first and so has higher priority
       # wait a while
       x = x.persist()  # submitted second and so has lower priority

    In this case "a while" depends on the kind of computation.  Operations
    that are often used in bulk processing, like ``compute`` and ``persist``,
    consider any two computations submitted within the same sixty seconds to
    have the same priority.  Operations that are often used in real-time
    processing, like ``submit`` and ``map``, consider computations to have the
    same priority if they are submitted within 100 milliseconds of each other.
    This behavior can be controlled with the ``fifo_timeout=`` keyword:

    .. code-block:: python

       x = x.persist()
       # wait one minute
       x = x.persist(fifo_timeout='10 minutes')  # has the same priority

       a = client.submit(func, *args)
       # wait no time at all
       b = client.submit(func, *args, fifo_timeout='0ms')  # is lower priority

3.  **Graph structure**: Within any given computation (a single ``compute`` or
    ``persist`` call), Dask orders tasks so as to minimize the memory footprint
    of the computation.  This is discussed in more depth in the
    `task ordering documentation <https://github.com/dask/dask/blob/main/dask/order.py>`_.
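
One way to picture how these three rules combine is as a single sort key per
task.  The following is a minimal, pure-Python sketch under stated assumptions,
not the scheduler's actual implementation: it assumes each task is keyed by the
tuple ``(-user_priority, generation, graph_order)`` and that the smallest tuple
runs first.  ``ToyScheduler``, its ``submit`` and ``run_order`` methods, and the
``generation`` counter driven by ``fifo_timeout`` (given here in seconds rather
than as a string such as ``'100ms'``) are hypothetical names chosen for
illustration, and list position stands in for the memory-aware order that Dask
computes.

.. code-block:: python

   import heapq
   from dataclasses import dataclass, field


   @dataclass
   class ToyScheduler:
       """Hypothetical model of Dask-style priority tuples (not distributed's code)."""

       generation: int = 0
       generation_start: float = float("-inf")
       queue: list = field(default_factory=list)

       def submit(self, keys, now, priority=0, fifo_timeout=0.1):
           # Assumption: a submission arriving more than ``fifo_timeout`` seconds
           # after the current generation began opens a new, later generation.
           if now - self.generation_start > fifo_timeout:
               self.generation += 1
               self.generation_start = now
           # Assumption: ``keys`` are listed in graph order, standing in for the
           # memory-aware ordering that Dask computes for each computation.
           for graph_order, key in enumerate(keys):
               # Negate the user priority so that higher priorities sort first.
               entry = (-priority, self.generation, graph_order)
               heapq.heappush(self.queue, (entry, key))

       def run_order(self):
           """Drain the queue, smallest priority tuple first."""
           return [heapq.heappop(self.queue)[1] for _ in range(len(self.queue))]


   sched = ToyScheduler()
   sched.submit(["a1", "a2"], now=0.0)          # first computation
   sched.submit(["b1"], now=5.0)                # >100 ms later: new generation
   sched.submit(["c1"], now=5.05, priority=10)  # same generation as b1, higher priority
   print(sched.run_order())  # ['c1', 'a1', 'a2', 'b1']

Negating the user priority lets a plain ascending sort put higher user
priorities first, while earlier generations and earlier graph positions still
win among tasks that share a user priority.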

If multiple tasks have exactly the same priority under all of the rules above,
the order in which they arrive at a worker determines the order in which they
run, on a last-in, first-out basis: the most recently arrived task runs first.
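
The last-in, first-out tie-break can be sketched with a heap.  This is a
simplified model, not the worker's actual code: it assumes the worker appends a
decreasing arrival counter to each task's priority tuple, so that among tasks
whose tuples are otherwise identical the most recent arrival sorts first.
``ToyWorker``, ``receive``, and ``next_task`` are hypothetical names.

.. code-block:: python

   import heapq
   import itertools


   class ToyWorker:
       """Hypothetical model of a worker's ready queue; not dask.distributed code."""

       def __init__(self):
           self._ready = []
           # Yields 0, -1, -2, ... so that later arrivals get smaller values.
           self._arrival = itertools.count(0, -1)

       def receive(self, key, priority):
           # Append the arrival counter as a final tie-breaker: among tasks with
           # identical priority tuples, the most recent arrival sorts first (LIFO).
           heapq.heappush(self._ready, (priority + (next(self._arrival),), key))

       def next_task(self):
           """Pop the task that would run next, or None if nothing is ready."""
           return heapq.heappop(self._ready)[1] if self._ready else None


   worker = ToyWorker()
   worker.receive("x", (0, 1, 5))
   worker.receive("y", (0, 1, 5))   # same priority as "x", arrives later
   worker.receive("z", (-1, 1, 5))  # higher user priority, so it wins outright
   print([worker.next_task() for _ in range(3)])  # ['z', 'y', 'x']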