File: test_radio_udp.py

package info (click to toggle)
python-parsl 2025.12.01%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,112 kB
  • sloc: python: 24,369; makefile: 352; sh: 252; ansic: 45
file content (204 lines) | stat: -rw-r--r-- 6,448 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import socket
import time

import pytest

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios.udp import UDPRadio
from parsl.multiprocessing import SpawnQueue


@pytest.mark.local
def test_udp(tmpd_cwd):
    """Test UDP radio/receiver pair.

    This test checks that the pair can be started up locally, that a message
    is conveyed from radio to receiver, and that the receiver process goes
    away at shutdown.

    Args:
        tmpd_cwd: fixture value whose string form is handed to the receiver
            as its ``run_dir``.
    """

    resource_msgs = SpawnQueue()

    radio_config = UDPRadio(address="localhost", atexit_timeout=0)

    # start receiver
    udp_receiver = radio_config.create_receiver(run_dir=str(tmpd_cwd),
                                                resource_msgs=resource_msgs)

    # Everything between starting the receiver and shutting it down runs
    # under try/finally so that a failing assertion does not leave the
    # receiver running.
    try:
        # check hash properties

        assert len(radio_config.hmac_key) == 64, "With default hash, should expect 64 byte key"

        # make radio

        radio_sender = radio_config.create_sender()

        # send message into radio

        message = (MessageType.RESOURCE_INFO, {})

        radio_sender.send(message)

        # verify it comes out of the receiver. UDP has no delivery
        # guarantee, so bound the wait: a lost datagram then fails the
        # test (queue.Empty) rather than hanging the test run forever.

        m = resource_msgs.get(timeout=30)

        assert m == message, "The sent message should appear in the queue"
    finally:
        # shut down router
        udp_receiver.shutdown()

    # we can't inspect the process if it has been closed properly, but
    # we can verify that it raises the expected ValueError the closed
    # processes raise on access.
    with pytest.raises(ValueError):
        udp_receiver.process.exitcode


@pytest.mark.local
def test_bad_hmac(tmpd_cwd, caplog):
    """Test when the sender's HMAC key does not match the receiver's.

    The receiver is started first, then the key on the radio configuration
    is altered before the sender is created. The message must not reach the
    queue, the receiver process must stay alive, and the router log must
    contain an error.

    Args:
        tmpd_cwd: fixture value used as the receiver's ``run_dir`` and as the
            directory in which the router log file is looked up.
        caplog: requested fixture; not referenced in the test body.
    """

    resource_msgs = SpawnQueue()

    radio_config = UDPRadio(address="localhost", atexit_timeout=0)

    # start receiver
    udp_receiver = radio_config.create_receiver(run_dir=str(tmpd_cwd),
                                                resource_msgs=resource_msgs)

    # Run the body under try/finally so that a failing assertion does not
    # leave the receiver running.
    try:
        # check the hmac is configured in the right place,
        # then change it to something different (by appending a new byte)
        assert radio_config.hmac_key is not None
        radio_config.hmac_key += b'x'

        # make radio, after changing the HMAC key

        radio_sender = radio_config.create_sender()

        # send message into radio

        message = (MessageType.RESOURCE_INFO, {})

        radio_sender.send(message)

        # We should expect no message from the UDP side. That's hard to
        # state in this scenario because UDP doesn't have any delivery
        # guarantees for the test-failing case.
        # So sleep a while to allow that test to misdeliver and fail.
        time.sleep(1)

        assert resource_msgs.empty(), "receiving queue should be empty"
        assert udp_receiver.process.is_alive(), "UDP router process should still be alive"

        with open(f"{tmpd_cwd}/monitoring_udp_router.log", "r") as logfile:
            assert "ERROR" in logfile.read(), "Router log file should contain an error"
    finally:
        # shut down router
        udp_receiver.shutdown()

    # we can't inspect the process if it has been closed properly, but
    # we can verify that it raises the expected ValueError the closed
    # processes raise on access.
    with pytest.raises(ValueError):
        udp_receiver.process.exitcode


@pytest.mark.local
def test_wrong_digest(tmpd_cwd, caplog):
    """Test when the sender's HMAC digest algorithm differs from the receiver's.

    The receiver is started first, then the digest name on the radio
    configuration is changed before the sender is created. The message must
    not reach the queue, the receiver process must stay alive, and the
    router log must contain an error.

    Args:
        tmpd_cwd: fixture value used as the receiver's ``run_dir`` and as the
            directory in which the router log file is looked up.
        caplog: requested fixture; not referenced in the test body.
    """

    resource_msgs = SpawnQueue()

    radio_config = UDPRadio(address="localhost", atexit_timeout=0)

    # start receiver
    udp_receiver = radio_config.create_receiver(run_dir=str(tmpd_cwd),
                                                resource_msgs=resource_msgs)

    # Run the body under try/finally so that a failing assertion does not
    # leave the receiver running.
    try:
        # check the hmac digest is configured in the right place,
        # then change it to a different digest. The choice of different
        # digest is arbitrary.
        assert radio_config.hmac_digest is not None
        radio_config.hmac_digest = "sha3_224"

        # make radio, after changing the HMAC digest

        radio_sender = radio_config.create_sender()

        # send message into radio

        message = (MessageType.RESOURCE_INFO, {})

        radio_sender.send(message)

        # We should expect no message from the UDP side. That's hard to
        # state in this scenario because UDP doesn't have any delivery
        # guarantees for the test-failing case.
        # So sleep a while to allow that test to misdeliver and fail.
        time.sleep(1)

        assert resource_msgs.empty(), "receiving queue should be empty"
        assert udp_receiver.process.is_alive(), "UDP router process should still be alive"

        with open(f"{tmpd_cwd}/monitoring_udp_router.log", "r") as logfile:
            assert "ERROR" in logfile.read(), "Router log file should contain an error"
    finally:
        # shut down router
        udp_receiver.shutdown()

    # we can't inspect the process if it has been closed properly, but
    # we can verify that it raises the expected ValueError the closed
    # processes raise on access.
    with pytest.raises(ValueError):
        udp_receiver.process.exitcode


@pytest.mark.local
def test_short_message(tmpd_cwd, caplog):
    """Test when UDP message is so short it can't even be parsed into
    HMAC + the rest.

    An empty datagram is sent straight to the receiver's address and port,
    bypassing the radio sender. Nothing must reach the queue, the receiver
    process must stay alive, and the router log must contain an error.

    Args:
        tmpd_cwd: fixture value used as the receiver's ``run_dir`` and as the
            directory in which the router log file is looked up.
        caplog: requested fixture; not referenced in the test body.
    """

    resource_msgs = SpawnQueue()

    radio_config = UDPRadio(address="localhost", atexit_timeout=0)

    # start receiver
    udp_receiver = radio_config.create_receiver(run_dir=str(tmpd_cwd),
                                                resource_msgs=resource_msgs)

    # Run the body under try/finally so that a failing assertion does not
    # leave the receiver running.
    try:
        # now send a bad UDP message, rather than using the sender mechanism.
        # The context manager closes the socket even if sendto raises.

        with socket.socket(socket.AF_INET,
                           socket.SOCK_DGRAM,
                           socket.IPPROTO_UDP) as sock:
            sock.sendto(b'', (radio_config.address, radio_config.port))

        # We should expect no message from the UDP side. That's hard to
        # state in this scenario because UDP doesn't have any delivery
        # guarantees for the test-failing case.
        # So sleep a while to allow that test to misdeliver and fail.
        time.sleep(1)

        assert resource_msgs.empty(), "receiving queue should be empty"
        assert udp_receiver.process.is_alive(), "UDP router process should still be alive"

        with open(f"{tmpd_cwd}/monitoring_udp_router.log", "r") as logfile:
            assert "ERROR" in logfile.read(), "Router log file should contain an error"
    finally:
        # shut down router
        udp_receiver.shutdown()

    # we can't inspect the process if it has been closed properly, but
    # we can verify that it raises the expected ValueError the closed
    # processes raise on access.
    with pytest.raises(ValueError):
        udp_receiver.process.exitcode