1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
|
# Listener (or how to use callbacks with iceoryx)
## Thread Safety
The Listener is thread-safe and can be used without restrictions.
But be aware that all provided callbacks are executed concurrently
in the background thread of the Listener. If you access structures
inside this callback you have to either ensure that you are the only
one accessing it or that it is accessed with a guard like a `std::mutex`.
## Introduction
For an introduction into the terminology please read the Glossary in the
[WaitSet C++ example](https://github.com/eclipse-iceoryx/iceoryx/tree/v2.0.0/iceoryx_examples/waitset).
The Listener is a completely thread-safe construct that reacts to events by
executing registered callbacks in a background thread. Events can be emitted by
_EventOrigins_ like a subscriber or a user trigger. Some of the _EventOrigins_
like the subscriber can hereby emit more than one event type.
The interface of a listener consists of two methods: `attachEvent` to attach a
new event with a callback and `detachEvent`. These two methods can be called
concurrently, even from inside a callback that was triggered by an event!
## Expected Output
[](https://asciinema.org/a/407365)
## Code Walkthrough
!!! attention
Please be aware of the thread-safety restrictions of the _Listener_ and
read the [Thread Safety](#thread-safety) chapter carefully.
Let's say we have an application that offers us two distinct services:
`Radar.FrontLeft.Counter` and `Rader.FrontRight.Counter`. Every time we have
received a sample from left and right we would like to calculate the sum with
the newest values and print it out. If we have received only one of the samples,
we store it until we received the other side.
### ice_callbacks_publisher.cpp
The publisher of this example does not contain any new features but if you have
some questions take a look at the
[icedelivery example](https://github.com/eclipse-iceoryx/iceoryx/tree/v2.0.0/iceoryx_examples/icedelivery).
### ice_callbacks_subscriber.cpp
#### int main()
The subscriber main function starts as usual and after registering the runtime
we create the listener that starts a background thread.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][create listener]-->
```cpp
iox::popo::Listener listener;
```
Because it is fun, we also create a heartbeat trigger that will be triggered
every 4 seconds so that `heartbeat received` can be printed to the console.
Furthermore, we have to create two subscribers to receive samples for the two
services.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][create heartbeat and subscribers]-->
```cpp
iox::popo::UserTrigger heartbeat;
iox::popo::Subscriber<CounterTopic> subscriberLeft({"Radar", "FrontLeft", "Counter"});
iox::popo::Subscriber<CounterTopic> subscriberRight({"Radar", "FrontRight", "Counter"});
```
Next thing is a `heartbeatThread` which will trigger our heartbeat trigger every
4 seconds.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][create heartbeat]-->
```cpp
std::thread heartbeatThread([&] {
while (!iox::posix::hasTerminationRequested())
{
heartbeat.trigger();
std::this_thread::sleep_for(std::chrono::seconds(4));
}
});
```
Now that everything is set up, we can attach the subscribers to the listener so that
every time a new sample (`iox::popo::SubscriberEvent::DATA_RECEIVED`) is received our callback
(`onSampleReceivedCallback`) will be called. We also attach
our `heartbeat` user trigger to print the heartbeat message to the console via another
callback (`heartbeatCallback`).
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][attach everything]-->
```cpp
listener.attachEvent(heartbeat, iox::popo::createNotificationCallback(heartbeatCallback)).or_else([](auto) {
std::cerr << "unable to attach heartbeat event" << std::endl;
std::exit(EXIT_FAILURE);
});
// It is possible to attach any c function here with a signature of void(iox::popo::Subscriber<CounterTopic> *).
// But please be aware that the listener does not take ownership of the callback, therefore it has to exist as
// long as the event is attached. Furthermore, it excludes lambdas which are capturing data since they are not
// convertable to a c function pointer.
// to simplify the example we attach the same callback onSampleReceivedCallback again
listener
.attachEvent(subscriberLeft,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback))
.or_else([](auto) {
std::cerr << "unable to attach subscriberLeft" << std::endl;
std::exit(EXIT_FAILURE);
});
listener
.attachEvent(subscriberRight,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback))
.or_else([](auto) {
std::cerr << "unable to attach subscriberRight" << std::endl;
std::exit(EXIT_FAILURE);
});
```
Since a user trigger has only one event, we do not have to specify an event when we attach
it to the listener. `attachEvent` returns a `cxx::expected` to inform us if the attachment
succeeded. When this is not the case the error handling is performed in the `.or_else([](auto){` part
after each `attachEvent` call.
In this example, we choose to attach the same callback twice to make things easier
but you are free to attach any callback with the signature `void(iox::popo::Subscriber<CounterTopic> *)`.
The setup is complete, but it would terminate right away since we have no blocker which
waits until `SIGINT` or `SIGTERM` was send. In the other examples, we had not that problem
since we pulled all the events in a while true loop but working only with callbacks
requires something like our `SignalWatcher` which waits until `SIGINT` or `SIGTERM`
was signaled.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][wait for sigterm]-->
```cpp
iox::posix::waitForTerminationRequest();
```
When `waitForTerminationRequest` unblocks we clean up all resources and terminate the process
gracefully.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][cleanup]-->
```cpp
listener.detachEvent(heartbeat);
listener.detachEvent(subscriberLeft, iox::popo::SubscriberEvent::DATA_RECEIVED);
listener.detachEvent(subscriberRight, iox::popo::SubscriberEvent::DATA_RECEIVED);
heartbeatThread.join();
```
Hint: You do not have to detach an _EventOrigin_ like a subscriber or user trigger
before it goes out of scope. This also goes for the _Listener_, the implemented
RAII-based design takes care of the resource cleanup.
#### The Callbacks
The callbacks must have a signature like `void(PointerToEventOrigin*)`.
Our `heartbeatCallback` for instance, just prints the message `heartbeat received`.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][heartbeat callback]-->
```cpp
void heartbeatCallback(iox::popo::UserTrigger*)
{
std::cout << "heartbeat received " << std::endl;
}
```
The `onSampleReceivedCallback` is more complex. We first acquire the received
sample and check which subscriber signaled the event by acquiring the subscriber's
service description. If the instance is equal to `FrontLeft` we store the sample
in the `leftCache` otherwise in the `rightCache`.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][[subscriber callback][get data]]-->
```cpp
void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber)
{
subscriber->take().and_then([subscriber](auto& sample) {
auto instanceString = subscriber->getServiceDescription().getInstanceIDString();
// store the sample in the corresponding cache
if (instanceString == iox::capro::IdString_t("FrontLeft"))
{
leftCache.emplace(*sample);
}
else if (instanceString == iox::capro::IdString_t("FrontRight"))
{
rightCache.emplace(*sample);
}
std::cout << "received: " << sample->counter << std::endl;
});
// ...
}
```
In the next step, we check if both caches are filled. If this is the case, we print
an extra message which states the result of the sum of both received values.
Afterward, we reset both caches to start fresh again.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp][[subscriber callback][process data]]-->
```cpp
void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber)
{
// ...
// if both caches are filled we can process them
if (leftCache && rightCache)
{
std::cout << "Received samples from FrontLeft and FrontRight. Sum of " << leftCache->counter << " + "
<< rightCache->counter << " = " << leftCache->counter + rightCache->counter << std::endl;
leftCache.reset();
rightCache.reset();
}
}
```
### Additional context data for callbacks (ice_callbacks_listener_as_class_member.cpp)
Here we demonstrate how you can provide virtually everything as an additional argument to the callbacks.
You just have to provide a reference to a value as additional argument in the `attachEvent` method
which is then provided as argument in your callback. One of the use cases is to get access
to members and methods of an object inside a static method which we demonstrate here.
This example is identical to the [ice_callbacks_subscriber.cpp](#ice_callbacks_subscribercpp)
one, except that we left out the cyclic heartbeat trigger. The key difference is that
the listener is now a class member and in every callback we would like to change
some member variables. For this we require an additional pointer to the object
since the listener requires c function references which do not allow the usage
of lambda expressions with capturing. Here we can use the userType feature which allows us
to provide the this pointer as additional argument to the callback.
The main function is now pretty short, we instantiate our object of type `CounterService`
and call `waitForTerminationRequest` like in the
previous example to wait for the control c event from the user.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp][init]-->
```cpp
iox::runtime::PoshRuntime::initRuntime(APP_NAME);
CounterService counterService;
iox::posix::waitForTerminationRequest();
```
Our `CounterService` has the following members:
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp][members]-->
```cpp
iox::popo::Subscriber<CounterTopic> m_subscriberLeft;
iox::popo::Subscriber<CounterTopic> m_subscriberRight;
iox::cxx::optional<CounterTopic> m_leftCache;
iox::cxx::optional<CounterTopic> m_rightCache;
iox::popo::Listener m_listener;
```
And their purposes are the same as in the previous example. In the constructor
we initialize the two subscribers and attach them to our listener. But now we
add an additional parameter in the `iox::popo::createNotificationCallback`, the
dereferenced `this` pointer. It has to be dereferenced since we require a reference
as argument.
!!! attention
The user has to ensure that the contextData (`*this`) in `attachEvent`
lives as long as the attachment, with its callback, is attached otherwise
the callback context data pointer is dangling.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp][ctor]-->
```cpp
CounterService()
: m_subscriberLeft({"Radar", "FrontLeft", "Counter"})
, m_subscriberRight({"Radar", "FrontRight", "Counter"})
{
/// Attach the static method onSampleReceivedCallback and provide this as additional argument
/// to the callback to gain access to the object whenever the callback is called.
/// It is not possible to use a lambda with capturing here since they are not convertable to
/// a C function pointer.
/// important: the user has to ensure that the contextData (*this) lives as long as
/// the subscriber with its callback is attached to the listener
m_listener
.attachEvent(m_subscriberLeft,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback, *this))
.or_else([](auto) {
std::cerr << "unable to attach subscriberLeft" << std::endl;
std::exit(EXIT_FAILURE);
});
m_listener
.attachEvent(m_subscriberRight,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback, *this))
.or_else([](auto) {
std::cerr << "unable to attach subscriberRight" << std::endl;
std::exit(EXIT_FAILURE);
});
}
```
The `onSampleReceivedCallback` is now a static method instead of a free function. It
has to be static since we require a C function reference as callback argument and a
static method can be converted into such a type. But in a static method we do not
have access to the members of an object, therefore we have to add an additional
argument, the pointer to the object itself, called `self`.
<!--[geoffrey][iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp][callback]-->
```cpp
static void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber, CounterService* self)
{
subscriber->take().and_then([subscriber, self](auto& sample) {
auto instanceString = subscriber->getServiceDescription().getInstanceIDString();
// store the sample in the corresponding cache
if (instanceString == iox::capro::IdString_t("FrontLeft"))
{
self->m_leftCache.emplace(*sample);
}
else if (instanceString == iox::capro::IdString_t("FrontRight"))
{
self->m_rightCache.emplace(*sample);
}
std::cout << "received: " << sample->counter << std::endl;
});
// if both caches are filled we can process them
if (self->m_leftCache && self->m_rightCache)
{
std::cout << "Received samples from FrontLeft and FrontRight. Sum of " << self->m_leftCache->counter
<< " + " << self->m_rightCache->counter << " = "
<< self->m_leftCache->counter + self->m_rightCache->counter << std::endl;
self->m_leftCache.reset();
self->m_rightCache.reset();
}
}
```
<center>
[Check out callbacks on GitHub :fontawesome-brands-github:](https://github.com/eclipse-iceoryx/iceoryx/tree/v2.0.0/iceoryx_examples/callbacks){ .md-button }
</center>
|