File: test_validate_run.py

package info (click to toggle)
python-globus-sdk 4.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,172 kB
  • sloc: python: 35,227; sh: 44; makefile: 35
file content (43 lines) | stat: -rw-r--r-- 1,415 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from __future__ import annotations

import pytest

from globus_sdk import FlowsAPIError, SpecificFlowClient
from globus_sdk.testing import load_response


def test_validate_run(specific_flow_client_class: type[SpecificFlowClient]):
    """Check that ``validate_run`` answers with HTTP 200 for the default case.

    The flow ID and request body both come from the metadata attached to the
    ``SpecificFlowClient.validate_run`` fixture obtained via ``load_response``.
    """
    fixture = load_response(SpecificFlowClient.validate_run)
    fixture_meta = fixture.metadata

    # Build the client against the flow ID the fixture was recorded for.
    client = specific_flow_client_class(flow_id=fixture_meta["flow_id"])

    request_body = fixture_meta["request_body"]
    response = client.validate_run(body=request_body)

    assert response.http_status == 200


def test_validate_run_returns_error_for_invalid_payload(
    specific_flow_client_class: type[SpecificFlowClient],
):
    """Check that ``validate_run`` raises ``FlowsAPIError`` with status 400.

    Uses the ``"invalid_input_payload"`` case of the
    ``SpecificFlowClient.validate_run`` fixture; the flow ID and request body
    are read from that case's metadata.
    """
    fixture = load_response(
        SpecificFlowClient.validate_run, case="invalid_input_payload"
    )
    case_meta = fixture.metadata

    client = specific_flow_client_class(flow_id=case_meta["flow_id"])
    payload = case_meta["request_body"]

    # Only the API call itself sits inside the raises-context.
    with pytest.raises(FlowsAPIError) as excinfo:
        client.validate_run(body=payload)

    raised = excinfo.value
    assert raised.http_status == 400


def test_validate_run_returns_error_for_lacking_run_permission(
    specific_flow_client_class: type[SpecificFlowClient],
):
    """Check that ``validate_run`` raises ``FlowsAPIError`` with status 403.

    Uses the ``"not_a_flow_starter"`` case of the
    ``SpecificFlowClient.validate_run`` fixture; the flow ID and request body
    are read from that case's metadata.
    """
    scenario = load_response(
        SpecificFlowClient.validate_run, case="not_a_flow_starter"
    )
    details = scenario.metadata

    target_flow_id = details["flow_id"]
    flows = specific_flow_client_class(flow_id=target_flow_id)
    run_body = details["request_body"]

    with pytest.raises(FlowsAPIError) as caught:
        flows.validate_run(body=run_body)

    status = caught.value.http_status
    assert status == 403