1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import logging
from typing import Callable
import pytest
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.scheduler import Scheduler
@pytest.fixture(scope='module')
async def myflow(mod_flow, mod_scheduler, mod_run, mod_one_conf):
    """Provide a module-scoped scheduler held inside the ``mod_run`` context.

    The workflow ID is obtained by passing ``mod_one_conf`` to ``mod_flow``;
    ``mod_scheduler`` then builds the scheduler for that ID. The scheduler
    is yielded from within ``mod_run`` so the context stays open for as
    long as the fixture is in use.

    Yields:
        The object returned by ``mod_scheduler``.
    """
    workflow_id = mod_flow(mod_one_conf)
    scheduler = mod_scheduler(workflow_id)
    async with mod_run(scheduler):
        yield scheduler
def test_graphql(myflow):
    """Check that a GraphQL query to the server returns the workflow.

    The query asks for the ``id`` of the workflow matching the fixture
    scheduler's ID; the first workflow in the response must carry that
    same ID.
    """
    # The query text is kept flush-left so the literal itself is unchanged.
    query = f'''
query {{
workflows(ids: ["{myflow.id}"]) {{
id
}}
}}
'''
    response = myflow.server.graphql(query)
    first_workflow = response['workflows'][0]
    assert first_workflow['id'] == myflow.id
def test_graphql_error(myflow):
    """Test that the GraphQL endpoint rejects a query for unknown fields.

    The query requests two fields (``notafield`` and ``alsonotafield``)
    alongside ``id``. The server call is expected to raise, and the text
    of the raised exception is expected to name both offending fields.
    """
    request_string = f'''
query {{
workflows(ids: ["{myflow.id}"]) {{
id
notafield
alsonotafield
}}
}}
'''
    with pytest.raises(Exception) as excinfo:
        myflow.server.graphql(request_string)

    # Inspect the error only after leaving the ``raises`` context: statements
    # placed after the raising call inside the block never run, and
    # ``excinfo`` is pytest's ExceptionInfo wrapper rather than text, so the
    # message has to be read from the wrapped exception.
    message = str(excinfo.value)
    assert "Cannot query field 'notafield'" in message
    assert "Cannot query field 'alsonotafield'" in message
def test_pb_data_elements(myflow):
    """Check the Protobuf data-elements endpoint for the workflow element.

    The payload returned by the server for the ``workflow`` element type is
    parsed into the object produced by the matching ``PB_METHOD_MAP`` entry,
    whose ``added`` member must then carry the workflow ID.
    """
    kind = 'workflow'
    message = PB_METHOD_MAP['pb_data_elements'][kind]()
    payload = myflow.server.pb_data_elements(kind)
    message.ParseFromString(payload)
    assert message.added.id == myflow.id
def test_pb_entire_workflow(myflow):
    """Check the Protobuf entire-workflow endpoint.

    The payload returned by the server is parsed into the object produced
    by the ``pb_entire_workflow`` entry of ``PB_METHOD_MAP``; its
    ``workflow`` member must then carry the workflow ID.
    """
    message = PB_METHOD_MAP['pb_entire_workflow']()
    payload = myflow.server.pb_entire_workflow()
    message.ParseFromString(payload)
    assert message.workflow.id == myflow.id
async def test_stop(one: Scheduler, start):
    """Check that stopping the server leaves it flagged as stopped."""
    async with start(one), asyncio.timeout(2):
        # The stop call has to return within the two-second window: if the
        # server never consumes the STOP signal, the timeout aborts the test.
        await one.server.stop('TESTING')
        assert one.server.stopped
async def test_receiver_basic(one: Scheduler, start, log_filter):
    """Exercise the receiver with a valid request, then with a failing API.

    The same request is sent twice: first unchanged, where a reply with
    data and no error is expected, and then after the server's ``api``
    attribute has been replaced with a function that raises, where an
    error reply and an ERROR log record are expected.
    """
    async with asyncio.timeout(5), start(one):
        # A well-formed request should be answered without an error.
        request = {'command': 'api', 'user': 'bono', 'args': {}}
        reply = one.server.receiver(request)
        assert not reply.get('error')
        assert reply['data']
        assert reply['cylc_version'] == CYLC_VERSION

        # Swap in an API function that always raises so that the very same
        # request, which succeeded above, now fails.
        def _broken_api(*args, **kwargs):
            raise Exception('oopsie')

        one.server.api = _broken_api
        reply = one.server.receiver(request)
        expected_reply = {
            'error': {'message': 'oopsie'},
            'cylc_version': CYLC_VERSION,
        }
        assert reply == expected_reply
        assert log_filter(logging.ERROR, 'oopsie')
@pytest.mark.parametrize(
    'msg, expected',
    [
        pytest.param(
            {'user': 'bono', 'args': {}},
            (
                "Request missing field 'command' required for"
                f" Cylc {CYLC_VERSION}"
            ),
            id='missing-command',
        ),
        pytest.param(
            {'command': 'foobar', 'user': 'bono', 'args': {}},
            f"No method by the name 'foobar' at Cylc {CYLC_VERSION}",
            id='bad-command',
        ),
    ],
)
async def test_receiver_bad_requests(one: Scheduler, start, msg, expected):
    """Check the error reply the receiver gives for malformed requests.

    Each case supplies a request (``msg``) and the error message
    (``expected``) that the reply must carry together with the Cylc
    version.
    """
    error_reply = {
        'error': {'message': expected},
        'cylc_version': CYLC_VERSION,
    }
    async with asyncio.timeout(5), start(one):
        reply = one.server.receiver(msg)
        assert reply == error_reply
async def test_publish_before_shutdown(
    one: Scheduler, start: Callable
):
    """Check that the publish queue is empty once the server has stopped.

    An item is placed on the server's publish queue immediately before the
    server is asked to stop; afterwards the queue must report a size of
    zero.
    """
    async with start(one):
        queued_item = [(b'fake', b'blah')]
        one.server.publish_queue.put(queued_item)
        await one.server.stop('i said stop!')
        pending = one.server.publish_queue.qsize()
        assert not pending
|