File: test_flow_validate.py

package info (click to toggle)
python-globus-sdk 4.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,172 kB
  • sloc: python: 35,227; sh: 44; makefile: 35
file content (46 lines) | stat: -rw-r--r-- 1,418 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import json

import pytest

from globus_sdk import MISSING, FlowsAPIError
from globus_sdk.testing import get_last_request, load_response


@pytest.mark.parametrize("input_schema", [MISSING, {}])
def test_validate_flow(flows_client, input_schema):
    """Call ``validate_flow`` with and without an ``input_schema`` argument.

    The flow definition comes from the ``"success"`` entry of the loaded
    fixture's metadata. The test asserts on the ``scopes`` in the response
    data and on the JSON body of the last request that was sent.
    """
    fixture_meta = load_response(flows_client.validate_flow).metadata

    # ``input_schema`` is only passed along when the test case supplies one;
    # the MISSING case exercises the call with the argument left out.
    optional_args = {} if input_schema is MISSING else {"input_schema": input_schema}
    call_kwargs = {"definition": fixture_meta["success"], **optional_args}

    response = flows_client.validate_flow(**call_kwargs)
    expected_scopes = {
        "User": ["urn:globus:auth:scope:transfer.api.globus.org:all"],
    }
    assert response.data["scopes"] == expected_scopes

    # The request body must match the kwargs exactly, so an omitted
    # ``input_schema`` must not show up in what was sent.
    sent_body = json.loads(get_last_request().body)
    assert sent_body == call_kwargs


def test_validate_flow_error_parsing(flows_client):
    """Check the code and messages parsed from a ``validate_flow`` error.

    Uses the ``"definition_error"`` fixture case and submits the definition
    stored under its ``"invalid"`` metadata key. The call is expected to
    raise ``FlowsAPIError``.
    """
    fixture = load_response(flows_client.validate_flow, case="definition_error")
    bad_definition = fixture.metadata["invalid"]

    # The call itself must fail with a FlowsAPIError.
    with pytest.raises(FlowsAPIError) as raised:
        flows_client.validate_flow(definition=bad_definition)

    expected_message = (
        "A state of type 'Action' must be defined as either terminal "
        '("End": true) or transitional ("Next": "NextStateId")'
    )
    api_error = raised.value
    assert api_error.code == "UNPROCESSABLE_ENTITY"
    assert api_error.messages == [expected_message]