File: test_wxui_radiothread.py

package info (click to toggle)
chirp 1%3A20221106%2Bpy3-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 11,144 kB
  • sloc: python: 119,334; ansic: 296; sh: 184; makefile: 41
file content (105 lines) | stat: -rw-r--r-- 4,180 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import sys
import time
from unittest import mock

sys.modules['wx'] = wx = mock.MagicMock()

from tests.unit import base
from chirp.wxui import radiothread


class TestRadioThread(base.BaseTest):
    def setUp(self):
        super().setUp()

    def test_radiojob(self):
        radio = mock.MagicMock()
        editor = mock.MagicMock()
        job = radiothread.RadioJob(editor, 'get_memory', [12], {})
        self.assertIsNone(job.dispatch(radio))
        radio.get_memory.assert_called_once_with(12)
        self.assertEqual(job.result, radio.get_memory.return_value)

    def test_radiojob_exception(self):
        radio = mock.MagicMock()
        radio.get_memory.side_effect = ValueError('some error')
        editor = mock.MagicMock()
        job = radiothread.RadioJob(editor, 'get_memory', [12], {})
        self.assertIsNone(job.dispatch(radio))
        radio.get_memory.assert_called_once_with(12)
        self.assertIsInstance(job.result, ValueError)

    def test_thread(self):
        radio = mock.MagicMock()
        radio.get_features.side_effect = ValueError('some error')
        editor = mock.MagicMock()
        # Simulate an edit conflict with the first event by returning
        # False for "delivered" to force us to queue an event.
        editor.radio_thread_event.side_effect = [False, True, True, True]
        thread = radiothread.RadioThread(radio)
        mem = mock.MagicMock()
        job1id = thread.submit(editor, 'get_memory', 12)
        job2id = thread.submit(editor, 'set_memory', mem)
        job3id = thread.submit(editor, 'get_features')
        # We have to start the thread after we submit the main jobs so
        # the order is stable for comparison.
        thread.start()

        # Wait for the main jobs to be processed before we signal exit
        while not all([radio.get_memory.called,
                       radio.set_memory.called,
                       radio.get_features.called]):
            time.sleep(0.1)

        thread.end()
        thread.join(5)
        self.assertFalse(thread.is_alive())
        radio.get_memory.assert_called_once_with(12)
        radio.set_memory.assert_called_once_with(mem)
        radio.get_features.assert_called_once_with()
        self.assertEqual(4, editor.radio_thread_event.call_count)

        # We expect the jobs to be delivered in order of
        # priority. Since we return False for the first call to
        # radio_thread_event(), job2 should be queued and then
        # delivered first on the next cycle.
        expected_order = [job2id, job2id, job3id, job1id]
        for i, (jobid, call) in enumerate(
                zip(expected_order,
                    editor.radio_thread_event.call_args_list)):
            job = call[0][0]
            self.assertEqual(jobid, job.id)

        # We should call non-blocking for every call except the last
        # one, when the queue is empty
        editor.radio_thread_event.assert_has_calls([
            mock.call(mock.ANY, block=False),
            mock.call(mock.ANY, block=False),
            mock.call(mock.ANY, block=False),
            mock.call(mock.ANY, block=True),
        ])

    def test_thread_abort_priority(self):
        radio = mock.MagicMock()
        radio.get_features.side_effect = ValueError('some error')
        editor = mock.MagicMock()
        thread = radiothread.RadioThread(radio)
        mem = mock.MagicMock()
        job1id = thread.submit(editor, 'get_memory', 12)
        job2id = thread.submit(editor, 'set_memory', mem)
        job3id = thread.submit(editor, 'get_features')
        thread.end()
        # We have to start the thread after we submit the main jobs so
        # the order is stable for comparison.
        thread.start()

        thread.join(5)
        self.assertFalse(thread.is_alive())

        # Our end sentinel should have gone to the head of the queue
        # so that exiting the application does not leave a thread
        # running in the background fetching hundreds of memories.
        radio.get_memory.assert_not_called()
        radio.set_memory.assert_not_called()
        radio.get_features.assert_not_called()
        wx.PostEvent.assert_not_called()