1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
|
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# mypy: disable-error-code=union-attr
"""Test interactions with sequential xtriggers."""
from unittest.mock import patch
import pytest
from cylc.flow.commands import (
run_cmd,
force_trigger_tasks,
remove_tasks
)
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.exceptions import XtriggerConfigError
from cylc.flow.id import TaskTokens
from cylc.flow.scheduler import Scheduler
def list_cycles(schd: Scheduler):
"""List the task instance cycle points present in the pool."""
return sorted(itask.tokens['cycle'] for itask in schd.pool.get_tasks())
@pytest.fixture()
def sequential(flow, scheduler):
id_ = flow({
'scheduler': {
'cycle point format': 'CCYY',
},
'scheduling': {
'runahead limit': 'P2',
'initial cycle point': '2000',
'graph': {
'P1Y': '@wall_clock => foo',
}
}
})
return scheduler(id_)
async def test_remove(sequential: Scheduler, start):
"""It should spawn the next instance when a task is removed.
Ensure that removing a task with a sequential xtrigger does not break the
chain causing future instances to be removed from the workflow.
"""
async with start(sequential):
# the scheduler starts with one task in the pool
assert list_cycles(sequential) == ['2000']
# it sequentially spawns out to the runahead limit
for year in range(2000, 2010):
foo = sequential.pool.get_task(ISO8601Point(f'{year}'), 'foo')
if foo.state(is_runahead=True):
break
sequential.xtrigger_mgr.call_xtriggers_async(foo)
sequential.pool.spawn_parentless_sequential_xtriggers()
assert list_cycles(sequential) == [
'2000',
'2001',
'2002',
'2003',
]
# remove all tasks in the pool
await run_cmd(remove_tasks(sequential, ['*'], ["1"]))
# the next cycle should be automatically spawned
assert list_cycles(sequential) == ['2004']
# NOTE: You won't spot this issue in a functional test because the
# re-spawned tasks are detected as completed and automatically removed.
# So ATM not dangerous, but potentially inefficient.
async def test_trigger(sequential, start):
"""It should spawn its next instance if triggered ahead of time.
If you manually trigger a sequentially spawned task before its xtriggers
have become satisfied, then the sequential spawning chain is broken.
The task pool should defend against this to ensure that triggering a task
doesn't cancel it's future instances.
"""
async with start(sequential):
assert list_cycles(sequential) == ['2000']
foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
await run_cmd(force_trigger_tasks(sequential, [foo.identity], ["1"]))
foo.state_reset('succeeded')
sequential.pool.spawn_on_output(foo, 'succeeded')
assert list_cycles(sequential) == ['2000', '2001']
async def test_set_outputs(sequential, start):
"""It should spawn its next instance if outputs are set ahead of time.
If you set outputs of a sequentially spawned task before its xtriggers
have become satisfied, then the sequential spawning chain is broken.
The task pool should defend against this to ensure that setting outputs
doesn't cancel it's future instances and their downstream tasks.
"""
async with start(sequential):
assert list_cycles(sequential) == ['2000']
sequential.pool.get_task(ISO8601Point('2000'), 'foo')
# set foo:succeeded it should spawn next instance
sequential.pool.set_prereqs_and_outputs(
{TaskTokens('2000', 'foo')}, ["succeeded"], [], [])
assert list_cycles(sequential) == ['2001']
async def test_set_prereqs(sequential, start):
"""It should spawn next after manual xtrigger prereq satisfaction."""
async with start(sequential):
assert list_cycles(sequential) == ['2000']
sequential.pool.get_task(ISO8601Point('2000'), 'foo')
# satisfy foo's xtriggers - it should spawn next instance
sequential.pool.set_prereqs_and_outputs(
{TaskTokens('2000', 'foo')}, [], ['xtrigger/all:succeeded'], [])
sequential.pool.spawn_parentless_sequential_xtriggers()
assert list_cycles(sequential) == ['2000', '2001']
async def test_reload(sequential, start):
"""It should set the is_xtrigger_sequential flag on reload.
TODO: test that changes to the sequential status in the config get picked
up on reload
"""
async with start(sequential):
# the task should be marked as sequential
pre_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
assert pre_reload.is_xtrigger_sequential is True
# reload the workflow
sequential.pool.reload(sequential.config)
# the original task proxy should have been replaced
post_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
assert id(pre_reload) != id(post_reload)
# the new task should be marked as sequential
assert post_reload.is_xtrigger_sequential is True
@pytest.mark.parametrize('is_sequential', [True, False])
@pytest.mark.parametrize('xtrig_def', [
'wall_clock(sequential={})',
'wall_clock(PT1H, sequential={})',
'xrandom(1, 1, sequential={})',
])
async def test_sequential_arg_ok(
flow, scheduler, start, xtrig_def: str, is_sequential: bool
):
"""Test passing the sequential argument to xtriggers."""
wid = flow({
'scheduler': {
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '2000',
'runahead limit': 'P1',
'xtriggers': {
'myxt': xtrig_def.format(is_sequential),
},
'graph': {
'P1Y': '@myxt => foo',
}
}
})
schd: Scheduler = scheduler(wid)
expected_num_cycles = 1 if is_sequential else 3
async with start(schd):
itask = schd.pool.get_task(ISO8601Point('2000'), 'foo')
assert itask.is_xtrigger_sequential is is_sequential
assert len(list_cycles(schd)) == expected_num_cycles
def test_sequential_arg_bad(flow, validate):
"""Test validation of 'sequential' arg for custom xtrigger function def"""
wid = flow({
'scheduling': {
'xtriggers': {
'myxt': 'custom_xt(42)'
},
'graph': {
'R1': '@myxt => foo'
}
}
})
def xtrig1(x, sequential):
"""This uses 'sequential' without a default value"""
return True
def xtrig2(x, sequential='True'):
"""This uses 'sequential' with a default of wrong type"""
return True
for xtrig in (xtrig1, xtrig2):
with patch(
'cylc.flow.xtrigger_mgr.get_xtrig_func',
return_value=xtrig
):
with pytest.raises(XtriggerConfigError) as excinfo:
validate(wid)
assert (
"reserved argument 'sequential' with no boolean default"
) in str(excinfo.value)
def test_sequential_arg_bad2(flow, validate):
"""Test validation of 'sequential' arg for xtrigger calls"""
wid = flow({
'scheduling': {
'initial cycle point': '2000',
'xtriggers': {
'clock': 'wall_clock(sequential=3)',
},
'graph': {
'R1': '@clock => foo',
},
},
})
with pytest.raises(XtriggerConfigError) as excinfo:
validate(wid)
assert (
"invalid argument 'sequential=3' - must be boolean"
) in str(excinfo.value)
@pytest.mark.parametrize('is_sequential', [True, False])
async def test_any_sequential(flow, scheduler, start, is_sequential: bool):
"""Test that a task is marked as sequential if any of its xtriggers are."""
wid = flow({
'scheduling': {
'xtriggers': {
'xt1': 'custom_xt()',
'xt2': f'custom_xt(sequential={is_sequential})',
'xt3': 'custom_xt(sequential=False)',
},
'graph': {
'R1': '@xt1 & @xt2 & @xt3 => foo',
}
}
})
with patch(
'cylc.flow.xtrigger_mgr.get_xtrig_func',
return_value=lambda *a, **k: True
):
schd: Scheduler = scheduler(wid)
async with start(schd):
itask = schd.pool.get_task(IntegerPoint('1'), 'foo')
assert itask.is_xtrigger_sequential is is_sequential
async def test_override(flow, scheduler, start):
"""Test that the 'sequential=False' arg can override a default of True."""
wid = flow({
'scheduling': {
'sequential xtriggers': True,
'xtriggers': {
'xt1': 'custom_xt()',
'xt2': 'custom_xt(sequential=False)',
},
'graph': {
'R1': '''
@xt1 => foo
@xt2 => bar
''',
}
}
})
with patch(
'cylc.flow.xtrigger_mgr.get_xtrig_func',
return_value=lambda *a, **k: True
):
schd: Scheduler = scheduler(wid)
async with start(schd):
foo = schd.pool.get_task(IntegerPoint('1'), 'foo')
assert foo.is_xtrigger_sequential is True
bar = schd.pool.get_task(IntegerPoint('1'), 'bar')
assert bar.is_xtrigger_sequential is False
|