1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
from __future__ import annotations
import datetime
import threading
from concurrent.futures import Future
# only for type checking:
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union
from typing_extensions import TypedDict
if TYPE_CHECKING:
from parsl.dataflow.futures import AppFuture
import parsl.dataflow.dflow as dflow
from parsl.dataflow.states import States
class TaskRecord(TypedDict, total=False):
"""This stores most information about a Parsl task"""
dfk: dflow.DataFlowKernel
"""The DataFlowKernel which is managing this task.
"""
func_name: str
status: States
depends: List[Future]
app_fu: AppFuture
"""The Future which was returned to the user when an app was invoked.
"""
exec_fu: Optional[Future]
"""When a task has been launched on an executor, stores the Future
returned by that executor.
"""
executor: str
"""The name of the executor which this task will be/is being/was
executed on.
"""
retries_left: int
fail_count: int
fail_cost: float
fail_history: List[str]
checkpoint: bool # this change is also in #1516
"""Should this task be checkpointed?
"""
hashsum: Optional[str] # hash for checkpointing/memoization.
"""The hash used for checkpointing and memoisation. This is not known
until at least all relevant dependencies have completed, and will be
None before that.
"""
task_launch_lock: threading.Lock
"""This lock is used to ensure that task launch only happens once.
A task can be launched by dependencies completing from arbitrary
threads, and a race condition would exist when dependencies complete
in multiple threads very close together in time, which this lock
prevents.
"""
# these three could be more strongly typed perhaps but I'm not thinking about that now
func: Callable
fn_hash: str
args: Sequence[Any]
# in some places we uses a Tuple[Any, ...] and in some places a List[Any].
# This is an attempt to correctly type both of those.
kwargs: Dict[str, Any]
time_invoked: Optional[datetime.datetime]
time_returned: Optional[datetime.datetime]
try_time_launched: Optional[datetime.datetime]
try_time_returned: Optional[datetime.datetime]
memoize: bool
"""Should this task be memoized?"""
ignore_for_cache: Sequence[str]
from_memo: Optional[bool]
id: int
try_id: int
resource_specification: Dict[str, Any]
"""Dictionary containing relevant info for a task execution.
Includes resources to allocate and execution mode as a given
executor permits."""
join: bool
"""Is this a join_app?"""
joins: Union[None, Future, List[Future]]
"""If this is a join app and the python body has executed, then this
contains the Future or list of Futures that the join app will join."""
join_lock: threading.Lock
"""Restricts access to end-of-join behavior to ensure that joins
only complete once, even if several joining Futures complete close
together in time."""
|