File: MessageBus.cpp

package info (click to toggle)
darkradiant 3.9.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 41,080 kB
  • sloc: cpp: 264,743; ansic: 10,659; python: 1,852; xml: 1,650; sh: 92; makefile: 21
file content (172 lines) | stat: -rw-r--r-- 4,512 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#include "RadiantTest.h"

#include <future>
#include "imessagebus.h"

namespace test
{

using MessageBusTest = RadiantTest;

class CustomMessage1 : 
    public radiant::IMessage
{
public:
    static const std::size_t Id = radiant::IMessage::Type::UserDefinedMessagesGoHigherThanThis + 1000;

    std::size_t getId() const override
    {
        return Id;
    }
};

class CustomMessage2 :
    public radiant::IMessage
{
public:
    static const std::size_t Id = radiant::IMessage::Type::UserDefinedMessagesGoHigherThanThis + 1001;
    
    std::size_t getId() const override
    {
        return Id;
    }
};

TEST_F(MessageBusTest, Registration)
{
    auto counter1 = 0;
    auto& messageBus = GlobalRadiantCore().getMessageBus();

    auto listenerId1 = messageBus.addListener(CustomMessage1::Id, [&](radiant::IMessage&) { ++counter1; });

    CustomMessage1 msg1;
    messageBus.sendMessage(msg1);

    // Counter should have been increased
    EXPECT_EQ(counter1, 1);

    messageBus.removeListener(listenerId1);

    // Send another message, counter should not increase 
    messageBus.sendMessage(msg1);
    
    EXPECT_EQ(counter1, 1);
}

TEST_F(MessageBusTest, ChannelHandling)
{
    auto counter1 = 0;
    auto counter2 = 0;
    auto& messageBus = GlobalRadiantCore().getMessageBus();

    auto listenerId1 = messageBus.addListener(CustomMessage1::Id, [&](radiant::IMessage&) { ++counter1; });
    auto listenerId2 = messageBus.addListener(CustomMessage2::Id, [&](radiant::IMessage&) { ++counter2; });

    EXPECT_NE(listenerId1, listenerId2);

    CustomMessage1 msg1;
    messageBus.sendMessage(msg1);

    // Only one counter should have been increased
    EXPECT_EQ(counter1, 1);
    EXPECT_EQ(counter2, 0);
}

TEST_F(MessageBusTest, MultipleListenersOnChannel)
{
    auto counter1 = 0;
    auto& messageBus = GlobalRadiantCore().getMessageBus();

    auto listenerId1 = messageBus.addListener(CustomMessage1::Id, [&](radiant::IMessage&) { ++counter1; });
    auto listenerId2 = messageBus.addListener(CustomMessage1::Id, [&](radiant::IMessage&) { ++counter1; });

    CustomMessage1 msg1;
    messageBus.sendMessage(msg1);

    // Counter should have been increased twice
    EXPECT_EQ(counter1, 2);

    messageBus.removeListener(listenerId1);

    // Send another message, counter should increase only once now
    messageBus.sendMessage(msg1);

    EXPECT_EQ(counter1, 3);
}

TEST_F(MessageBusTest, DeregistrationDuringCallback)
{
    auto counter1 = 0;
    auto& messageBus = GlobalRadiantCore().getMessageBus();

    // Two listeners, unsubscribing during callback
    std::size_t listenerId1 = 0;
    std::size_t listenerId2 = 0;

    listenerId1 = messageBus.addListener(CustomMessage1::Id, [&](radiant::IMessage&)
    {
        ++counter1;
        messageBus.removeListener(listenerId1);
    });

    listenerId2 = messageBus.addListener(CustomMessage1::Id, [&](radiant::IMessage&)
    {
        ++counter1;
        messageBus.removeListener(listenerId2);
    });

    CustomMessage1 msg1;
    messageBus.sendMessage(msg1);

    // Counter should have been increased twice
    EXPECT_EQ(counter1, 2);

    // Send another message, counter should not increase anymore
    messageBus.sendMessage(msg1);

    EXPECT_EQ(counter1, 2);
}

TEST_F(MessageBusTest, MultipleThreadsCanSendMessages)
{
    std::size_t counter = 0;
    auto& messageBus = GlobalRadiantCore().getMessageBus();

    // Add a handler that launches another thread which in turn is sending messages
    auto listenerId1 = messageBus.addListener(CustomMessage1::Id, [&](radiant::IMessage&) 
    {
        ++counter;

        // Launch a separate thread which is sending messages
        std::thread innerThread([&]()
        {
            CustomMessage2 msg2;
            messageBus.sendMessage(msg2);
        });

        // If the sendMessage(CustomMessage2) deadlocks, this will never return
        innerThread.join();

        ++counter; // finish => counter == 2
    });

    auto task = std::async(std::launch::async, [&]()
    {
        CustomMessage1 msg1;
        messageBus.sendMessage(msg1);
    });

    auto result = task.wait_for(std::chrono::seconds(2));

    EXPECT_EQ(counter, 2);
    EXPECT_EQ(result, std::future_status::ready) << "Inner thread doesn't respond, possibly dead-locked";

    if (result == std::future_status::timeout)
    {
        // In case of a deadlock the destructor of the above task
        // would wait forever, abort the whole test application
        std::terminate();
    }
}

}