File: dlt_context_handler_unit_test.py

package info (click to toggle)
python-dlt 2.18.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 452 kB
  • sloc: python: 3,449; makefile: 55
file content (182 lines) | stat: -rw-r--r-- 6,773 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright (C) 2016. BMW Car IT GmbH. All rights reserved.
from multiprocessing import Queue as mp_queue
from queue import Empty, Queue
import time
import unittest

from dlt.dlt_broker_handlers import DLTContextHandler
from tests.utils import create_messages, stream_one, stream_multiple


class TestDLTContextHandler(unittest.TestCase):
    def setUp(self):
        self.filter_queue = mp_queue()
        self.message_queue = mp_queue()
        self.handler = DLTContextHandler(self.filter_queue, self.message_queue)

    def test_init(self):
        self.assertFalse(self.handler.stop_flag.is_set())
        self.assertFalse(self.handler.is_alive())
        self.assertTrue(self.handler.filter_queue.empty())
        self.assertTrue(self.handler.message_queue.empty())

    def test_register_no_filter(self):
        queue = Queue()
        queue_id = id(queue)

        self.handler.register(queue)

        # When no filter is specified, filter (None, None) should be
        # added (ie: match all messages)
        self.assertIn(queue_id, self.handler.context_map)
        self.assertEqual(self.handler.context_map[queue_id], (queue, [(None, None)]))
        self.assertEqual(self.handler.filter_queue.get(), (queue_id, [(None, None)], True))

    def test_register_single_filter(self):
        queue = Queue()
        queue_id = id(queue)
        filters = ("SYS", "JOUR")

        self.handler.register(queue, filters)

        # Specified, filter should be added to filter_queue
        self.assertIn(queue_id, self.handler.context_map)
        self.assertEqual(self.handler.context_map[queue_id], (queue, filters))
        self.assertEqual(self.handler.filter_queue.get(), (queue_id, filters, True))

    def test_register_similar_filters(self):
        queue0 = Queue()
        queue_id0 = id(queue0)
        filters0 = ("SYS", "JOUR")

        queue1 = Queue()
        queue_id1 = id(queue1)
        filters1 = ("SYS", "JOUR")

        self.handler.register(queue0, filters0)
        self.handler.register(queue1, filters1)

        # Each queue should have a unique entry in the context_map and
        # filter_queue (even if they have the same filter)
        self.assertIn(queue_id0, self.handler.context_map)
        self.assertIn(queue_id1, self.handler.context_map)
        self.assertEqual(self.handler.context_map[queue_id0], (queue0, filters0))
        self.assertEqual(self.handler.context_map[queue_id1], (queue1, filters1))
        self.assertEqual(self.handler.filter_queue.get(), (queue_id0, filters0, True))
        self.assertEqual(self.handler.filter_queue.get(), (queue_id1, filters1, True))

    def test_unregister(self):
        queue = Queue()
        queue_id = id(queue)
        filters = ("SYS", "JOUR")

        self.handler.register(queue, filters)
        self.assertIn(queue_id, self.handler.context_map)
        self.assertEqual(self.handler.filter_queue.get(), (queue_id, filters, True))

        self.handler.unregister(queue)
        self.assertNotIn(queue_id, self.handler.context_map)
        self.assertEqual(self.handler.filter_queue.get(), (queue_id, filters, False))

    def test_run_no_messages(self):
        try:
            self.handler.start()
            time.sleep(0.2)
            self.handler.stop()
            self.assertTrue(self.handler.stop_flag.is_set())
            self.assertFalse(self.handler.is_alive())
        except:  # noqa: E722
            self.fail()

    def test_run_single_context_queue(self):
        queue = Queue()
        queue_id = id(queue)
        filters = ("DA1", "DC1")
        self.handler.register(queue, filters)

        self.handler.start()

        # - simulate feeding of messages into the message_queue
        for _ in range(10):
            self.handler.message_queue.put((queue_id, create_messages(stream_one)))

        try:
            for _ in range(10):
                queue.get(timeout=0.01)
        except Empty:
            # - we should not get an Empty for exactly 10 messages
            self.fail()
        finally:
            self.handler.stop()

    def test_run_multiple_context_queue(self):
        self.handler.start()

        queue0 = Queue()
        queue_id0 = id(queue0)
        filters0 = ("DA1", "DC1")
        self.handler.register(queue0, filters0)

        queue1 = Queue()
        queue_id1 = id(queue1)
        filters1 = ("SYS", "JOUR")
        self.handler.register(queue1, filters1)

        # - queue with no filter
        queue2 = Queue()
        queue_id2 = id(queue2)
        self.handler.register(queue2)

        # - simulate feeding of messages into the message_queue
        for _ in range(10):
            for message in create_messages(stream_multiple, from_file=True):
                queue_id = queue_id0 if message.apid == "DA1" else queue_id1
                self.handler.message_queue.put((queue_id, message))
                # - simulate feeding of all messages for the queue with
                # no filter.
                self.handler.message_queue.put((queue_id2, message))

        try:
            da1_messages = []
            sys_messages = []
            all_messages = []
            for _ in range(10):
                da1_messages.append(queue0.get(timeout=0.01))
                sys_messages.append(queue1.get(timeout=0.01))
                all_messages.append(queue2.get(timeout=0.01))

            # these queues should not get any messages from other queues
            self.assertTrue(all(msg.apid == "DA1" for msg in da1_messages))
            self.assertTrue(all(msg.apid == "SYS" for msg in sys_messages))
            # this queues should get all messages
            self.assertFalse(
                all(msg.apid == "DA1" for msg in all_messages) or all(msg.apid == "SYS" for msg in all_messages)
            )
        except Empty:
            # - we should not get an Empty for at least 10 messages
            self.fail()
        finally:
            self.handler.stop()

    def test_run_unregister_with_unread_messages(self):
        self.handler.start()
        queue = Queue()
        queue_id = id(queue)
        filters = ("DA1", "DC1")
        self.handler.register(queue, filters)

        self.assertIn(queue_id, self.handler.context_map)
        self.handler.unregister(queue)

        # - simulate feeding of messages into the message_queue
        for _ in range(3):
            self.handler.message_queue.put((queue_id, create_messages(stream_one)))

        try:
            self.assertNotIn(queue_id, self.handler.context_map)
            # allow some time for the thread to read all messages
            time.sleep(0.5)
            self.assertTrue(self.handler.message_queue.empty())
            self.assertTrue(queue.empty())
        finally:
            self.handler.stop()