File: test_workunit.py

package info (click to toggle)
receptor 1.5.5-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,772 kB
  • sloc: python: 1,643; makefile: 305; sh: 174
file content (164 lines) | stat: -rw-r--r-- 5,495 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# The goal is to write tests following the click documentation:
# https://click.palletsprojects.com/en/8.0.x/testing/

import pytest
import time


@pytest.fixture(scope="function")
def wait_for_workunit_state():
    """Provide a helper that polls a workunit until it reaches a given status.

    Returns:
        The inner ``_wait_for_workunit_state`` callable.
    """
    from typing import Optional

    def _wait_for_workunit_state(
        node_controller,
        unitid: str,
        expected_detail: Optional[str] = None,
        expected_state_name: Optional[str] = None,
        timeout_seconds: int = 30,
    ) -> bool:
        """Poll ``work status`` until the workunit matches the expectations.

        At least 'expected_detail' or 'expected_state_name' must be specified.
        When both are given, both must match in the same status response.

        Args:
            node_controller: The node controller used to create the workunit
            unitid: The unitid of the workunit to wait for
            expected_detail: The expected value of the status "Detail" field
            expected_state_name: The expected value of the status "StateName"
                field
            timeout_seconds: The number of one-second-spaced polls to attempt
                before giving up

        Returns:
            True if the workunit reached the expected status, False if every
            poll was used up without a match

        Raises:
            ValueError: If neither expectation is specified
        """
        if expected_detail is None and expected_state_name is None:
            raise ValueError(
                "At least 'expected_detail' or 'expected_state_name' must be specified"
            )

        command = "work status {}".format(unitid)

        for attempt in range(timeout_seconds):
            status = node_controller.simple_command(command)

            # Only look up the fields the caller actually asked about.
            detail_matches = (
                expected_detail is None or status["Detail"] == expected_detail
            )
            state_matches = (
                expected_state_name is None
                or status["StateName"] == expected_state_name
            )
            if detail_matches and state_matches:
                return True

            # No point sleeping after the final poll: nothing follows it.
            if attempt < timeout_seconds - 1:
                time.sleep(1)

        return False

    return _wait_for_workunit_state


@pytest.fixture(scope="function")
def wait_for_work_finished(wait_for_workunit_state):
    """Provide a helper that waits for a workunit to report success.

    The helper is a thin wrapper around the ``wait_for_workunit_state``
    fixture with the success criteria filled in.
    """

    def _wait_for_work_finished(
        node_controller, unitid: str, timeout_seconds: int = 30
    ) -> bool:
        """Wait until a workunit reports a successful exit.

        Delegates to ``wait_for_workunit_state`` asking for the detail
        "exit status 0" and the state name "Succeeded".

        Args:
            node_controller: The node controller used to create the workunit
            unitid: The unitid of the workunit to wait for
            timeout_seconds: The timeout forwarded to the underlying helper

        Returns:
            Whatever ``wait_for_workunit_state`` returns: True on success,
            False on timeout
        """
        success_criteria = {
            "expected_detail": "exit status 0",
            "expected_state_name": "Succeeded",
        }
        finished = wait_for_workunit_state(
            node_controller,
            unitid,
            timeout_seconds=timeout_seconds,
            **success_criteria,
        )
        return finished

    return _wait_for_work_finished


@pytest.mark.usefixtures("receptor_mesh_mesh1")
class TestWorkUnit:
    """Workunit tests that run with the ``receptor_mesh_mesh1`` fixture active."""

    def test_workunit_simple(
        self,
        invoke_as_json,
        default_receptor_controller_socket_file,
        wait_for_work_finished,
    ):
        """Submit an ``echo-uppercase`` workunit to node3 and verify its output.

        ``invoke_as_json`` is requested as a fixture but not referenced in the
        test body.
        """
        node1_controller = default_receptor_controller_socket_file

        wait_for = 5  # in seconds

        payload = "That's a long string example! And there's emoji too! 👾"

        # try/finally: close the controller even when an assertion fails.
        try:
            # Submit a short command whose output is the upper-cased payload
            work = node1_controller.submit_work(
                "echo-uppercase", payload, node="node3"
            )
            state_result = work.pop("result")
            state_unitid = work.pop("unitid")

            assert state_result == "Job Started"
            assert wait_for_work_finished(
                node1_controller, state_unitid, wait_for
            ), "Workunit timed out and never finished"

            work_result = (
                node1_controller.get_work_results(state_unitid)
                .read()
                .decode("utf-8")
                .strip()
            )

            assert payload.upper() == work_result, (
                f"Workunit did not report the expected result:\n - payload: {payload}"
                f"\n - work_result: {work_result}"
            )
        finally:
            node1_controller.close()

    def test_workunit_cmd_cancel(
        self,
        invoke_as_json,
        default_receptor_controller_socket_file,
        wait_for_workunit_state,
    ):
        """Start a long ``sleep`` workunit on node3, cancel it, and verify.

        ``invoke_as_json`` is requested as a fixture but not referenced in the
        test body.
        """
        node1_controller = default_receptor_controller_socket_file

        sleep_for = 9999  # in seconds
        wait_for = 15  # in seconds

        # try/finally: close the controller even when an assertion fails.
        try:
            # Spawn a long running command
            work = node1_controller.submit_work(
                "sleep", str(sleep_for), node="node3"
            )
            state_result = work.pop("result")
            state_unitid = work.pop("unitid")
            assert state_result == "Job Started"

            # HACK: Wait for the workunit to start
            # receptor should be able to cancel the workunit with this
            time.sleep(5)

            # Run and check cancel command
            cancel_output = node1_controller.simple_command(
                f"work cancel {state_unitid}"
            )
            assert cancel_output["cancelled"] == state_unitid

            # Wait for workunit detail == 'Canceled'
            assert wait_for_workunit_state(
                node1_controller,
                state_unitid,
                expected_detail="Canceled",
                timeout_seconds=wait_for,
            ), "Workunit timed out and never finished"

            # Get work list and check for the workunit detail state
            work_list = node1_controller.simple_command("work list")
            assert work_list[state_unitid]["Detail"] == "Canceled"
        finally:
            node1_controller.close()