File: dlt_broker_time_test.py

package info (click to toggle)
python-dlt 2.18.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 452 kB
  • sloc: python: 3,449; makefile: 55
file content (331 lines) | stat: -rw-r--r-- 11,120 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# Copyright (C) 2021. BMW Car IT GmbH. All rights reserved.
"""Test DLTBroker with enabling dlt_time"""
from contextlib import contextmanager
from multiprocessing import Queue
import queue as tqueue
import time
from unittest.mock import ANY, patch, MagicMock

import pytest

from dlt.dlt_broker import create_filter_ack_queue, DLTBroker, logger
from dlt.dlt_broker_handlers import DLTContextHandler, DLTFilterAckMessageHandler, DLTMessageHandler
from tests.utils import MockDLTMessage


def fake_py_dlt_client_main_loop(client, callback, *args, **kwargs):
    return True


@contextmanager
def dlt_broker(pydlt_main_func=fake_py_dlt_client_main_loop, enable_dlt_time=True, enable_filter_set_ack=False):
    """Initialize a fake DLTBroker"""
    with patch("dlt.dlt_broker_handlers.DLTMessageHandler._client_connect"), patch(
        "dlt.dlt_broker_handlers.py_dlt_client_main_loop", side_effect=pydlt_main_func
    ):
        broker = DLTBroker("42.42.42.42", enable_dlt_time=enable_dlt_time, enable_filter_set_ack=enable_filter_set_ack)
        broker.msg_handler._client = MagicMock()

        try:
            broker.start()

            yield broker

        finally:
            broker.stop()


@contextmanager
def dlt_filter_ack_msg_handler():
    queue = Queue()

    handler = DLTFilterAckMessageHandler(queue)
    try:
        handler.start()
        queue.cancel_join_thread()

        yield (handler, queue)
    finally:
        handler.stop()
        queue.close()


def fake_dlt_msg_handler(msg, with_filter_ack_queue):
    """Create a fake DLTMessageHandler"""
    filter_queue = MagicMock()
    filter_ack_queue = MagicMock() if with_filter_ack_queue else None
    client_cfg = {"ip_address": b"127.0.0.1", "filename": b"/dev/null", "verbose": 0, "port": "1234"}

    filter_queue.empty.side_effect = [False, True]
    filter_queue.get_nowait.return_value = msg

    return DLTMessageHandler(
        filter_queue, MagicMock(), MagicMock(), client_cfg, dlt_time_value=None, filter_ack_queue=filter_ack_queue
    )


def test_start_stop_dlt_broker():
    """Test to stop DLTBroker with dlt-time normally"""
    with dlt_broker(fake_py_dlt_client_main_loop, enable_dlt_time=True) as broker:
        assert broker._dlt_time_value


def test_start_stop_dlt_broker_without_dlt_time():
    """Test to stop DLTBroker without dlt-time normally"""
    with dlt_broker(fake_py_dlt_client_main_loop, enable_dlt_time=False) as broker:
        assert not broker._dlt_time_value


@pytest.mark.parametrize(
    "input_sec,input_msec,expected_value",
    [
        (42, 42, 42.42),  # normal test case
        (1618993559, 7377682, 1618993559.7377682),  # big value. The value will be truncated when type is not double
    ],
)
def test_dlt_broker_get_dlt_time(input_sec, input_msec, expected_value):
    """Test to get time from DLTBroker"""

    def handle(client, callback=None, *args, **kwargs):
        return callback(MockDLTMessage(payload="test_payload", sec=input_sec, msec=input_msec))

    with dlt_broker(handle) as broker:
        time.sleep(0.01)

    assert broker.dlt_time() == expected_value


def test_dlt_broker_get_latest_dlt_time():
    """Test to get the latest time from DLTBroker"""
    # ref: https://stackoverflow.com/questions/3190706/nonlocal-keyword-in-python-2-x
    time_value = {"v": 42}

    def handle(client, callback=None, *args, **kwargs):
        if time_value["v"] < 45:
            time_value["v"] += 1

        time.sleep(0.01)
        return callback(MockDLTMessage(payload="test_payload", sec=time_value["v"], msec=42))

    with dlt_broker(handle) as broker:
        time_vals = set()
        for i in range(10):
            time_vals.add(broker.dlt_time())
            time.sleep(0.01)

    assert sorted(time_vals) == [0.0, 43.42, 44.42, 45.42]


def test_start_stop_dlt_broker_with_dlt_ack_msg_handler():
    """Test to stop DLTBroker with ack msg handler normally"""
    with dlt_broker(fake_py_dlt_client_main_loop, enable_dlt_time=True, enable_filter_set_ack=True) as broker:
        assert broker.filter_ack_msg_handler


def test_start_stop_dlt_broker_without_dlt_ack_msg_handler():
    """Test to stop DLTBroker without ack msg handler normally"""
    with dlt_broker(fake_py_dlt_client_main_loop, enable_dlt_time=True, enable_filter_set_ack=False) as broker:
        assert not broker.filter_ack_msg_handler


def test_create_filter_ack_queue():
    """Test to register and unregister an ack queue"""
    handler_mock = MagicMock()

    with create_filter_ack_queue(handler_mock) as queue:
        queue.put(True)
        assert queue.get()

    handler_mock.register.assert_called_with(queue)
    handler_mock.unregister.assert_called_with(queue)


@pytest.mark.parametrize(
    "ack,required_ack,return_val",
    [
        (True, True, True),
        (False, False, True),
        (True, False, False),
        (False, True, False),
    ],
)
def test_recv_filter_set_ack(ack, required_ack, return_val):
    """Test to receive an ack value"""
    queue = tqueue.Queue()

    queue.put(ack)
    with dlt_broker(enable_filter_set_ack=True) as broker:
        assert return_val == broker._recv_filter_set_ack(queue, required_ack)


def test_recv_filter_set_ack_timeout_ignore():
    """Test not to receive an ack value"""
    queue = tqueue.Queue()

    with dlt_broker(enable_filter_set_ack=True) as broker:
        broker.filter_set_ack_timeout = 0.01
        broker.ignore_filter_set_ack_timeout = True

        assert not broker._recv_filter_set_ack(queue, True)


def test_recv_filter_set_ack_timeout_exception():
    """Test not to receive an ack value and with an exception"""
    queue = tqueue.Queue()

    with dlt_broker(enable_filter_set_ack=True) as broker:
        broker.filter_set_ack_timeout = 0.01
        broker.ignore_filter_set_ack_timeout = False

        with pytest.raises(tqueue.Empty) as err:
            broker._recv_filter_set_ack(queue, True)

        assert not str(err.value)


def test_add_context_with_ack():
    """Test to send a filter-setting message with required ack"""
    queue = tqueue.Queue()

    with patch("dlt.dlt_broker.DLTBroker._recv_filter_set_ack", return_value=True) as ack_mock:
        with dlt_broker(enable_filter_set_ack=True) as broker:
            ori_context_handler = broker.context_handler
            broker.context_handler = MagicMock()
            try:
                broker.add_context(queue, [("APID", "CTID")])

                broker.context_handler.register.assert_called()
                ack_mock.assert_called()
            finally:
                broker.context_handler = ori_context_handler


def test_add_context_with_ack_warning():
    """Test to send a filter-setting message but not received an ack"""
    queue = tqueue.Queue()

    with patch("dlt.dlt_broker.DLTBroker._recv_filter_set_ack", return_value=False) as ack_mock, patch.object(
        logger, "warning"
    ) as logger_mock:
        with dlt_broker(enable_filter_set_ack=True) as broker:
            ori_context_handler = broker.context_handler
            broker.context_handler = MagicMock()
            try:
                broker.add_context(queue, [("APID", "CTID")])

                broker.context_handler.register.assert_called()
                ack_mock.assert_called()

                logger_mock.assert_called_with(ANY, ANY, [("APID", "CTID")], id(queue))
            finally:
                broker.context_handler = ori_context_handler


def test_start_stop_dlt_filter_ack_msg_handler():
    """Test to start/stop DLTFilterAckMessageHandler normally"""
    with dlt_filter_ack_msg_handler() as (handler, _):
        pass

    assert not handler.is_alive()


def test_dlt_filter_ack_msg_handler_register():
    """Test to register a new ack queue into DLTFilterAckMessageHandler"""
    queue_ack = tqueue.Queue()

    with dlt_filter_ack_msg_handler() as (handler, queue):
        handler.register(queue_ack)

        queue.put((id(queue_ack), True))
        assert queue_ack.get()


def test_dlt_filter_ack_msg_handler_unregister():
    """Test to unregister a new ack queue into DLTFilterAckMessageHandler"""
    queue_ack = tqueue.Queue()

    with dlt_filter_ack_msg_handler() as (handler, queue):
        handler.register(queue_ack)

        handler.unregister(queue_ack)
        with pytest.raises(tqueue.Empty):
            queue.put((id(queue_ack), False))
            queue_ack.get_nowait()


def test_make_send_filter_msg():
    """Test to generate a filter message"""
    handler = DLTContextHandler(MagicMock(), MagicMock())

    is_register = True
    filters = [("APID", "CTID")]
    queue = MagicMock()

    assert handler._make_send_filter_msg(queue, filters, is_register) == (id(queue), filters, is_register)


def test_make_send_filter_msg_with_ack_queue():
    """Test to generate a filter message with ack queue setting"""
    handler = DLTContextHandler(MagicMock(), MagicMock())

    is_register = True
    filters = [("APID", "CTID")]
    queue = MagicMock()
    queue_ack = MagicMock()

    assert handler._make_send_filter_msg(queue, filters, is_register, context_filter_ack_queue=queue_ack) == (
        id(queue),
        id(queue_ack),
        filters,
        is_register,
    )


def test_dlt_message_handler_process_filter_queue_add():
    """Test to add a filter"""
    handler = fake_dlt_msg_handler(msg=(42, [("APID", "CTID")], True), with_filter_ack_queue=True)
    handler._process_filter_queue()

    assert handler.context_map[("APID", "CTID")] == [42]
    handler.filter_ack_queue.put.assert_not_called()


def test_dlt_message_handler_process_filter_queue_add_ack():
    """Test to add a filter with ack"""
    handler = fake_dlt_msg_handler(msg=(42, 43, [("APID", "CTID")], True), with_filter_ack_queue=True)
    handler._process_filter_queue()

    assert handler.context_map[("APID", "CTID")] == [42]
    handler.filter_ack_queue.put.assert_called_with((43, True))


def test_dlt_message_handler_process_filter_queue_remove():
    """Test to remove a filter"""
    handler = fake_dlt_msg_handler(msg=(42, [("APID", "CTID")], False), with_filter_ack_queue=True)
    handler.context_map[("APID", "CTID")].append(42)

    handler._process_filter_queue()

    assert ("APID", "CTID") not in handler.context_map
    handler.filter_ack_queue.put.assert_not_called()


def test_dlt_message_handler_process_filter_queue_remove_ack():
    """Test to remove a filter with ack"""
    handler = fake_dlt_msg_handler(msg=(42, 43, [("APID", "CTID")], False), with_filter_ack_queue=True)
    handler.context_map[("APID", "CTID")].append(42)

    handler._process_filter_queue()

    assert ("APID", "CTID") not in handler.context_map
    handler.filter_ack_queue.put.assert_called_with((43, False))


def test_dlt_message_handler_process_filter_queue_remove_exception():
    """Test to remove a filter when the filter does not exists"""
    handler = fake_dlt_msg_handler(msg=(42, [("APID", "CTID")], False), with_filter_ack_queue=True)

    handler._process_filter_queue()

    assert not handler.context_map[("APID", "CTID")]
    handler.filter_ack_queue.put.assert_not_called()