1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest.mock import MagicMock
import pytest
from types import SimpleNamespace
from cylc.flow.prerequisite import Prerequisite
from cylc.flow.taskdef import TaskDef
from cylc.flow.cycling.integer import IntegerSequence, IntegerPoint
from cylc.flow.run_modes import RunMode, disable_task_event_handlers
from cylc.flow.task_trigger import Dependency, TaskTrigger
from cylc.flow.task_state import (
TaskState,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_WAITING,
TASK_STATUS_RUNNING,
)
@pytest.mark.parametrize(
'state,is_held',
[
(TASK_STATUS_WAITING, True),
(TASK_STATUS_SUCCEEDED, False)
]
)
def test_state_comparison(state, is_held):
"""Test the __call__ method."""
tdef = TaskDef('foo', {}, '123', '123')
tstate = TaskState(tdef, '123', state, is_held)
assert tstate(state, is_held=is_held)
assert tstate(state)
assert tstate(is_held=is_held)
assert tstate(state, 'of', 'flux')
assert tstate(state, 'of', 'flux', is_held=is_held)
assert not tstate(state + 'x', is_held=not is_held)
assert not tstate(state, is_held=not is_held)
assert not tstate(state + 'x', is_held=is_held)
assert not tstate(state + 'x')
assert not tstate(is_held=not is_held)
assert not tstate(state + 'x', 'of', 'flux')
@pytest.mark.parametrize(
'state,is_held,should_reset',
[
(None, None, False),
(TASK_STATUS_WAITING, None, False),
(None, True, False),
(TASK_STATUS_WAITING, True, False),
(TASK_STATUS_SUCCEEDED, None, True),
(None, False, True),
(TASK_STATUS_WAITING, False, True),
]
)
def test_reset(state, is_held, should_reset):
"""Test that tasks do or don't have their state changed."""
tdef = TaskDef('foo', {}, '123', '123')
# create task state:
# * status: waiting
# * is_held: true
tstate = TaskState(tdef, '123', TASK_STATUS_WAITING, True)
assert tstate.reset(state, is_held) == should_reset
if is_held is not None:
assert tstate.is_held == is_held
if state is not None:
assert tstate.status == state
def test_task_prereq_duplicates(set_cycling_type):
"""Test prerequisite duplicates from multiple recurrences are discarded."""
set_cycling_type()
seq1 = IntegerSequence('R1', "1")
seq2 = IntegerSequence('R/1/P1', "1")
trig = TaskTrigger('a', "1", 'succeeded', None, None, None, None)
dep = Dependency([trig], [trig], False)
tdef = TaskDef('foo', {}, IntegerPoint("1"), IntegerPoint("1"))
tdef.add_dependency(dep, seq1)
tdef.add_dependency(dep, seq2) # duplicate!
tstate = TaskState(tdef, IntegerPoint("1"), TASK_STATUS_WAITING, False)
prereqs = [p._satisfied for p in tstate.prerequisites]
assert prereqs == [{("1", "a", "succeeded"): False}]
def test_task_state_order():
"""Test is_gt and is_gte methods."""
tdef = TaskDef('foo', {}, IntegerPoint("1"), IntegerPoint("1"))
tstate = TaskState(tdef, IntegerPoint("1"), TASK_STATUS_SUBMITTED, False)
assert tstate.is_gt(TASK_STATUS_WAITING)
assert tstate.is_gt(TASK_STATUS_PREPARING)
assert tstate.is_gt(TASK_STATUS_SUBMIT_FAILED)
assert not tstate.is_gt(TASK_STATUS_SUBMITTED)
assert tstate.is_gte(TASK_STATUS_SUBMITTED)
assert not tstate.is_gt(TASK_STATUS_RUNNING)
assert not tstate.is_gte(TASK_STATUS_RUNNING)
def test_get_resolved_dependencies():
prereq1 = Prerequisite(IntegerPoint('2'))
prereq1[('1', 'a', 'x')] = True
prereq1[('1', 'b', 'x')] = False
prereq1[('1', 'c', 'x')] = 'satisfied from database'
prereq1[('1', 'd', 'x')] = 'force satisfied'
prereq2 = Prerequisite(IntegerPoint('2'))
prereq2[('1', 'e', 'succeeded')] = False
prereq2[('1', 'e', 'failed')] = True
task_state = TaskState(
MagicMock(), IntegerPoint('2'), TASK_STATUS_WAITING, False
)
task_state.prerequisites = [prereq1, prereq2]
assert task_state.get_resolved_dependencies() == [
'1/a',
'1/c',
'1/d',
'1/e',
]
@pytest.mark.parametrize(
'itask_run_mode, disable_handlers, expect',
(
('live', True, False),
('live', False, False),
('dummy', True, False),
('dummy', False, False),
('simulation', True, True),
('simulation', False, True),
('skip', True, True),
('skip', False, False),
)
)
def test_disable_task_event_handlers(itask_run_mode, disable_handlers, expect):
"""Conditions under which task event handlers should not be used.
"""
# Construct a fake itask object:
itask = SimpleNamespace(
run_mode=RunMode(itask_run_mode),
platform={'disable task event handlers': disable_handlers},
tdef=SimpleNamespace(
rtconfig={
'skip': {'disable task event handlers': disable_handlers}})
)
# Check method:
assert disable_task_event_handlers(itask) is expect
|