1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#include "iceoryx_binding_c/enums.h"
#include "iceoryx_binding_c/internal/c2cpp_enum_translation.hpp"
#include "iceoryx_binding_c/internal/cpp2c_enum_translation.hpp"
#include "iceoryx_binding_c/internal/cpp2c_service_description_translation.hpp"
#include "iceoryx_binding_c/internal/cpp2c_subscriber.hpp"
#include "iceoryx_binding_c/internal/cpp2c_waitset.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/condition_variable_data.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_user.hpp"
#include "iceoryx_posh/mepoo/chunk_header.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
using namespace iox;
using namespace iox::cxx;
using namespace iox::popo;
using namespace iox::capro;
using namespace iox::mepoo;
using namespace iox::runtime;
extern "C" {
#include "iceoryx_binding_c/subscriber.h"
}
/// @todo iox-#1221 remove this workaround needed for CycloneDDS due to our change to use the head for storage
struct SubscriberWithStoragePointer
{
void* subscriberStorage{nullptr};
cpp2c_Subscriber subscriber;
};
constexpr uint64_t SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT = 543212345;
void iox_sub_options_init(iox_sub_options_t* options)
{
if (options == nullptr)
{
LogWarn() << "subscriber options initialization skipped - null pointer provided";
return;
}
SubscriberOptions subscriberOptions;
options->queueCapacity = subscriberOptions.queueCapacity;
options->historyRequest = subscriberOptions.historyRequest;
options->nodeName = nullptr;
options->subscribeOnCreate = subscriberOptions.subscribeOnCreate;
options->queueFullPolicy = cpp2c::queueFullPolicy(subscriberOptions.queueFullPolicy);
options->requirePublisherHistorySupport = false;
options->initCheck = SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT;
}
bool iox_sub_options_is_initialized(const iox_sub_options_t* const options)
{
return options && options->initCheck == SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT;
}
iox_sub_t iox_sub_init(iox_sub_storage_t* self,
const char* const service,
const char* const instance,
const char* const event,
const iox_sub_options_t* const options)
{
if (self == nullptr)
{
LogWarn() << "subscriber initialization skipped - null pointer provided for iox_sub_storage_t";
return nullptr;
}
SubscriberOptions subscriberOptions;
// use default options otherwise
if (options != nullptr)
{
if (!iox_sub_options_is_initialized(options))
{
// note that they may have been initialized but the initCheck
// pattern overwritten afterwards, we cannot be sure but it is a misuse
LogFatal() << "subscriber options may not have been initialized with iox_sub_init";
errorHandler(Error::kBINDING_C__SUBSCRIBER_OPTIONS_NOT_INITIALIZED);
}
subscriberOptions.queueCapacity = options->queueCapacity;
subscriberOptions.historyRequest = options->historyRequest;
if (options->nodeName != nullptr)
{
subscriberOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName);
}
subscriberOptions.subscribeOnCreate = options->subscribeOnCreate;
subscriberOptions.queueFullPolicy = c2cpp::queueFullPolicy(options->queueFullPolicy);
subscriberOptions.requiresPublisherHistorySupport = options->requirePublisherHistorySupport;
}
// this is required for CycloneDDS to limit the fallout of our change to use the heap for storage
// it should be removed with #1221
auto meWithStoragePointer = new SubscriberWithStoragePointer();
meWithStoragePointer->subscriberStorage = self;
auto me = &meWithStoragePointer->subscriber;
assert(reinterpret_cast<uint64_t>(me) - reinterpret_cast<uint64_t>(meWithStoragePointer) == sizeof(void*)
&& "Size mismatch for SubscriberWithStoragePointer!");
me->m_portData =
PoshRuntime::getInstance().getMiddlewareSubscriber(ServiceDescription{IdString_t(TruncateToCapacity, service),
IdString_t(TruncateToCapacity, instance),
IdString_t(TruncateToCapacity, event)},
subscriberOptions);
self->do_not_touch_me[0] = reinterpret_cast<uint64_t>(me);
return me;
}
void iox_sub_deinit(iox_sub_t const self)
{
iox::cxx::Expects(self != nullptr);
auto addressOfSelf = reinterpret_cast<uint64_t>(self);
auto* selfWithStoragePointer = reinterpret_cast<SubscriberWithStoragePointer*>(addressOfSelf - sizeof(void*));
delete selfWithStoragePointer;
}
void iox_sub_subscribe(iox_sub_t const self)
{
SubscriberPortUser(self->m_portData).subscribe();
}
void iox_sub_unsubscribe(iox_sub_t const self)
{
SubscriberPortUser(self->m_portData).unsubscribe();
}
iox_SubscribeState iox_sub_get_subscription_state(iox_sub_t const self)
{
return cpp2c::subscribeState(SubscriberPortUser(self->m_portData).getSubscriptionState());
}
iox_ChunkReceiveResult iox_sub_take_chunk(iox_sub_t const self, const void** const userPayload)
{
auto result = SubscriberPortUser(self->m_portData).tryGetChunk();
if (result.has_error())
{
return cpp2c::chunkReceiveResult(result.get_error());
}
*userPayload = result.value()->userPayload();
return ChunkReceiveResult_SUCCESS;
}
void iox_sub_release_chunk(iox_sub_t const self, const void* const userPayload)
{
SubscriberPortUser(self->m_portData).releaseChunk(ChunkHeader::fromUserPayload(userPayload));
}
void iox_sub_release_queued_chunks(iox_sub_t const self)
{
SubscriberPortUser(self->m_portData).releaseQueuedChunks();
}
bool iox_sub_has_chunks(iox_sub_t const self)
{
return SubscriberPortUser(self->m_portData).hasNewChunks();
}
bool iox_sub_has_lost_chunks(iox_sub_t const self)
{
return SubscriberPortUser(self->m_portData).hasLostChunksSinceLastCall();
}
iox_service_description_t iox_sub_get_service_description(iox_sub_t const self)
{
return TranslateServiceDescription(SubscriberPortUser(self->m_portData).getCaProServiceDescription());
}
|