File: dlt_main_loop_by_reading_dlt_file_unit_test.py

package info (click to toggle)
python-dlt 2.18.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 452 kB
  • sloc: python: 3,449; makefile: 55
file content (148 lines) | stat: -rw-r--r-- 6,179 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright (C) 2023. BMW Car IT GmbH. All rights reserved.
"""Basic unittests for the py_dlt_file_main_loop function"""
import os
import unittest
import tempfile
from threading import Thread
import time

from dlt.dlt import cDLTFile, py_dlt_file_main_loop
from tests.utils import (
    append_stream_to_file,
    stream_multiple,
    stream_with_params,
    create_messages,
    append_message_to_file,
)


class TestMainLoopByReadingDltFile(unittest.TestCase):
    """Tests for ``py_dlt_file_main_loop`` reading from a temporary dlt file.

    Each test gets its own empty temporary ``.dlt`` file and a ``cDLTFile``
    reader opened on it.  The main loop is run in a background thread and
    dispatches messages to ``_callback_for_message``, which records them so
    that the test can assert on what was received.
    """

    def setUp(self):
        """Create the temporary dlt file, the reader and the bookkeeping state."""
        # Empty content dlt file is created. mkstemp() hands back an already
        # opened OS-level descriptor; only the path is needed here, so close
        # the descriptor right away instead of leaking one per test.
        fd, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt")
        os.close(fd)
        # Registered as a cleanup so the file is removed even when a later
        # step of setUp() or an assertion in tearDown() fails.
        self.addCleanup(os.remove, self.dlt_file_name)
        self.dlt_reader = cDLTFile(filename=self.dlt_file_name, is_live=True, iterate_unblock_mode=False)
        # message_queue to store the dispatched messages from main loop
        self.message_queue = []
        # When callback() is called, then it is set to True
        self.callback_is_called = False
        # With this variable, we could test different return value from callback()
        # If callback() returns True, then main loop keeps going; otherwise, it breaks
        self.callback_return_value = True
        # Thread for main loop, which is instantiated in test case
        self.main_loop = None

    def _callback_for_message(self, message):
        """Record a dispatched message and tell the main loop whether to go on.

        Args:
            message: the object handed over by the main loop; it is stored in
                ``message_queue`` only when it is truthy.

        Returns:
            The current ``callback_return_value``.
        """
        self.callback_is_called = True
        if message:
            self.message_queue.append(message)
        return self.callback_return_value

    def _start_main_loop(self):
        """Run ``py_dlt_file_main_loop`` in a background thread."""
        self.main_loop = Thread(
            target=py_dlt_file_main_loop,
            kwargs={"dlt_reader": self.dlt_reader, "callback": self._callback_for_message},
        )
        self.main_loop.start()
        # Give the thread time to start working before the caller continues;
        # test_005 asserts directly after this call without sleeping itself.
        time.sleep(1)

    def tearDown(self):
        """Stop the reader and check that the main loop thread terminated."""
        stop_event = self.dlt_reader.stop_reading_proc
        if stop_event.is_set():
            return
        stop_event.set()
        # After the stop of dlt_reader, main loop should be stopped automatically
        if self.main_loop:
            # Wait at most 0.5 seconds for the thread to finish
            self.main_loop.join(timeout=0.5)
            self.assertFalse(self.main_loop.is_alive())

    def test_001_empty_dlt_file(self):
        """When dlt file has empty content, then no message could be dispatched, and no return value from main loop"""
        self._start_main_loop()
        time.sleep(0.1)
        # When file has empty content, callback() will not be called by any message
        self.assertFalse(self.callback_is_called)
        self.assertEqual(0, len(self.message_queue))

    def test_002_first_write_then_read_dlt_file(self):
        """
        Simulate a real dlt file case: first write to it, and then use main loop to read it
        """
        # First write to dlt file without opening main loop
        append_stream_to_file(stream_multiple, self.dlt_file_name)
        time.sleep(0.1)
        # Expectation: py_dlt_file_main_loop reads out the first batch of messages to message_queue
        self._start_main_loop()
        time.sleep(0.1)
        self.assertTrue(self.callback_is_called)
        self.assertEqual(2, len(self.message_queue))

    def test_003_first_read_then_write_dlt_file(self):
        """
        Simulate a real dlt file case: first open main loop to read, then write to the file while main loop is running
        """
        # First only start main loop to read dlt file
        self._start_main_loop()
        # Then write to dlt file
        append_stream_to_file(stream_multiple, self.dlt_file_name)
        time.sleep(0.1)
        # Expect the written logs could be dispatched by main loop
        self.assertTrue(self.callback_is_called)
        self.assertEqual(2, len(self.message_queue))

    def test_004_read_2_writes(self):
        """
        Test main loop reads from 2 consecutive writes to dlt file
        """
        # First only start main loop to read dlt file
        self._start_main_loop()
        # First write to dlt file
        append_stream_to_file(stream_multiple, self.dlt_file_name)
        time.sleep(0.1)
        # Expect main loop could dispatch 2 logs
        self.assertTrue(self.callback_is_called)
        self.assertEqual(2, len(self.message_queue))
        # Second write to dlt file, and expect 3 dispatched logs in total
        append_stream_to_file(stream_with_params, self.dlt_file_name)
        time.sleep(0.1)
        self.assertEqual(3, len(self.message_queue))

    def test_005_callback_return_false(self):
        """
        If callback returns false, then main loop should exit
        """
        # Set callback return value to False
        self.callback_return_value = False
        # Write to file
        append_stream_to_file(stream_multiple, self.dlt_file_name)
        time.sleep(0.1)
        # Open main loop to dispatch logs
        self._start_main_loop()
        # Expect the callback to have been invoked
        self.assertTrue(self.callback_is_called)
        # Callback returns False after it handles the first message, which terminates main loop
        # So, main loop won't be able to process the second message
        self.assertEqual(1, len(self.message_queue))
        self.assertFalse(self.main_loop.is_alive())

    def test_006_read_empty_apid_ctid_message(self):
        """
        Simulate a case to read a apid==b"" and ctid==b"" message
        """
        # Construct a message with apid==b"" and ctid==b""
        message = create_messages(stream_with_params, from_file=True)[0]
        message.extendedheader.apid = b""
        message.extendedheader.ctid = b""
        # Write this message into dlt file
        append_message_to_file(message, self.dlt_file_name)
        # Expectation: py_dlt_file_main_loop reads out the written message to message_queue
        self._start_main_loop()
        time.sleep(0.1)
        self.assertTrue(self.callback_is_called)
        self.assertEqual(1, len(self.message_queue))
        # Expectation: the received message should have empty apid and ctid
        self.assertEqual("", self.message_queue[0].apid)
        self.assertEqual("", self.message_queue[0].ctid)