File: c_subscriber.cpp

package info (click to toggle)
iceoryx 2.0.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 11,260 kB
  • sloc: cpp: 94,127; ansic: 1,443; sh: 1,436; python: 1,377; xml: 80; makefile: 61
file content (187 lines) | stat: -rw-r--r-- 7,042 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0


#include "iceoryx_binding_c/enums.h"
#include "iceoryx_binding_c/internal/c2cpp_enum_translation.hpp"
#include "iceoryx_binding_c/internal/cpp2c_enum_translation.hpp"
#include "iceoryx_binding_c/internal/cpp2c_service_description_translation.hpp"
#include "iceoryx_binding_c/internal/cpp2c_subscriber.hpp"
#include "iceoryx_binding_c/internal/cpp2c_waitset.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/condition_variable_data.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_user.hpp"
#include "iceoryx_posh/mepoo/chunk_header.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"

// Pull in the iceoryx namespaces whose types and helpers are used unqualified in this file.
using namespace iox;
using namespace iox::cxx;
using namespace iox::popo;
using namespace iox::capro;
using namespace iox::mepoo;
using namespace iox::runtime;

// The C header is included inside extern "C" so that its declarations get C language linkage.
extern "C" {
#include "iceoryx_binding_c/subscriber.h"
}

/// @todo iox-#1221 remove this workaround needed for CycloneDDS due to our change to use the heap for storage
/// @brief Heap-allocated wrapper that pairs a subscriber with the address of the user-provided storage.
/// @note iox_sub_init asserts that 'subscriber' is located exactly sizeof(void*) bytes behind the start of this
///       struct and iox_sub_deinit relies on that offset to get back from the subscriber handle to the wrapper;
///       do not reorder the members or add members in front of 'subscriber'.
struct SubscriberWithStoragePointer
{
    /// address of the iox_sub_storage_t which was passed to iox_sub_init
    void* subscriberStorage{nullptr};
    /// the subscriber whose address is handed out as iox_sub_t
    cpp2c_Subscriber subscriber;
};

/// Pattern written to iox_sub_options_t::initCheck by iox_sub_options_init and
/// compared against by iox_sub_options_is_initialized.
constexpr uint64_t SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT{543212345};

/// @brief Fills the given options struct with the values of a default constructed SubscriberOptions
///        and marks it as initialized.
/// @param[in,out] options struct to initialize; when it is a nullptr a warning is logged and nothing is written
void iox_sub_options_init(iox_sub_options_t* options)
{
    if (options == nullptr)
    {
        LogWarn() << "subscriber options initialization skipped - null pointer provided";
        return;
    }

    // every field mirrors the C++ defaults so that the C and C++ API cannot drift apart
    SubscriberOptions subscriberOptions;
    options->queueCapacity = subscriberOptions.queueCapacity;
    options->historyRequest = subscriberOptions.historyRequest;
    // a nullptr keeps the default node name of SubscriberOptions in iox_sub_init
    options->nodeName = nullptr;
    options->subscribeOnCreate = subscriberOptions.subscribeOnCreate;
    options->queueFullPolicy = cpp2c::queueFullPolicy(subscriberOptions.queueFullPolicy);
    options->requirePublisherHistorySupport = subscriberOptions.requiresPublisherHistorySupport;

    // stamp the struct last; iox_sub_options_is_initialized checks for this pattern
    options->initCheck = SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT;
}

/// @brief Checks whether the options carry the pattern written by iox_sub_options_init.
/// @param[in] options struct to inspect, may be a nullptr
/// @return true if options is not a nullptr and its initCheck equals the expected pattern, otherwise false
bool iox_sub_options_is_initialized(const iox_sub_options_t* const options)
{
    if (options == nullptr)
    {
        return false;
    }

    return options->initCheck == SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT;
}

/// @brief Creates a subscriber for the given service description and registers it in the provided storage.
/// @param[in] self user-provided storage; its first element receives the address of the created subscriber
/// @param[in] service service string, converted to an IdString_t with TruncateToCapacity
/// @param[in] instance instance string, converted to an IdString_t with TruncateToCapacity
/// @param[in] event event string, converted to an IdString_t with TruncateToCapacity
/// @param[in] options subscriber options set up with iox_sub_options_init; a nullptr selects the defaults
/// @return handle to the created subscriber or a nullptr when self is a nullptr
/// @note The subscriber lives on the heap and has to be released with iox_sub_deinit.
iox_sub_t iox_sub_init(iox_sub_storage_t* self,
                       const char* const service,
                       const char* const instance,
                       const char* const event,
                       const iox_sub_options_t* const options)
{
    if (self == nullptr)
    {
        LogWarn() << "subscriber initialization skipped - null pointer provided for iox_sub_storage_t";
        return nullptr;
    }

    // stays default constructed when no options are provided
    SubscriberOptions subscriberOptions;

    if (options != nullptr)
    {
        if (!iox_sub_options_is_initialized(options))
        {
            // note that they may have been initialized but the initCheck
            // pattern overwritten afterwards, we cannot be sure but it is a misuse
            LogFatal() << "subscriber options may not have been initialized with iox_sub_options_init";
            errorHandler(Error::kBINDING_C__SUBSCRIBER_OPTIONS_NOT_INITIALIZED);
        }
        subscriberOptions.queueCapacity = options->queueCapacity;
        subscriberOptions.historyRequest = options->historyRequest;
        // a nullptr node name keeps the default node name of SubscriberOptions
        if (options->nodeName != nullptr)
        {
            subscriberOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName);
        }
        subscriberOptions.subscribeOnCreate = options->subscribeOnCreate;
        subscriberOptions.queueFullPolicy = c2cpp::queueFullPolicy(options->queueFullPolicy);
        subscriberOptions.requiresPublisherHistorySupport = options->requirePublisherHistorySupport;
    }

    // this is required for CycloneDDS to limit the fallout of our change to use the heap for storage
    // it should be removed with iox-#1221
    auto* meWithStoragePointer = new SubscriberWithStoragePointer();
    meWithStoragePointer->subscriberStorage = self;
    auto* me = &meWithStoragePointer->subscriber;
    // iox_sub_deinit recovers the wrapper by subtracting sizeof(void*) from the handle,
    // therefore the subscriber must directly follow the storage pointer
    assert(reinterpret_cast<uint64_t>(me) - reinterpret_cast<uint64_t>(meWithStoragePointer) == sizeof(void*)
           && "Size mismatch for SubscriberWithStoragePointer!");

    me->m_portData =
        PoshRuntime::getInstance().getMiddlewareSubscriber(ServiceDescription{IdString_t(TruncateToCapacity, service),
                                                                              IdString_t(TruncateToCapacity, instance),
                                                                              IdString_t(TruncateToCapacity, event)},
                                                           subscriberOptions);

    // make the subscriber reachable through the user-provided storage
    self->do_not_touch_me[0] = reinterpret_cast<uint64_t>(me);
    return me;
}

/// @brief Releases a subscriber which was created with iox_sub_init.
/// @param[in] self handle returned by iox_sub_init, must not be a nullptr
void iox_sub_deinit(iox_sub_t const self)
{
    iox::cxx::Expects(self != nullptr);

    // the handle points to the 'subscriber' member which is located sizeof(void*) bytes
    // behind the start of the heap-allocated SubscriberWithStoragePointer
    const auto subscriberAddress = reinterpret_cast<uint64_t>(self);
    const auto wrapperAddress = subscriberAddress - sizeof(void*);

    delete reinterpret_cast<SubscriberWithStoragePointer*>(wrapperAddress);
}

/// @brief Calls subscribe() on the port of the subscriber.
/// @param[in] self handle to the subscriber
void iox_sub_subscribe(iox_sub_t const self)
{
    SubscriberPortUser port(self->m_portData);
    port.subscribe();
}

/// @brief Calls unsubscribe() on the port of the subscriber.
/// @param[in] self handle to the subscriber
void iox_sub_unsubscribe(iox_sub_t const self)
{
    SubscriberPortUser port(self->m_portData);
    port.unsubscribe();
}

/// @brief Reads the subscription state from the port of the subscriber.
/// @param[in] self handle to the subscriber
/// @return the subscription state translated to its C enum
iox_SubscribeState iox_sub_get_subscription_state(iox_sub_t const self)
{
    SubscriberPortUser port(self->m_portData);
    const auto state = port.getSubscriptionState();
    return cpp2c::subscribeState(state);
}

/// @brief Tries to take a chunk from the port of the subscriber.
/// @param[in] self handle to the subscriber, must not be a nullptr
/// @param[out] userPayload receives the pointer to the user payload of the chunk, must not be a nullptr;
///             only written when ChunkReceiveResult_SUCCESS is returned
/// @return ChunkReceiveResult_SUCCESS when a chunk was taken, otherwise the translated error of the port
iox_ChunkReceiveResult iox_sub_take_chunk(iox_sub_t const self, const void** const userPayload)
{
    // validate both pointers before a chunk is taken from the port; otherwise a nullptr for
    // userPayload would only be hit after the chunk has already been received
    iox::cxx::Expects(self != nullptr);
    iox::cxx::Expects(userPayload != nullptr);

    auto result = SubscriberPortUser(self->m_portData).tryGetChunk();
    if (result.has_error())
    {
        return cpp2c::chunkReceiveResult(result.get_error());
    }

    *userPayload = result.value()->userPayload();
    return ChunkReceiveResult_SUCCESS;
}

/// @brief Hands a chunk back to the port of the subscriber.
/// @param[in] self handle to the subscriber
/// @param[in] userPayload pointer to the user payload of the chunk which is released
void iox_sub_release_chunk(iox_sub_t const self, const void* const userPayload)
{
    SubscriberPortUser port(self->m_portData);
    // the port works with chunk headers, hence the header is looked up from the user payload
    port.releaseChunk(ChunkHeader::fromUserPayload(userPayload));
}

/// @brief Calls releaseQueuedChunks() on the port of the subscriber.
/// @param[in] self handle to the subscriber
void iox_sub_release_queued_chunks(iox_sub_t const self)
{
    SubscriberPortUser port(self->m_portData);
    port.releaseQueuedChunks();
}

/// @brief Asks the port of the subscriber whether new chunks are available.
/// @param[in] self handle to the subscriber
/// @return the result of hasNewChunks() of the port
bool iox_sub_has_chunks(iox_sub_t const self)
{
    SubscriberPortUser port(self->m_portData);
    return port.hasNewChunks();
}

/// @brief Asks the port of the subscriber whether chunks were lost since the last call.
/// @param[in] self handle to the subscriber
/// @return the result of hasLostChunksSinceLastCall() of the port
bool iox_sub_has_lost_chunks(iox_sub_t const self)
{
    SubscriberPortUser port(self->m_portData);
    return port.hasLostChunksSinceLastCall();
}

/// @brief Provides the service description of the subscriber.
/// @param[in] self handle to the subscriber
/// @return the CaPro service description of the port translated to its C representation
iox_service_description_t iox_sub_get_service_description(iox_sub_t const self)
{
    SubscriberPortUser port(self->m_portData);
    return TranslateServiceDescription(port.getCaProServiceDescription());
}