File: test_task_outputs.py

package info (click to toggle)
cylc-flow 8.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 14,368 kB
  • sloc: python: 87,751; sh: 17,109; sql: 233; xml: 171; javascript: 78; lisp: 55; makefile: 11
file content (358 lines) | stat: -rw-r--r-- 11,307 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from types import SimpleNamespace
from typing import (
    Optional,
    Set,
)

import pytest

from cylc.flow.task_outputs import (
    TASK_OUTPUT_EXPIRED,
    TASK_OUTPUT_FAILED,
    TASK_OUTPUT_SUBMIT_FAILED,
    TASK_OUTPUT_SUBMITTED,
    TASK_OUTPUT_SUCCEEDED,
    TASK_OUTPUTS,
    TaskOutputs,
    get_completion_expression,
    get_trigger_completion_variable_maps,
)
from cylc.flow.util import sstrip


def tdef(required, optional, completion=None):
    """Build a minimal stand-in for a task definition.

    The returned namespace carries only the two attributes set here:
    ``rtconfig`` (holding the ``completion`` setting) and ``outputs``
    (mapping each output to an ``(output, required_flag)`` tuple).

    Args:
        required: Collection of required outputs.
        optional: Collection of optional outputs.
        completion: User defined execution completion expression.

    """
    def _requirement(name):
        # True => required, False => optional, None => ambiguous
        # (i.e. the output is not referenced in the graph)
        if name in required:
            return True
        if name in optional:
            return False
        return None

    # the standard task outputs are always present, whether or not the
    # caller listed them
    output_map = {}
    for name in set(TASK_OUTPUTS) | set(required) | set(optional):
        output_map[name] = (name, _requirement(name))

    return SimpleNamespace(
        rtconfig={'completion': completion},
        outputs=output_map,
    )


def test_completion_implicit():
    """It should derive a completion expression when the task has none.

    Completeness is expected to follow the logic of proposal point 5:
    https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal
    """
    # a task whose only required output is "succeeded"
    taskdef = tdef([TASK_OUTPUT_SUCCEEDED], [])
    task_outputs = TaskOutputs(taskdef)

    # only the single required output appears in the generated expression
    assert task_outputs._completion_expression == 'succeeded'
    # nothing has been completed yet, so the outputs are incomplete
    assert task_outputs.is_complete() is False

    # submit-failed on its own does not satisfy the expression
    task_outputs.set_message_complete(TASK_OUTPUT_SUBMIT_FAILED)
    assert task_outputs.is_complete() is False

    # once submitted and succeeded are set the outputs are complete
    for message in (TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUCCEEDED):
        task_outputs.set_message_complete(message)
    assert task_outputs.is_complete() is True

    # setting expired afterwards does not undo completion
    task_outputs.set_message_complete(TASK_OUTPUT_EXPIRED)
    assert task_outputs.is_complete() is True


def test_completion_explicit():
    """It should honour a user-defined completion expression.

    Completeness is expected to follow the logic of proposal point 5:
    https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal
    """
    taskdef = tdef(
        # nothing is required
        [],
        # all four outputs are optional
        [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED, 'x', 'y'],
        # completing either pair completes the outputs
        completion='(succeeded and x) or (failed and y)',
    )
    task_outputs = TaskOutputs(taskdef)

    # nothing has been completed yet, so the outputs are incomplete
    assert task_outputs.is_complete() is False

    # succeeded and failed alone complete neither pair
    for message in (TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED):
        task_outputs.set_message_complete(message)
    assert task_outputs.is_complete() is False

    # completing "y" satisfies the (failed and y) pair
    task_outputs.set_message_complete('y')
    assert task_outputs.is_complete() is True

    # switch to the (succeeded and x) pair: un-set "y" directly in the
    # internal state, then complete "x"
    task_outputs._completed['y'] = False
    task_outputs.set_message_complete('x')
    assert task_outputs.is_complete() is True


@pytest.mark.parametrize(
    'required, optional, expression', [
        # a single required output
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED},
            [],
            'succeeded',
            id='0',
        ),
        # several required outputs are and-ed together
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED, 'x'},
            [],
            '(succeeded and x)',
            id='1',
        ),
        # succeeded is optional: failed is accepted as an alternative
        pytest.param(
            [],
            {TASK_OUTPUT_SUCCEEDED},
            'succeeded or failed',
            id='2',
        ),
        # expired is optional: it is accepted as an alternative to succeeded
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED},
            {TASK_OUTPUT_EXPIRED},
            'succeeded or expired',
            id='3',
        ),
        # succeeded and expired are both optional
        pytest.param(
            [],
            {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_EXPIRED},
            'succeeded or failed or expired',
            id='4',
        ),
        # submitted is optional: submit_failed is accepted as an alternative
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED},
            {TASK_OUTPUT_EXPIRED, TASK_OUTPUT_SUBMITTED},
            'succeeded or submit_failed or expired',
            id='5',
        ),
        # submitted is required: it is and-ed with succeeded
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_SUBMITTED},
            {TASK_OUTPUT_EXPIRED},
            '(submitted and succeeded) or expired',
            id='6',
        ),
        # succeeded and submit-failed are both optional
        pytest.param(
            [],
            {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_SUBMIT_FAILED},
            'succeeded or failed or submit_failed',
            id='7',
        ),
        # a required custom output alongside optional succeeded,
        # submit-failed and expired outputs
        pytest.param(
            {'x'},
            {
                TASK_OUTPUT_SUCCEEDED,
                TASK_OUTPUT_SUBMIT_FAILED,
                TASK_OUTPUT_EXPIRED,
            },
            '(x and succeeded) or failed or submit_failed or expired',
            id='8',
        ),
    ],
)
def test_get_completion_expression_implicit(required, optional, expression):
    """It should generate a completion expression if none is provided.

    Args:
        required: Outputs to mark as required in the stub task definition.
        optional: Outputs to mark as optional in the stub task definition.
        expression: The completion expression expected to be generated.

    """
    # no completion expression is passed to tdef, so one must be generated
    assert get_completion_expression(tdef(required, optional)) == expression


def test_get_completion_expression_explicit():
    """It should return a user-defined completion expression verbatim."""
    user_expression = '((failed and x) or (succeeded and y)) or expired'
    taskdef = tdef(
        {'x', 'y'},
        {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED, TASK_OUTPUT_EXPIRED},
        user_expression,
    )
    # the configured expression must come back exactly as it was given
    assert get_completion_expression(taskdef) == user_expression


def test_format_completion_status():
    """It should render the completion expression with per-output markers.

    Each output line of the rendered expression is expected to carry a
    marker in the gutter: "⨯" while the output is not completed and "✓"
    once it has been set complete.
    """
    # three required outputs plus an optional "expired" output
    outputs = TaskOutputs(
        tdef(
            {TASK_OUTPUT_SUCCEEDED, 'x', 'y'},
            {TASK_OUTPUT_EXPIRED},
        )
    )
    # nothing has been completed yet: every output is marked "⨯"
    assert outputs.format_completion_status(
        indent=2, gutter=2
    ) == '  ' + sstrip(
        '''
          ┆  (
        ⨯ ┆    succeeded
        ⨯ ┆    and x
        ⨯ ┆    and y
          ┆  )
        ⨯ ┆  or expired
        '''
    )
    # complete two of the three required outputs
    outputs.set_message_complete('succeeded')
    outputs.set_message_complete('x')
    # the completed outputs are now marked "✓", the others remain "⨯"
    assert outputs.format_completion_status(
        indent=2, gutter=2
    ) == '  ' + sstrip(
        '''
          ┆  (
        ✓ ┆    succeeded
        ✓ ┆    and x
        ⨯ ┆    and y
          ┆  )
        ⨯ ┆  or expired
        '''
    )


@pytest.mark.parametrize(
    'required, optional, expected_required, expected_expression', [
        # this task has three required outputs and one optional output
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED, 'x', 'y'},
            {'z'},
            {TASK_OUTPUT_SUCCEEDED, 'x', 'y'},
            None,
            id="3-required-1-optional",
        ),
        # this task does not have any required outputs (besides the implicitly
        # required submitted/started outputs)
        # Note: validation should prevent this at the config level
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED, 'x', 'y'},
            {TASK_OUTPUT_FAILED},  # task may fail
            set(),
            None,
            id="no-required-outputs",
        ),
        # the preconditions expiry/submitted are excluded from this logic when
        # defined as optional:
        pytest.param(
            {TASK_OUTPUT_SUCCEEDED, 'x', 'y'},
            {TASK_OUTPUT_EXPIRED},  # task may expire
            {TASK_OUTPUT_SUCCEEDED, 'x', 'y'},
            '(succeeded and x and y) or expired',
            id="expiry-submitted",
        ),
        # NOTE: a required output might not be required!
        # If success is optional, then apparently-required outputs are made
        # implicitly optional. See
        # https://github.com/cylc/cylc-flow/pull/6505#issuecomment-2517781523
        pytest.param(
            {'x'},
            {TASK_OUTPUT_SUCCEEDED},
            set(),
            '(x and succeeded) or failed',
            id="implicit-optional",
        ),
        # every output is optional, so nothing is required
        pytest.param(
            set(),
            {'x', TASK_OUTPUT_SUCCEEDED},
            set(),
            'succeeded or failed',
            id="all-optional",
        ),
    ]
)
def test_iter_required_outputs(
    required: Set[str],
    optional: Set[str],
    expected_required: Set[str],
    expected_expression: Optional[str],
):
    """It should yield required outputs only.

    Args:
        required: Outputs to mark as required in the stub task definition.
        optional: Outputs to mark as optional in the stub task definition.
        expected_required: The messages expected from
            ``iter_required_messages``.
        expected_expression: The completion expression expected to be
            generated, or None to skip that check.

    """
    outputs = TaskOutputs(tdef(required, optional))
    # the generated expression is only checked for cases which specify one
    if expected_expression:
        assert outputs._completion_expression == expected_expression
    # iteration order is not significant, so compare as sets
    assert set(outputs.iter_required_messages()) == expected_required


def test_iter_required_outputs__disable():
    """It should yield the outputs still required once one is disabled.

    The completion expression used here has two alternative branches, so
    no output is required until one branch is ruled out via ``disable``.
    """
    outputs = TaskOutputs(
        tdef(
            # no required outputs (an empty set; "{}" would be a dict)
            set(),
            {'a', 'succeeded', 'b', 'y', 'failed', 'x'},
            '(x and y and failed) or (a and b and succeeded)'
        )
    )

    # With nothing disabled either branch can complete the outputs, so no
    # individual output is required:
    assert set(outputs.iter_required_messages()) == set()

    # Disabling succeeded leaves us with failure required outputs:
    assert set(
        outputs.iter_required_messages(disable=TASK_OUTPUT_SUCCEEDED)
    ) == {
        TASK_OUTPUT_FAILED,
        'x',
        'y',
    }

    # Disabling failed leaves us with succeeded required outputs:
    assert set(outputs.iter_required_messages(disable=TASK_OUTPUT_FAILED)) == {
        TASK_OUTPUT_SUCCEEDED,
        'a',
        'b',
    }

    # Disabling an arbitrary output leaves us with required outputs
    # from another branch:
    assert set(outputs.iter_required_messages(disable='a')) == {
        TASK_OUTPUT_FAILED,
        'x',
        'y',
    }


def test_get_trigger_completion_variable_maps():
    """It should map triggers to completion variables and back again."""
    triggers = ('a', 'b-b', 'c-c-c')
    trigger_to_compvar, compvar_to_trigger = (
        get_trigger_completion_variable_maps(triggers)
    )
    # hyphens in trigger names become underscores in completion variables
    assert trigger_to_compvar == {'a': 'a', 'b-b': 'b_b', 'c-c-c': 'c_c_c'}
    # the second map is the inverse of the first
    assert compvar_to_trigger == {'a': 'a', 'b_b': 'b-b', 'c_c_c': 'c-c-c'}