File: test_status.py

package info (click to toggle)
pyotgw 2.2.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 408 kB
  • sloc: python: 4,580; sh: 5; makefile: 2
file content (227 lines) | stat: -rw-r--r-- 6,798 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""Tests for pyotgw/status.py"""

import asyncio
import logging
from unittest.mock import MagicMock

import pytest

import pyotgw.vars as v
from tests.helpers import called_once


def test_reset(pygw_status):
    """Test StatusManager.reset()"""
    assert pygw_status.status == v.DEFAULT_STATUS

    pygw_status.submit_partial_update(v.OTGW, {"Test": "value"})

    assert pygw_status.status != v.DEFAULT_STATUS
    assert not pygw_status._updateq.empty()

    pygw_status.reset()

    assert pygw_status.status == v.DEFAULT_STATUS
    assert pygw_status._updateq.get_nowait() == v.DEFAULT_STATUS
    assert pygw_status._updateq.empty()


def test_status(pygw_status):
    """Test StatusManager.status()"""
    assert pygw_status.status == pygw_status._status
    assert pygw_status.status is not pygw_status._status


def test_delete_value(pygw_status):
    """Test StatusManager.delete_value()"""
    assert not pygw_status.delete_value("Invalid", v.OTGW_MODE)
    assert not pygw_status.delete_value(v.OTGW, v.OTGW_MODE)

    pygw_status.submit_partial_update(v.THERMOSTAT, {v.DATA_ROOM_SETPOINT: 20.5})
    pygw_status._updateq.get_nowait()

    assert pygw_status.delete_value(v.THERMOSTAT, v.DATA_ROOM_SETPOINT)
    assert pygw_status._updateq.get_nowait() == v.DEFAULT_STATUS


def test_submit_partial_update(caplog, pygw_status):
    """Test StatusManager.submit_partial_update()"""
    with caplog.at_level(logging.ERROR):
        assert not pygw_status.submit_partial_update("Invalid", {})

    assert pygw_status._updateq.empty()
    assert caplog.record_tuples == [
        (
            "pyotgw.status",
            logging.ERROR,
            "Invalid status part for update: Invalid",
        ),
    ]
    caplog.clear()

    with caplog.at_level(logging.ERROR):
        assert not pygw_status.submit_partial_update(v.OTGW, "Invalid")

    assert pygw_status._updateq.empty()
    assert caplog.record_tuples == [
        (
            "pyotgw.status",
            logging.ERROR,
            f"Update for {v.OTGW} is not a dict: Invalid",
        ),
    ]
    caplog.clear()

    pygw_status.submit_partial_update(v.BOILER, {v.DATA_CONTROL_SETPOINT: 1.5})
    pygw_status.submit_partial_update(v.OTGW, {v.OTGW_ABOUT: "test value"})
    pygw_status.submit_partial_update(v.THERMOSTAT, {v.DATA_ROOM_SETPOINT: 20})

    assert pygw_status.status == {
        v.BOILER: {v.DATA_CONTROL_SETPOINT: 1.5},
        v.OTGW: {v.OTGW_ABOUT: "test value"},
        v.THERMOSTAT: {v.DATA_ROOM_SETPOINT: 20},
    }
    assert pygw_status._updateq.qsize() == 3
    assert pygw_status._updateq.get_nowait() == {
        v.BOILER: {v.DATA_CONTROL_SETPOINT: 1.5},
        v.OTGW: {},
        v.THERMOSTAT: {},
    }
    assert pygw_status._updateq.get_nowait() == {
        v.BOILER: {v.DATA_CONTROL_SETPOINT: 1.5},
        v.OTGW: {v.OTGW_ABOUT: "test value"},
        v.THERMOSTAT: {},
    }
    assert pygw_status._updateq.get_nowait() == {
        v.BOILER: {v.DATA_CONTROL_SETPOINT: 1.5},
        v.OTGW: {v.OTGW_ABOUT: "test value"},
        v.THERMOSTAT: {v.DATA_ROOM_SETPOINT: 20},
    }


def test_submit_full_update(caplog, pygw_status):
    """Test StatusManager.submit_full_update()"""
    assert pygw_status.submit_full_update({})
    assert pygw_status._updateq.qsize() == 1
    assert pygw_status._updateq.get_nowait() == v.DEFAULT_STATUS

    with caplog.at_level(logging.ERROR):
        pygw_status.submit_full_update({"Invalid": {}})

    assert pygw_status._updateq.empty()
    assert caplog.record_tuples == [
        (
            "pyotgw.status",
            logging.ERROR,
            "Invalid status part for update: Invalid",
        ),
    ]
    caplog.clear()

    with caplog.at_level(logging.ERROR):
        pygw_status.submit_full_update({v.OTGW: "Invalid"})

    assert pygw_status._updateq.empty()
    assert caplog.record_tuples == [
        (
            "pyotgw.status",
            logging.ERROR,
            f"Update for {v.OTGW} is not a dict: Invalid",
        ),
    ]
    caplog.clear()

    pygw_status.submit_full_update(
        {
            v.BOILER: {v.DATA_CONTROL_SETPOINT: 1.5},
            v.OTGW: {v.OTGW_ABOUT: "test value"},
            v.THERMOSTAT: {v.DATA_ROOM_SETPOINT: 20},
        }
    )

    assert pygw_status.status == {
        v.BOILER: {v.DATA_CONTROL_SETPOINT: 1.5},
        v.OTGW: {v.OTGW_ABOUT: "test value"},
        v.THERMOSTAT: {v.DATA_ROOM_SETPOINT: 20},
    }
    assert pygw_status._updateq.qsize() == 1
    assert pygw_status._updateq.get_nowait() == {
        v.BOILER: {v.DATA_CONTROL_SETPOINT: 1.5},
        v.OTGW: {v.OTGW_ABOUT: "test value"},
        v.THERMOSTAT: {v.DATA_ROOM_SETPOINT: 20},
    }


def test_subscribe(pygw_status):
    """Test StatusManager.subscribe()"""

    def empty_callback():
        return

    assert pygw_status.subscribe(empty_callback)
    assert empty_callback in pygw_status._notify
    assert not pygw_status.subscribe(empty_callback)


def test_unsubscribe(pygw_status):
    """Test StatusManager.unsubscribe()"""

    def empty_callback():
        return

    assert not pygw_status.unsubscribe(empty_callback)
    pygw_status.subscribe(empty_callback)
    assert pygw_status.unsubscribe(empty_callback)
    assert empty_callback not in pygw_status._notify


@pytest.mark.asyncio
async def test_stop_reporting(pygw_status):
    """Test StatusManager.stop_reporting()"""
    assert isinstance(pygw_status._update_task, asyncio.Task)
    await pygw_status.cleanup()
    assert pygw_status._update_task is None


@pytest.mark.asyncio
async def test_process_updates(caplog, pygw_status):
    """Test StatusManager._process_updates()"""

    await pygw_status.cleanup()
    pygw_status.__init__()
    with caplog.at_level(logging.DEBUG):
        # Let the reporting routine start
        await asyncio.sleep(0)

    assert isinstance(pygw_status._update_task, asyncio.Task)
    assert caplog.record_tuples == [
        (
            "pyotgw.status",
            logging.DEBUG,
            "Starting reporting routine",
        ),
    ]
    caplog.clear()

    async def empty_callback_1(status):
        return

    async def empty_callback_2(status):
        return

    mock_callback_1 = MagicMock(side_effect=empty_callback_1)
    mock_callback_2 = MagicMock(side_effect=empty_callback_2)

    pygw_status.subscribe(mock_callback_1)
    pygw_status.subscribe(mock_callback_2)
    pygw_status.submit_partial_update(v.OTGW, {v.OTGW_ABOUT: "Test Value"})
    await asyncio.gather(called_once(mock_callback_1), called_once(mock_callback_2))

    for mock in (mock_callback_1, mock_callback_2):
        mock.assert_called_once_with(
            {
                v.BOILER: {},
                v.OTGW: {v.OTGW_ABOUT: "Test Value"},
                v.THERMOSTAT: {},
            }
        )