File: dlt_broker_from_file_spinner_test.py

package info (click to toggle)
python-dlt 2.18.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 452 kB
  • sloc: python: 3,449; makefile: 55
file content (197 lines) | stat: -rw-r--r-- 8,749 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Copyright (C) 2023. BMW Car IT GmbH. All rights reserved.
"""Test DLTBroker with message handler DLTFileSpinner"""
import os
import pytest
import tempfile
import time
import unittest
from unittest.mock import ANY, patch
from queue import Queue, Empty

from dlt.dlt_broker import DLTBroker, logger
from tests.utils import (
    stream_multiple,
    stream_with_params,
    append_stream_to_file,
    create_messages,
    append_message_to_file,
)


class TestDLTBrokerFromDLTFileSpinnerWithNotExistingDLT(unittest.TestCase):
    """Exercise DLTBroker when the dlt file does not exist at broker start-up."""

    def setUp(self) -> None:
        """Reserve a unique temporary dlt file path; the broker is created by each test."""
        self.broker = None
        # mkstemp hands back an already-open OS-level descriptor together with the path.
        # Only the path is needed, so close the descriptor immediately instead of leaking it
        # (an open handle would also prevent os.remove from succeeding on Windows).
        fd, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt")
        os.close(fd)

    def tearDown(self) -> None:
        """Stop the broker (if one was started) and always remove the temporary dlt file."""
        try:
            if self.broker:
                self.broker.stop()
        finally:
            # Tests delete the file themselves before starting the broker, so it may be absent
            if os.path.exists(self.dlt_file_name):
                os.remove(self.dlt_file_name)

    def test_broker_with_not_existing_dlt_file(self):
        """
        Test DLTBroker could work with not existing dlt file

        1. prepare a file name which does not exist
        2. start dlt broker to dispatch messages from this not-existing file --> no error
        3. dlt broker could not add context successfully, but encounter a warning message
        4. no message could be dispatched from not existing file and throws out Queue.Empty exception
        5. dlt_time is 0.0, because it could not be reset according to the latest timestamp of messages
        """
        # Remove the dlt file
        os.remove(self.dlt_file_name)
        # Start broker with non-existing dlt file
        self.broker = DLTBroker(
            filename=self.dlt_file_name,
            enable_dlt_time=True,
            enable_filter_set_ack=True,
            ignore_filter_set_ack_timeout=True,
        )
        self.broker.start()
        # Add context should report warning message
        queue = Queue(maxsize=0)
        with patch.object(logger, "warning") as logger_mock:
            self.broker.add_context(queue, filters=None)
            logger_mock.assert_called_with(ANY, ANY, [(None, None)], id(queue))
        # Not existing dlt file should not throw any exception out
        for _ in range(5):
            with pytest.raises(Empty):
                queue.get_nowait()
        # dlt_time is not None, even though it is not reset with latest timestamp from messages
        self.assertEqual(self.broker.dlt_time(), 0.0)

    def test_broker_with_later_created_dlt_file(self):
        """
        Simulate a scenario: first dlt file does not exist, then dlt file is created and written with messages.

        1. delete the dlt file
        2. start broker
        3. create the dlt file and write 1 sample message
            Expectation: 1 message could be dispatched from broker
        """
        # 1. delete the dlt file
        os.remove(self.dlt_file_name)
        # 2. Start broker with non-existing dlt file
        self.broker = DLTBroker(
            filename=self.dlt_file_name,
            enable_dlt_time=True,
            enable_filter_set_ack=True,
            ignore_filter_set_ack_timeout=True,
        )
        self.broker.start()
        # Register the queue with the broker (no filters); no warning is checked in this test
        queue = Queue(maxsize=0)
        self.broker.add_context(queue, filters=None)
        # 3. Write 1 sample message to the dlt file
        append_stream_to_file(stream_with_params, self.dlt_file_name)
        # Expectation: 1 message could be dispatched from broker
        # Give the broker some time to pick up the newly written message
        time.sleep(0.5)
        self.assertIsNotNone(queue.get_nowait())
        # If we try to dispatch for another time, exception Queue.Empty is thrown,
        # because there is no new log from dlt file
        with pytest.raises(Empty):
            queue.get_nowait()


class TestDLTBrokerFromDLTFileSpinner(unittest.TestCase):
    """Exercise DLTBroker dispatching from a dlt file that exists (initially empty) at start-up."""

    def setUp(self):
        """Create an empty temporary dlt file and start a broker dispatching into a fresh queue."""
        # Dlt file is created with empty content.
        # mkstemp hands back an already-open OS-level descriptor together with the path.
        # Only the path is needed, so close the descriptor immediately instead of leaking it
        # (an open handle would also prevent os.remove from succeeding on Windows).
        fd, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt")
        os.close(fd)
        self.dispatched_message_queue = Queue(maxsize=0)
        # Instantiate DLTBroker with filter-set ack enabled, ignoring filter-set ack timeouts
        self.broker = DLTBroker(
            filename=self.dlt_file_name,
            enable_dlt_time=True,
            enable_filter_set_ack=True,
            ignore_filter_set_ack_timeout=True,
        )
        self.broker.start()
        self.broker.add_context(self.dispatched_message_queue, filters=None)

    def tearDown(self):
        """Stop the broker and always remove the temporary dlt file."""
        try:
            self.broker.stop()
        finally:
            # Remove the file even if stopping the broker raised
            os.remove(self.dlt_file_name)

    def test_001_dispatch_from_empty_dlt_file(self):
        """
        From empty file, no message could be dispatched from queue and raise Queue.Empty.
        dlt_time is 0.0, because it could not be reset according to the latest timestamp of messages
        """
        for _ in range(5):
            with pytest.raises(Empty):
                self.dispatched_message_queue.get_nowait()
        self.assertEqual(self.broker.dlt_time(), 0.0)

    def test_002_dispatch_from_real_dlt_file(self):
        """
        Test DltBroker dispatches from a run-time written dlt file

        With a running dlt broker:
        1. Write 2 sample messages to dlt file
        2. These two messages could be dispatched with the running dlt broker
           With another try to dispatch, Queue.Empty is thrown, because no more logs could be read from dlt log;
           dlt_time from dlt_broker is equal to the timestamp of 2nd message
        3. Append another 1 message to the same dlt file
        4. Total 3 messages could be dispatched with the dlt broker
           With another try to dispatch, Queue.Empty is thrown, because no more logs could be read from dlt log;
           dlt_time from dlt_broker is equal to the timestamp of 3rd message
        """
        # 1. Write 2 sample messages to dlt file
        append_stream_to_file(stream_multiple, self.dlt_file_name)
        # 2. Dispatch 2 messages from dlt broker
        time.sleep(0.1)
        message_1 = self.dispatched_message_queue.get_nowait()
        time.sleep(0.1)
        message_2 = self.dispatched_message_queue.get_nowait()
        self.assertNotEqual(message_1, message_2)
        # If we try to dispatch for another time, exception Queue.Empty is thrown,
        # because there is no new log from dlt file
        with pytest.raises(Empty):
            self.dispatched_message_queue.get_nowait()
        # Validate dlt time from broker
        self.assertEqual(self.broker.dlt_time(), message_2.storage_timestamp)
        # 3. Append another 1 message to the same dlt file
        append_stream_to_file(stream_with_params, self.dlt_file_name)
        # 4. Total 3 messages could be dispatched with the dlt broker
        time.sleep(0.1)
        message_3 = self.dispatched_message_queue.get_nowait()
        self.assertNotEqual(message_1, message_3)
        self.assertNotEqual(message_2, message_3)
        # If we try to dispatch for another time, exception Queue.Empty is thrown,
        # because there is no new log from dlt file
        with pytest.raises(Empty):
            self.dispatched_message_queue.get_nowait()
        # Validate dlt time from broker
        self.assertEqual(self.broker.dlt_time(), message_3.storage_timestamp)

    def test_003_dispatch_from_real_dlt_file(self):
        """
        Test DltBroker dispatches apid==b"" and ctid==b"" message from a run-time written dlt file

        With a running dlt broker:
        1. Write apid==b"" and ctid==b"" message to dlt file
        2. This message could be dispatched with the running dlt broker
           a. With another try to dispatch, Queue.Empty is thrown, because no more logs could be read from dlt log;
           b. dlt_time from dlt_broker is equal to the timestamp of this message
           c. the received message should have apid=="" and ctid==""
        """
        # 1. Write apid==b"" and ctid==b"" message to dlt file
        # Construct a message with apid==b"" and ctid==b""
        message = create_messages(stream_with_params, from_file=True)[0]
        message.extendedheader.apid = b""
        message.extendedheader.ctid = b""
        # Write this message into dlt file
        append_message_to_file(message, self.dlt_file_name)
        # 2. Dispatch from dlt broker
        time.sleep(0.5)
        message = self.dispatched_message_queue.get_nowait()
        # If we try to dispatch for another time, exception Queue.Empty is thrown,
        # because there is no new log from dlt file
        with pytest.raises(Empty):
            self.dispatched_message_queue.get_nowait()
        # Validate dlt time from broker
        self.assertEqual(self.broker.dlt_time(), message.storage_timestamp)
        # Expectation: the received message exposes empty-string apid and ctid
        self.assertEqual("", message.apid)
        self.assertEqual("", message.ctid)