File: taskrecord.py

from __future__ import annotations

import datetime
import threading
from concurrent.futures import Future

# only for type checking:
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union

from typing_extensions import TypedDict

if TYPE_CHECKING:
    from parsl.dataflow.futures import AppFuture

import parsl.dataflow.dflow as dflow
from parsl.dataflow.states import States


class TaskRecord(TypedDict, total=False):
    """This stores most information about a Parsl task"""

    dfk: dflow.DataFlowKernel
    """The DataFlowKernel which is managing this task.
    """

    func_name: str

    status: States

    depends: List[Future]

    app_fu: AppFuture
    """The Future which was returned to the user when an app was invoked.
    """

    exec_fu: Optional[Future]
    """When a task has been launched on an executor, stores the Future
    returned by that executor.
    """

    executor: str
    """The name of the executor which this task will be/is being/was
    executed on.
    """

    retries_left: int
    fail_count: int
    fail_cost: float
    fail_history: List[str]

    checkpoint: bool  # see also #1516
    """Should this task be checkpointed?
    """

    hashsum: Optional[str]
    """The hash used for checkpointing and memoization. This is not known
    until at least all relevant dependencies have completed, and will be
    None before that.
    """

    task_launch_lock: threading.Lock
    """This lock is used to ensure that task launch only happens once.
    A task can be launched by dependencies completing from arbitrary
    threads, and a race condition would exist when dependencies complete
    in multiple threads very close together in time, which this lock
    prevents.
    """

    # func, args and kwargs could be typed more precisely than these
    # general types.
    func: Callable
    fn_hash: str
    args: Sequence[Any]
    # args is typed as Sequence[Any] because some call sites pass a
    # Tuple[Any, ...] and others a List[Any]; Sequence covers both.
    kwargs: Dict[str, Any]

    time_invoked: Optional[datetime.datetime]
    time_returned: Optional[datetime.datetime]
    try_time_launched: Optional[datetime.datetime]
    try_time_returned: Optional[datetime.datetime]

    memoize: bool
    """Should this task be memoized?"""
    ignore_for_cache: Sequence[str]
    from_memo: Optional[bool]

    id: int
    try_id: int

    resource_specification: Dict[str, Any]
    """Dictionary containing relevant info for a task execution.
    Includes resources to allocate and execution mode as a given
    executor permits."""
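    # For example, with an executor that accepts resource specifications
    # (such as the Work Queue executor) this might look roughly like
    # {'cores': 1, 'memory': 100, 'disk': 10}; the accepted keys are
    # executor-specific and this example is illustrative only.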

    join: bool
    """Is this a join_app?"""

    joins: Union[None, Future, List[Future]]
    """If this is a join app and the python body has executed, then this
    contains the Future or list of Futures that the join app will join."""

    join_lock: threading.Lock
    """Restricts access to end-of-join behavior to ensure that joins
    only complete once, even if several joining Futures complete close
    together in time."""
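

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): shows how a minimal
# TaskRecord might be populated and how task_launch_lock can guard against a
# double launch.  The field values, the state transition and the helper names
# (_example_task_record, _launch_once) are hypothetical; the real logic lives
# in parsl.dataflow.dflow.DataFlowKernel.
# ---------------------------------------------------------------------------

def _example_task_record() -> TaskRecord:
    """Build a minimal, hypothetical TaskRecord for illustration only."""
    record: TaskRecord = {
        'id': 0,
        'try_id': 0,
        'func_name': 'noop',
        'func': lambda: None,
        'args': (),
        'kwargs': {},
        'status': States.pending,
        'depends': [],
        'fail_count': 0,
        'fail_cost': 0.0,
        'fail_history': [],
        'memoize': False,
        'checkpoint': False,
        'hashsum': None,
        'time_invoked': datetime.datetime.now(),
        'task_launch_lock': threading.Lock(),
        'join_lock': threading.Lock(),
    }
    return record


def _launch_once(record: TaskRecord) -> bool:
    """Hypothetical launch guard: only the first caller may launch the task."""
    with record['task_launch_lock']:
        if record['status'] is not States.pending:
            return False
        record['status'] = States.launched
        record['try_time_launched'] = datetime.datetime.now()
        return True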