1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#include "iceoryx_hoofs/cxx/optional.hpp"
#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_posh/popo/listener.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/popo/user_trigger.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "topic_data.hpp"
#include <chrono>
#include <csignal>
#include <iostream>
constexpr char APP_NAME[] = "iox-cpp-callbacks-subscriber";
iox::cxx::optional<CounterTopic> leftCache;
iox::cxx::optional<CounterTopic> rightCache;
//! [heartbeat callback]
void heartbeatCallback(iox::popo::UserTrigger*)
{
std::cout << "heartbeat received " << std::endl;
}
//! [heartbeat callback]
//! [subscriber callback]
void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber)
{
//! [get data]
subscriber->take().and_then([subscriber](auto& sample) {
auto instanceString = subscriber->getServiceDescription().getInstanceIDString();
// store the sample in the corresponding cache
if (instanceString == iox::capro::IdString_t("FrontLeft"))
{
leftCache.emplace(*sample);
}
else if (instanceString == iox::capro::IdString_t("FrontRight"))
{
rightCache.emplace(*sample);
}
std::cout << "received: " << sample->counter << std::endl;
});
//! [get data]
//! [process data]
// if both caches are filled we can process them
if (leftCache && rightCache)
{
std::cout << "Received samples from FrontLeft and FrontRight. Sum of " << leftCache->counter << " + "
<< rightCache->counter << " = " << leftCache->counter + rightCache->counter << std::endl;
leftCache.reset();
rightCache.reset();
}
//! [process data]
}
//! [subscriber callback]
int main()
{
iox::runtime::PoshRuntime::initRuntime(APP_NAME);
// the listener starts a background thread and the callbacks of the attached events
// will be called in this background thread when they are triggered
//! [create listener]
iox::popo::Listener listener;
//! [create listener]
//! [create heartbeat and subscribers]
iox::popo::UserTrigger heartbeat;
iox::popo::Subscriber<CounterTopic> subscriberLeft({"Radar", "FrontLeft", "Counter"});
iox::popo::Subscriber<CounterTopic> subscriberRight({"Radar", "FrontRight", "Counter"});
//! [create heartbeat and subscribers]
// send a heartbeat every 4 seconds
//! [create heartbeat]
std::thread heartbeatThread([&] {
while (!iox::posix::hasTerminationRequested())
{
heartbeat.trigger();
std::this_thread::sleep_for(std::chrono::seconds(4));
}
});
//! [create heartbeat]
// attach everything to the listener, from here on the callbacks are called when the corresponding event is occuring
//! [attach everything]
listener.attachEvent(heartbeat, iox::popo::createNotificationCallback(heartbeatCallback)).or_else([](auto) {
std::cerr << "unable to attach heartbeat event" << std::endl;
std::exit(EXIT_FAILURE);
});
// It is possible to attach any c function here with a signature of void(iox::popo::Subscriber<CounterTopic> *).
// But please be aware that the listener does not take ownership of the callback, therefore it has to exist as
// long as the event is attached. Furthermore, it excludes lambdas which are capturing data since they are not
// convertable to a c function pointer.
// to simplify the example we attach the same callback onSampleReceivedCallback again
listener
.attachEvent(subscriberLeft,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback))
.or_else([](auto) {
std::cerr << "unable to attach subscriberLeft" << std::endl;
std::exit(EXIT_FAILURE);
});
listener
.attachEvent(subscriberRight,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback))
.or_else([](auto) {
std::cerr << "unable to attach subscriberRight" << std::endl;
std::exit(EXIT_FAILURE);
});
//! [attach everything]
// wait until someone presses CTRL+C
//! [wait for sigterm]
iox::posix::waitForTerminationRequest();
//! [wait for sigterm]
// optional detachEvent, but not required.
// when the listener goes out of scope it will detach all events and when a
// subscriber goes out of scope it will detach itself from the listener
//! [cleanup]
listener.detachEvent(heartbeat);
listener.detachEvent(subscriberLeft, iox::popo::SubscriberEvent::DATA_RECEIVED);
listener.detachEvent(subscriberRight, iox::popo::SubscriberEvent::DATA_RECEIVED);
heartbeatThread.join();
//! [cleanup]
return (EXIT_SUCCESS);
}
|