File: test_sequential_xtriggers.py

package info (click to toggle)
cylc-flow 8.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 14,368 kB
  • sloc: python: 87,751; sh: 17,109; sql: 233; xml: 171; javascript: 78; lisp: 55; makefile: 11
file content (309 lines) | stat: -rw-r--r-- 10,491 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# mypy: disable-error-code=union-attr

"""Test interactions with sequential xtriggers."""

from unittest.mock import patch
import pytest

from cylc.flow.commands import (
    run_cmd,
    force_trigger_tasks,
    remove_tasks
)
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.exceptions import XtriggerConfigError
from cylc.flow.id import TaskTokens
from cylc.flow.scheduler import Scheduler


def list_cycles(schd: Scheduler):
    """List the task instance cycle points present in the pool."""
    return sorted(itask.tokens['cycle'] for itask in schd.pool.get_tasks())


@pytest.fixture()
def sequential(flow, scheduler):
    id_ = flow({
        'scheduler': {
            'cycle point format': 'CCYY',
        },
        'scheduling': {
            'runahead limit': 'P2',
            'initial cycle point': '2000',
            'graph': {
                'P1Y': '@wall_clock => foo',
            }
        }
    })
    return scheduler(id_)


async def test_remove(sequential: Scheduler, start):
    """It should spawn the next instance when a task is removed.

    Ensure that removing a task with a sequential xtrigger does not break the
    chain causing future instances to be removed from the workflow.
    """
    async with start(sequential):
        # the scheduler starts with one task in the pool
        assert list_cycles(sequential) == ['2000']

        # it sequentially spawns out to the runahead limit
        for year in range(2000, 2010):
            foo = sequential.pool.get_task(ISO8601Point(f'{year}'), 'foo')
            if foo.state(is_runahead=True):
                break
            sequential.xtrigger_mgr.call_xtriggers_async(foo)
            sequential.pool.spawn_parentless_sequential_xtriggers()
        assert list_cycles(sequential) == [
            '2000',
            '2001',
            '2002',
            '2003',
        ]

        # remove all tasks in the pool
        await run_cmd(remove_tasks(sequential, ['*'], ["1"]))

        # the next cycle should be automatically spawned
        assert list_cycles(sequential) == ['2004']

        # NOTE: You won't spot this issue in a functional test because the
        # re-spawned tasks are detected as completed and automatically removed.
        # So ATM not dangerous, but potentially inefficient.


async def test_trigger(sequential, start):
    """It should spawn its next instance if triggered ahead of time.

    If you manually trigger a sequentially spawned task before its xtriggers
    have become satisfied, then the sequential spawning chain is broken.

    The task pool should defend against this to ensure that triggering a task
    doesn't cancel it's future instances.
    """
    async with start(sequential):
        assert list_cycles(sequential) == ['2000']

        foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
        await run_cmd(force_trigger_tasks(sequential, [foo.identity], ["1"]))
        foo.state_reset('succeeded')
        sequential.pool.spawn_on_output(foo, 'succeeded')

        assert list_cycles(sequential) == ['2000', '2001']


async def test_set_outputs(sequential, start):
    """It should spawn its next instance if outputs are set ahead of time.

    If you set outputs of a sequentially spawned task before its xtriggers
    have become satisfied, then the sequential spawning chain is broken.

    The task pool should defend against this to ensure that setting outputs
    doesn't cancel it's future instances and their downstream tasks.
    """
    async with start(sequential):
        assert list_cycles(sequential) == ['2000']

        sequential.pool.get_task(ISO8601Point('2000'), 'foo')
        # set foo:succeeded it should spawn next instance
        sequential.pool.set_prereqs_and_outputs(
            {TaskTokens('2000', 'foo')}, ["succeeded"], [], [])

        assert list_cycles(sequential) == ['2001']


async def test_set_prereqs(sequential, start):
    """It should spawn next after manual xtrigger prereq satisfaction."""
    async with start(sequential):
        assert list_cycles(sequential) == ['2000']

        sequential.pool.get_task(ISO8601Point('2000'), 'foo')
        # satisfy foo's xtriggers - it should spawn next instance
        sequential.pool.set_prereqs_and_outputs(
            {TaskTokens('2000', 'foo')}, [], ['xtrigger/all:succeeded'], [])
        sequential.pool.spawn_parentless_sequential_xtriggers()

        assert list_cycles(sequential) == ['2000', '2001']


async def test_reload(sequential, start):
    """It should set the is_xtrigger_sequential flag on reload.

    TODO: test that changes to the sequential status in the config get picked
          up on reload
    """
    async with start(sequential):
        # the task should be marked as sequential
        pre_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
        assert pre_reload.is_xtrigger_sequential is True

        # reload the workflow
        sequential.pool.reload(sequential.config)

        # the original task proxy should have been replaced
        post_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
        assert id(pre_reload) != id(post_reload)

        # the new task should be marked as sequential
        assert post_reload.is_xtrigger_sequential is True


@pytest.mark.parametrize('is_sequential', [True, False])
@pytest.mark.parametrize('xtrig_def', [
    'wall_clock(sequential={})',
    'wall_clock(PT1H, sequential={})',
    'xrandom(1, 1, sequential={})',
])
async def test_sequential_arg_ok(
    flow, scheduler, start, xtrig_def: str, is_sequential: bool
):
    """Test passing the sequential argument to xtriggers."""
    wid = flow({
        'scheduler': {
            'cycle point format': 'CCYY',
        },
        'scheduling': {
            'initial cycle point': '2000',
            'runahead limit': 'P1',
            'xtriggers': {
                'myxt': xtrig_def.format(is_sequential),
            },
            'graph': {
                'P1Y': '@myxt => foo',
            }
        }
    })
    schd: Scheduler = scheduler(wid)
    expected_num_cycles = 1 if is_sequential else 3
    async with start(schd):
        itask = schd.pool.get_task(ISO8601Point('2000'), 'foo')
        assert itask.is_xtrigger_sequential is is_sequential
        assert len(list_cycles(schd)) == expected_num_cycles


def test_sequential_arg_bad(flow, validate):
    """Test validation of 'sequential' arg for custom xtrigger function def"""
    wid = flow({
        'scheduling': {
            'xtriggers': {
                'myxt': 'custom_xt(42)'
            },
            'graph': {
                'R1': '@myxt => foo'
            }
        }
    })

    def xtrig1(x, sequential):
        """This uses 'sequential' without a default value"""
        return True

    def xtrig2(x, sequential='True'):
        """This uses 'sequential' with a default of wrong type"""
        return True

    for xtrig in (xtrig1, xtrig2):
        with patch(
            'cylc.flow.xtrigger_mgr.get_xtrig_func',
            return_value=xtrig
        ):
            with pytest.raises(XtriggerConfigError) as excinfo:
                validate(wid)
            assert (
                "reserved argument 'sequential' with no boolean default"
            ) in str(excinfo.value)


def test_sequential_arg_bad2(flow, validate):
    """Test validation of 'sequential' arg for xtrigger calls"""
    wid = flow({
        'scheduling': {
            'initial cycle point': '2000',
            'xtriggers': {
                'clock': 'wall_clock(sequential=3)',
            },
            'graph': {
                'R1': '@clock => foo',
            },
        },
    })

    with pytest.raises(XtriggerConfigError) as excinfo:
        validate(wid)
    assert (
        "invalid argument 'sequential=3' - must be boolean"
    ) in str(excinfo.value)


@pytest.mark.parametrize('is_sequential', [True, False])
async def test_any_sequential(flow, scheduler, start, is_sequential: bool):
    """Test that a task is marked as sequential if any of its xtriggers are."""
    wid = flow({
        'scheduling': {
            'xtriggers': {
                'xt1': 'custom_xt()',
                'xt2': f'custom_xt(sequential={is_sequential})',
                'xt3': 'custom_xt(sequential=False)',
            },
            'graph': {
                'R1': '@xt1 & @xt2 & @xt3 => foo',
            }
        }
    })

    with patch(
        'cylc.flow.xtrigger_mgr.get_xtrig_func',
        return_value=lambda *a, **k: True
    ):
        schd: Scheduler = scheduler(wid)
        async with start(schd):
            itask = schd.pool.get_task(IntegerPoint('1'), 'foo')
            assert itask.is_xtrigger_sequential is is_sequential


async def test_override(flow, scheduler, start):
    """Test that the 'sequential=False' arg can override a default of True."""
    wid = flow({
        'scheduling': {
            'sequential xtriggers': True,
            'xtriggers': {
                'xt1': 'custom_xt()',
                'xt2': 'custom_xt(sequential=False)',
            },
            'graph': {
                'R1': '''
                    @xt1 => foo
                    @xt2 => bar
                ''',
            }
        }
    })

    with patch(
        'cylc.flow.xtrigger_mgr.get_xtrig_func',
        return_value=lambda *a, **k: True
    ):
        schd: Scheduler = scheduler(wid)
        async with start(schd):
            foo = schd.pool.get_task(IntegerPoint('1'), 'foo')
            assert foo.is_xtrigger_sequential is True
            bar = schd.pool.get_task(IntegerPoint('1'), 'bar')
            assert bar.is_xtrigger_sequential is False