File: test_task_state.py

package info (click to toggle)
cylc-flow 8.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 14,368 kB
  • sloc: python: 87,751; sh: 17,109; sql: 233; xml: 171; javascript: 78; lisp: 55; makefile: 11
file content (173 lines) | stat: -rw-r--r-- 5,782 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from unittest.mock import MagicMock
import pytest
from types import SimpleNamespace

from cylc.flow.prerequisite import Prerequisite
from cylc.flow.taskdef import TaskDef
from cylc.flow.cycling.integer import IntegerSequence, IntegerPoint
from cylc.flow.run_modes import RunMode, disable_task_event_handlers
from cylc.flow.task_trigger import Dependency, TaskTrigger
from cylc.flow.task_state import (
    TaskState,
    TASK_STATUS_PREPARING,
    TASK_STATUS_SUBMIT_FAILED,
    TASK_STATUS_SUBMITTED,
    TASK_STATUS_SUCCEEDED,
    TASK_STATUS_WAITING,
    TASK_STATUS_RUNNING,
)


@pytest.mark.parametrize(
    'state,is_held',
    [
        (TASK_STATUS_WAITING, True),
        (TASK_STATUS_SUCCEEDED, False)
    ]
)
def test_state_comparison(state, is_held):
    """Test the __call__ method."""
    tdef = TaskDef('foo', {}, '123', '123')
    tstate = TaskState(tdef, '123', state, is_held)

    assert tstate(state, is_held=is_held)
    assert tstate(state)
    assert tstate(is_held=is_held)
    assert tstate(state, 'of', 'flux')
    assert tstate(state, 'of', 'flux', is_held=is_held)

    assert not tstate(state + 'x', is_held=not is_held)
    assert not tstate(state, is_held=not is_held)
    assert not tstate(state + 'x', is_held=is_held)
    assert not tstate(state + 'x')
    assert not tstate(is_held=not is_held)
    assert not tstate(state + 'x', 'of', 'flux')


@pytest.mark.parametrize(
    'state,is_held,should_reset',
    [
        (None, None, False),
        (TASK_STATUS_WAITING, None, False),
        (None, True, False),
        (TASK_STATUS_WAITING, True, False),
        (TASK_STATUS_SUCCEEDED, None, True),
        (None, False, True),
        (TASK_STATUS_WAITING, False, True),
    ]
)
def test_reset(state, is_held, should_reset):
    """Test that tasks do or don't have their state changed."""
    tdef = TaskDef('foo', {}, '123', '123')
    # create task state:
    #   * status: waiting
    #   * is_held: true
    tstate = TaskState(tdef, '123', TASK_STATUS_WAITING, True)
    assert tstate.reset(state, is_held) == should_reset
    if is_held is not None:
        assert tstate.is_held == is_held
    if state is not None:
        assert tstate.status == state


def test_task_prereq_duplicates(set_cycling_type):
    """Test prerequisite duplicates from multiple recurrences are discarded."""

    set_cycling_type()

    seq1 = IntegerSequence('R1', "1")
    seq2 = IntegerSequence('R/1/P1', "1")

    trig = TaskTrigger('a', "1", 'succeeded', None, None, None, None)

    dep = Dependency([trig], [trig], False)

    tdef = TaskDef('foo', {}, IntegerPoint("1"), IntegerPoint("1"))
    tdef.add_dependency(dep, seq1)
    tdef.add_dependency(dep, seq2)  # duplicate!

    tstate = TaskState(tdef, IntegerPoint("1"), TASK_STATUS_WAITING, False)

    prereqs = [p._satisfied for p in tstate.prerequisites]

    assert prereqs == [{("1", "a", "succeeded"): False}]


def test_task_state_order():
    """Test is_gt and is_gte methods."""

    tdef = TaskDef('foo', {}, IntegerPoint("1"), IntegerPoint("1"))
    tstate = TaskState(tdef, IntegerPoint("1"), TASK_STATUS_SUBMITTED, False)

    assert tstate.is_gt(TASK_STATUS_WAITING)
    assert tstate.is_gt(TASK_STATUS_PREPARING)
    assert tstate.is_gt(TASK_STATUS_SUBMIT_FAILED)
    assert not tstate.is_gt(TASK_STATUS_SUBMITTED)
    assert tstate.is_gte(TASK_STATUS_SUBMITTED)
    assert not tstate.is_gt(TASK_STATUS_RUNNING)
    assert not tstate.is_gte(TASK_STATUS_RUNNING)


def test_get_resolved_dependencies():
    prereq1 = Prerequisite(IntegerPoint('2'))
    prereq1[('1', 'a', 'x')] = True
    prereq1[('1', 'b', 'x')] = False
    prereq1[('1', 'c', 'x')] = 'satisfied from database'
    prereq1[('1', 'd', 'x')] = 'force satisfied'
    prereq2 = Prerequisite(IntegerPoint('2'))
    prereq2[('1', 'e', 'succeeded')] = False
    prereq2[('1', 'e', 'failed')] = True
    task_state = TaskState(
        MagicMock(), IntegerPoint('2'), TASK_STATUS_WAITING, False
    )
    task_state.prerequisites = [prereq1, prereq2]
    assert task_state.get_resolved_dependencies() == [
        '1/a',
        '1/c',
        '1/d',
        '1/e',
    ]


@pytest.mark.parametrize(
    'itask_run_mode, disable_handlers, expect',
    (
        ('live', True, False),
        ('live', False, False),
        ('dummy', True, False),
        ('dummy', False, False),
        ('simulation', True, True),
        ('simulation', False, True),
        ('skip', True, True),
        ('skip', False, False),
    )
)
def test_disable_task_event_handlers(itask_run_mode, disable_handlers, expect):
    """Conditions under which task event handlers should not be used.
    """
    # Construct a fake itask object:
    itask = SimpleNamespace(
        run_mode=RunMode(itask_run_mode),
        platform={'disable task event handlers': disable_handlers},
        tdef=SimpleNamespace(
            rtconfig={
                'skip': {'disable task event handlers': disable_handlers}})
    )
    # Check method:
    assert disable_task_event_handlers(itask) is expect