1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
|
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Working documentation for the integration test framework.
Here are some examples which cover a range of uses (and also provide some
useful testing in the process 😀.)
"""
import asyncio
import logging
from pathlib import Path
import pytest
from cylc.flow import __version__
from cylc.flow.scheduler import Scheduler
async def test_create_flow(flow, run_dir):
"""Use the flow fixture to create workflows on the file system."""
# Ensure a flow.cylc file gets written out
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'graph': {
'R1': 'foo'
}
}
})
workflow_dir = run_dir / id_
flow_file = workflow_dir / 'flow.cylc'
assert workflow_dir.exists()
assert flow_file.exists()
async def test_run(flow, scheduler, run, one_conf):
"""Create a workflow, initialise the scheduler and run it."""
# Ensure the scheduler can survive for at least one second without crashing
id_ = flow(one_conf)
schd = scheduler(id_)
async with run(schd):
await asyncio.sleep(1) # this yields control to the main loop
async def test_logging(flow, scheduler, start, one_conf, log_filter):
"""We can capture log records when we run a scheduler."""
# Ensure that the cylc version is logged on startup.
id_ = flow(one_conf)
schd = scheduler(id_)
async with start(schd):
# this returns a list of log records containing __version__
assert log_filter(contains=__version__)
async def test_scheduler_arguments(flow, scheduler, start, one_conf):
"""We can provide options to the scheduler when we __init__ it.
These options match their command line equivalents.
Use the `dest` value specified in the option parser.
"""
# Ensure the paused_start option is obeyed by the scheduler.
id_ = flow(one_conf)
schd = scheduler(id_, paused_start=True)
async with start(schd):
assert schd.is_paused
id_ = flow(one_conf)
schd = scheduler(id_, paused_start=False)
async with start(schd):
assert not schd.is_paused
async def test_shutdown(flow, scheduler, start, one_conf):
"""Shut down a workflow.
The scheduler automatically shuts down once you exit the `async with`
block, however you can manually shut it down within this block if you
like.
"""
# Ensure the TCP server shuts down with the scheduler.
id_ = flow(one_conf)
schd = scheduler(id_)
async with start(schd):
pass
assert schd.server.replier.socket.closed
async def test_install(flow, scheduler, one_conf, run_dir):
"""You don't have to run workflows, it's usually best not to!
You can take the scheduler through the startup sequence as far as needed
for your test.
"""
# Ensure the installation of the job script is completed.
id_ = flow(one_conf)
schd = scheduler(id_)
await schd.install()
assert Path(
run_dir, schd.workflow, '.service', 'etc', 'job.sh'
).exists()
async def test_task_pool(one, start):
"""You don't have to run the scheduler to play with the task pool.
There are two fixtures to start a scheduler:
`start`
Takes a scheduler through the startup sequence.
`run`
Takes a scheduler through the startup sequence, then sets the main loop
going.
Unless you need the Scheduler main loop running, use `start`.
This test uses a pre-prepared Scheduler called "one".
"""
# Ensure that the correct number of tasks get added to the task pool.
async with start(one):
# pump the scheduler's heart manually
one.pool.release_runahead_tasks()
assert len(one.pool.active_tasks) == 1
async def test_exception(one, run, log_filter):
"""Through an exception into the scheduler to see how it will react.
You have to do this from within the scheduler itself.
The easy way is to patch the object.
"""
class MyException(Exception):
pass
# replace the main loop with something that raises an exception
def killer():
raise MyException('mess')
one._main_loop = killer
# make sure that this error causes the flow to shutdown
with pytest.raises(MyException):
async with run(one):
# The `run` fixture's shutdown logic waits for the main loop to run
pass
# make sure the exception was logged
assert len(log_filter(logging.CRITICAL, contains='mess')) == 1
# make sure the server socket has closed - a good indication of a
# successful clean shutdown
assert one.server.replier.socket.closed
@pytest.fixture(scope='module')
async def myflow(mod_flow, mod_scheduler, mod_one_conf):
"""You can save setup/teardown time by reusing fixtures
Write a module-scoped fixture and it can be shared by all tests in the
current module.
The standard fixtures all have `mod_` alternatives to allow you to do
this.
Pytest has been configured to run all tests from the same module in the
same xdist worker, in other words, module scoped fixtures only get
created once per module, even when distributing tests.
Obviously this goes with the usual warnings about not mutating the
object you are testing in the tests.
"""
id_ = mod_flow(mod_one_conf)
schd = mod_scheduler(id_)
return schd
def test_module_scoped_fixture(myflow):
"""Ensure the host is set on __init__.
The myflow fixture will be shared between all test functions within this
Python module.
"""
assert myflow.host
async def test_db_select(one, start, db_select):
"""Demonstrate and test querying the workflow database."""
# run a workflow
schd: Scheduler = one
async with start(schd):
# Select all from workflow_params table:
assert ('UTC_mode', '0') in db_select(schd, False, 'workflow_params')
# Select name & status columns from task_states table:
results = db_select(schd, False, 'task_states', 'name', 'status')
assert results[0] == ('one', 'waiting')
# Select all columns where name==one & status==waiting from
# task_states table:
results = db_select(
schd, False, 'task_states', name='one', status='waiting')
assert len(results) == 1
async def test_reflog(flow, scheduler, run, reflog, complete):
"""Test the triggering of tasks.
This is the integration test version of "reftest" in the funtional tests.
It works by capturing the triggers which caused each submission so that
they can be compared with the expected outcome.
"""
id_ = flow({
'scheduling': {
'initial cycle point': '1',
'final cycle point': '1',
'cycling mode': 'integer',
'graph': {
'P1': '''
a => b => c
x => b => z
b[-P1] => b
'''
}
}
})
schd = scheduler(id_, paused_start=False)
async with run(schd):
triggers = reflog(schd) # Note: add flow_nums=True to capture flows
await complete(schd)
assert triggers == {
# 1/a was triggered by nothing (i.e. it's parentless)
('1/a', None),
# 1/b was triggered by three tasks (note the pre-initial dependency)
('1/b', ('0/b', '1/a', '1/x')),
('1/c', ('1/b',)),
('1/x', None),
('1/z', ('1/b',)),
}
async def test_reftest(flow, scheduler, reftest):
"""Test the triggering of tasks.
This uses the reftest fixture which combines the reflog and
complete fixtures. Suitable for use when you just want to do a simple
reftest.
"""
id_ = flow({
'scheduling': {
'graph': {
'R1': 'a => b'
}
}
})
schd = scheduler(id_, paused_start=False)
assert await reftest(schd) == {
('1/a', None),
('1/b', ('1/a',)),
}
async def test_show(one: Scheduler, start, cylc_show):
"""Demonstrate the `cylc_show` fixture"""
async with start(one):
out = await cylc_show(one, '1/one')
assert list(out.keys()) == ['1/one']
assert out['1/one']['state'] == 'waiting'
|