1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
|
# Copyright (C) 2023. BMW Car IT GmbH. All rights reserved.
import logging
from multiprocessing import Event, Queue
import os
import time
import tempfile
import unittest
from queue import Empty
from dlt.dlt_broker_handlers import DLTFileSpinner
from tests.utils import (
create_messages,
stream_multiple,
stream_with_params,
append_stream_to_file,
append_message_to_file,
)
logger = logging.getLogger(__name__)
class TestDLTFileSpinner(unittest.TestCase):
def setUp(self):
self.filter_queue = Queue()
self.message_queue = Queue()
self.stop_event = Event()
# Dlt file is created with empty content
_, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt")
self.dlt_file_spinner = DLTFileSpinner(
self.filter_queue, self.message_queue, self.stop_event, self.dlt_file_name
)
# dispatched_messages from DLTFileSpinner.message_queue
self.dispatched_messages = []
def tearDown(self):
if self.dlt_file_spinner.is_alive():
self.dlt_file_spinner.break_blocking_main_loop()
self.stop_event.set()
self.dlt_file_spinner.join()
if os.path.exists(self.dlt_file_name):
os.remove(self.dlt_file_name)
def test_init(self):
self.assertFalse(self.dlt_file_spinner.mp_stop_flag.is_set())
self.assertFalse(self.dlt_file_spinner.is_alive())
self.assertTrue(self.dlt_file_spinner.filter_queue.empty())
self.assertTrue(self.dlt_file_spinner.message_queue.empty())
def test_run_basic_without_dlt_file(self):
# Delete the created dlt file
os.remove(self.dlt_file_name)
self.assertFalse(self.dlt_file_spinner.is_alive())
self.dlt_file_spinner.start()
self.assertTrue(self.dlt_file_spinner.is_alive())
self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid())
# DLT file does NOT exist
self.assertFalse(os.path.exists(self.dlt_file_spinner.file_name))
self.dlt_file_spinner.break_blocking_main_loop()
self.stop_event.set()
self.dlt_file_spinner.join()
self.assertFalse(self.dlt_file_spinner.is_alive())
def test_run_basic_with_empty_dlt_file(self):
self.assertFalse(self.dlt_file_spinner.is_alive())
self.dlt_file_spinner.start()
self.assertTrue(self.dlt_file_spinner.is_alive())
self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid())
# dlt_reader is instantiated and keeps alive
self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name))
# Expect no dlt log is dispatched
time.sleep(2)
self.assertTrue(self.dlt_file_spinner.message_queue.empty())
# First stop dlt reader, then stop DLTFileSpinner
self.dlt_file_spinner.break_blocking_main_loop()
self.stop_event.set()
self.dlt_file_spinner.join()
self.assertFalse(self.dlt_file_spinner.is_alive())
def test_handle_add_new_filter(self):
self.dlt_file_spinner.filter_queue.put(("queue_id", [("SYS", "JOUR")], True))
time.sleep(0.01)
self.dlt_file_spinner.handle(None)
self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id"])
def test_handle_remove_filter_single_entry(self):
self.dlt_file_spinner.filter_queue.put(("queue_id", [("SYS", "JOUR")], True))
time.sleep(0.01)
self.dlt_file_spinner.handle(None)
self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id"])
self.dlt_file_spinner.filter_queue.put(("queue_id", [("SYS", "JOUR")], False))
time.sleep(0.01)
self.dlt_file_spinner.handle(None)
self.assertNotIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
def test_handle_remove_filter_multiple_entries(self):
self.dlt_file_spinner.filter_queue.put(("queue_id1", [("SYS", "JOUR")], True))
self.dlt_file_spinner.filter_queue.put(("queue_id2", [("SYS", "JOUR")], True))
time.sleep(0.01)
self.dlt_file_spinner.handle(None)
self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id1", "queue_id2"])
self.dlt_file_spinner.filter_queue.put(("queue_id1", [("SYS", "JOUR")], False))
time.sleep(0.01)
self.dlt_file_spinner.handle(None)
self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id2"])
def test_handle_multiple_similar_filters(self):
self.dlt_file_spinner.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True))
self.dlt_file_spinner.filter_queue.put(("queue_id1", [("SYS", "JOUR")], True))
time.sleep(0.01)
self.dlt_file_spinner.handle(None)
self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id0", "queue_id1"])
def test_handle_multiple_different_filters(self):
self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True))
self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True))
time.sleep(0.01)
self.dlt_file_spinner.handle(None)
self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
self.assertIn(("DA1", "DC1"), self.dlt_file_spinner.context_map)
self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id0"])
self.assertEqual(self.dlt_file_spinner.context_map[("DA1", "DC1")], ["queue_id1"])
def test_handle_message_tag_and_distribute(self):
self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True))
self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True))
self.filter_queue.put(("queue_id2", [("SYS", None)], True))
self.filter_queue.put(("queue_id3", [(None, "DC1")], True))
self.filter_queue.put(("queue_id4", [(None, None)], True))
time.sleep(0.01)
# - simulate receiving of messages
for _ in range(10):
for message in create_messages(stream_multiple, from_file=True):
self.dlt_file_spinner.handle(message)
self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map)
self.assertIn(("DA1", "DC1"), self.dlt_file_spinner.context_map)
self.assertIn((None, None), self.dlt_file_spinner.context_map)
self.assertIn(("SYS", None), self.dlt_file_spinner.context_map)
self.assertIn((None, "DC1"), self.dlt_file_spinner.context_map)
try:
# 60 == 10 messages of each for SYS, JOUR and None combinations +
# 10 for (None,None)
messages = [self.message_queue.get(timeout=0.01) for _ in range(60)]
# these queues should not get any messages from other queues
self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id0"]), 10)
self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id1"]), 10)
self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id2"]), 10)
self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id3"]), 10)
# this queue should get all messages
self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id4"]), 20)
except Empty:
# - we should not get an Empty for at least 40 messages
self.fail()
def _update_dispatch_messages_from_dlt_file_spinner(self):
for index in range(60):
try:
message = self.dlt_file_spinner.message_queue.get(timeout=0.01)
if not self.dispatched_messages or message[1] != self.dispatched_messages[-1][1]:
self.dispatched_messages.append(message)
except: # noqa: E722
pass
def test_run_with_writing_to_file(self):
"""
Test with real dlt file, which is written at runtime
1. set filter_queue properly, so that the handled messages could be added to message_queue later
2. start DLTFileSpinner
At this moment, no messages are written to dlt file, so no messages in DLTFileSpinner.message_queue
3. write 2 sample messages to dlt file
Expectation: we could dispatch 2 messages from DLTFileSpinner.message_queue
5. stop DLTFileSpinner
"""
# 1. set filter_queue properly, so that the handled messages could be added to message_queue later
self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True))
self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True))
self.filter_queue.put(("queue_id2", [("SYS", None)], True))
self.filter_queue.put(("queue_id3", [(None, "DC1")], True))
self.filter_queue.put(("queue_id4", [(None, None)], True))
time.sleep(0.01)
# 2. start DLTFileSpinner
self.assertFalse(self.dlt_file_spinner.is_alive())
self.dlt_file_spinner.start()
self.assertTrue(self.dlt_file_spinner.is_alive())
self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid())
# dlt_reader is instantiated and keeps alive
self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name))
# With empty file content, no messages are dispatched to message_queue
time.sleep(2)
self.assertTrue(self.dlt_file_spinner.message_queue.empty())
# 3. write 2 sample messages to dlt file
append_stream_to_file(stream_multiple, self.dlt_file_name)
# Expect the written dlt logs are dispatched to message_queue
self._update_dispatch_messages_from_dlt_file_spinner()
self.assertEqual(2, len(self.dispatched_messages))
# 4. stop DLTFileSpinner
self.dlt_file_spinner.break_blocking_main_loop()
self.stop_event.set()
self.dlt_file_spinner.join()
self.assertFalse(self.dlt_file_spinner.is_alive())
def test_run_with_writing_to_file_twice(self):
"""
Test with real dlt file, which is written at runtime 2 times
1. set filter_queue properly, so that the handled messages could be added to message_queue later
2. start DLTFileSpinner
3. write 2 sample messages to dlt file
Expectation: we could dispatch 2 messages from DLTFileSpinner.message_queue
4. append 1 sample message to dlt file
Expectation: we could dispatch 3 messages from DLTFileSpinner.message_queue
5. stop DLTFileSpinner
"""
# 1. set filter_queue properly, so that the handled messages could be added to message_queue later
self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True))
self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True))
self.filter_queue.put(("queue_id2", [("SYS", None)], True))
self.filter_queue.put(("queue_id3", [(None, "DC1")], True))
self.filter_queue.put(("queue_id4", [(None, None)], True))
time.sleep(0.01)
# 2. start DLTFileSpinner
self.assertFalse(self.dlt_file_spinner.is_alive())
self.dlt_file_spinner.start()
self.assertTrue(self.dlt_file_spinner.is_alive())
self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid())
# dlt_reader is instantiated and keeps alive
self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name))
# With empty file content, no messages are dispatched to message_queue
time.sleep(2)
self.assertTrue(self.dlt_file_spinner.message_queue.empty())
# 3. write 2 sample messages to dlt file
append_stream_to_file(stream_multiple, self.dlt_file_name)
# Expect the written dlt logs are dispatched to message_queue
self._update_dispatch_messages_from_dlt_file_spinner()
self.assertEqual(2, len(self.dispatched_messages))
# 4. append 1 sample message to dlt file
append_stream_to_file(stream_with_params, self.dlt_file_name)
self._update_dispatch_messages_from_dlt_file_spinner()
self.assertEqual(3, len(self.dispatched_messages))
# 5. stop DLTFileSpinner
self.dlt_file_spinner.break_blocking_main_loop()
self.stop_event.set()
self.dlt_file_spinner.join()
self.assertFalse(self.dlt_file_spinner.is_alive())
def test_run_with_writing_empty_apid_ctid_to_file(self):
"""
Test with real dlt file, which contains message with apid=b"" and ctid=b""
1. set filter_queue properly, so that the handled messages could be added to message_queue later
2. start DLTFileSpinner
At this moment, no messages are written to dlt file, so no messages in DLTFileSpinner.message_queue
3. write message with apid=b"" and ctid=b"" to dlt file
Expectation: we could dispatch 1 message from DLTFileSpinner.message_queue
and, apid==b"" and ctid==b""
5. stop DLTFileSpinner
"""
# 1. set filter_queue properly, so that the handled messages could be added to message_queue later
self.filter_queue.put(("queue_id0", [(None, None)], True))
time.sleep(0.01)
# 2. start DLTFileSpinner
self.assertFalse(self.dlt_file_spinner.is_alive())
self.dlt_file_spinner.start()
self.assertTrue(self.dlt_file_spinner.is_alive())
self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid())
# dlt_reader is instantiated and keeps alive
self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name))
# With empty file content, no messages are dispatched to message_queue
time.sleep(2)
self.assertTrue(self.dlt_file_spinner.message_queue.empty())
# 3. write a message to dlt file
# Construct a message with apid==b"" and ctid==b""
message = create_messages(stream_with_params, from_file=True)[0]
message.extendedheader.apid = b""
message.extendedheader.ctid = b""
# Write this message into dlt file
append_message_to_file(message, self.dlt_file_name)
# Expect the written dlt logs are dispatched to message_queue
self._update_dispatch_messages_from_dlt_file_spinner()
self.assertEqual(1, len(self.dispatched_messages))
# Expectation: the received message should have apid==b"" and ctid==b""
self.assertEqual("", self.dispatched_messages[0][1].apid)
self.assertEqual("", self.dispatched_messages[0][1].ctid)
# 4. stop DLTFileSpinner
self.dlt_file_spinner.break_blocking_main_loop()
self.stop_event.set()
self.dlt_file_spinner.join()
self.assertFalse(self.dlt_file_spinner.is_alive())
|