File: ice_waitset_grouping.cpp

package info (click to toggle)
iceoryx 2.0.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 11,260 kB
  • sloc: cpp: 94,127; ansic: 1,443; sh: 1,436; python: 1,377; xml: 80; makefile: 61
file content (134 lines) | stat: -rw-r--r-- 5,043 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/posix_wrapper/signal_handler.hpp"
#include "iceoryx_posh/popo/untyped_subscriber.hpp"
#include "iceoryx_posh/popo/user_trigger.hpp"
#include "iceoryx_posh/popo/wait_set.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "topic_data.hpp"

#include <chrono>
#include <iostream>

std::atomic_bool keepRunning{true};
iox::popo::UserTrigger shutdownTrigger;

static void sigHandler(int f_sig IOX_MAYBE_UNUSED)
{
    shutdownTrigger.trigger();
}

int main()
{
    constexpr uint64_t NUMBER_OF_SUBSCRIBERS = 4U;
    constexpr uint64_t ONE_SHUTDOWN_TRIGGER = 1U;

    // register sigHandler
    auto signalIntGuard = iox::posix::registerSignalHandler(iox::posix::Signal::INT, sigHandler);
    auto signalTermGuard = iox::posix::registerSignalHandler(iox::posix::Signal::TERM, sigHandler);

    iox::runtime::PoshRuntime::initRuntime("iox-cpp-waitset-grouping");
    //! [create waitset]
    iox::popo::WaitSet<NUMBER_OF_SUBSCRIBERS + ONE_SHUTDOWN_TRIGGER> waitset;

    // attach shutdownTrigger to handle CTRL+C
    waitset.attachEvent(shutdownTrigger).or_else([](auto) {
        std::cerr << "failed to attach shutdown trigger" << std::endl;
        std::exit(EXIT_FAILURE);
    });
    //! [create waitset]

    // create subscriber and subscribe them to our service
    //! [create subscribers]
    iox::cxx::vector<iox::popo::UntypedSubscriber, NUMBER_OF_SUBSCRIBERS> subscriberVector;
    for (auto i = 0U; i < NUMBER_OF_SUBSCRIBERS; ++i)
    {
        subscriberVector.emplace_back(iox::capro::ServiceDescription{"Radar", "FrontLeft", "Counter"});
    }
    //! [create subscribers]

    constexpr uint64_t FIRST_GROUP_ID = 123U;
    constexpr uint64_t SECOND_GROUP_ID = 456U;

    //! [configure subscribers]
    // attach the first two subscribers to waitset with a id of FIRST_GROUP_ID
    for (auto i = 0U; i < NUMBER_OF_SUBSCRIBERS / 2; ++i)
    {
        waitset.attachState(subscriberVector[i], iox::popo::SubscriberState::HAS_DATA, FIRST_GROUP_ID)
            .or_else([&](auto) {
                std::cerr << "failed to attach subscriber" << i << std::endl;
                std::exit(EXIT_FAILURE);
            });
    }

    // attach the remaining subscribers to waitset with a id of SECOND_GROUP_ID
    for (auto i = NUMBER_OF_SUBSCRIBERS / 2; i < NUMBER_OF_SUBSCRIBERS; ++i)
    {
        waitset.attachState(subscriberVector[i], iox::popo::SubscriberState::HAS_DATA, SECOND_GROUP_ID)
            .or_else([&](auto) {
                std::cerr << "failed to attach subscriber" << i << std::endl;
                std::exit(EXIT_FAILURE);
            });
    }
    //! [configure subscribers]

    //! [event loop]
    while (keepRunning)
    {
        auto notificationVector = waitset.wait();

        for (auto& notification : notificationVector)
        {
            //! [shutdown path]
            if (notification->doesOriginateFrom(&shutdownTrigger))
            {
                keepRunning = false;
            }
            //! [shutdown path]

            //! [data path]
            // we print the received data for the first group
            else if (notification->getNotificationId() == FIRST_GROUP_ID)
            {
                auto subscriber = notification->getOrigin<iox::popo::UntypedSubscriber>();
                subscriber->take().and_then([&](auto& userPayload) {
                    const CounterTopic* data = static_cast<const CounterTopic*>(userPayload);
                    auto flags = std::cout.flags();
                    std::cout << "received: " << std::dec << data->counter << std::endl;
                    std::cout.setf(flags);
                    subscriber->release(userPayload);
                });
            }
            // dismiss the received data for the second group
            else if (notification->getNotificationId() == SECOND_GROUP_ID)
            {
                std::cout << "dismiss data\n";
                auto subscriber = notification->getOrigin<iox::popo::UntypedSubscriber>();
                // We need to release the data to reset the trigger hasData
                // otherwise the WaitSet would notify us in `waitset.wait()` again
                // instantly.
                subscriber->releaseQueuedData();
            }
            //! [data path]
        }

        std::cout << std::endl;
    }
    //! [event loop]

    return (EXIT_SUCCESS);
}