File: FlowControllerPublishModesOnPureSyncTests.cpp

package info (click to toggle)
fastdds 3.1.2%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 58,132 kB
  • sloc: cpp: 779,516; xml: 15,119; python: 4,356; sh: 190; makefile: 93; ansic: 12
file content (102 lines) | stat: -rw-r--r-- 4,215 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "FlowControllerPublishModesTests.hpp"

#include <thread>

#include <fastdds/rtps/attributes/ThreadSettings.hpp>

using namespace eprosima::fastdds::rtps;
using namespace testing;

TYPED_TEST(FlowControllerPublishModes, pure_sync_publish_mode)
{
    FlowControllerDescriptor flow_controller_descr;
    FlowControllerImpl<FlowControllerPureSyncPublishMode, TypeParam> pure_sync(nullptr,
            &flow_controller_descr, 0, ThreadSettings{});
    pure_sync.init();

    // Initialize callback to get info.
    auto send_functor = [&](
        CacheChange_t*,
        RTPSMessageGroup&,
        LocatorSelectorSender&,
        const std::chrono::time_point<std::chrono::steady_clock>&)
            {
                this->last_thread_delivering_sample = std::this_thread::get_id();
            };


    // Instantiate writers.
    BaseWriter writer1;
    BaseWriter writer2;

    // Register writers.
    pure_sync.register_writer(&writer1);

    CacheChange_t change_writer1;
    INIT_CACHE_CHANGE(change_writer1, writer1, 1);

    CacheChange_t change_writer2;
    INIT_CACHE_CHANGE(change_writer2, writer2, 1);

    // Testing add_new_sample. Writer will be able to deliver it.
    EXPECT_CALL(writer1,
            deliver_sample_nts(&change_writer1, _, Ref(writer1.general_locator_selector_), _)).
            WillOnce(DoAll(send_functor, Return(DeliveryRetCode::DELIVERED)));
    writer1.getMutex().lock();
    ASSERT_TRUE(pure_sync.add_new_sample(&writer1, &change_writer1,
            std::chrono::steady_clock::now() + std::chrono::hours(24)));
    writer1.getMutex().unlock();
    EXPECT_EQ(std::this_thread::get_id(), this->last_thread_delivering_sample);

    EXPECT_CALL(writer2,
            deliver_sample_nts(&change_writer2, _, Ref(writer2.general_locator_selector_), _)).
            WillOnce(DoAll(send_functor, Return(DeliveryRetCode::DELIVERED)));
    writer2.getMutex().lock();
    ASSERT_TRUE(pure_sync.add_new_sample(&writer2, &change_writer2,
            std::chrono::steady_clock::now() + std::chrono::hours(24)));
    writer2.getMutex().unlock();
    EXPECT_EQ(std::this_thread::get_id(), this->last_thread_delivering_sample);

    // Testing add_new_sample. Writer will not be able to deliver it.
    EXPECT_CALL(writer1,
            deliver_sample_nts(&change_writer1, _, Ref(writer1.general_locator_selector_), _)).
            WillOnce(DoAll(send_functor, Return(DeliveryRetCode::NOT_DELIVERED)));
    writer1.getMutex().lock();
    ASSERT_FALSE(pure_sync.add_new_sample(&writer1, &change_writer1,
            std::chrono::steady_clock::now() + std::chrono::hours(24)));
    writer1.getMutex().unlock();
    EXPECT_EQ(std::this_thread::get_id(), this->last_thread_delivering_sample);

    EXPECT_CALL(writer2,
            deliver_sample_nts(&change_writer2, _, Ref(writer2.general_locator_selector_), _)).
            WillOnce(DoAll(send_functor, Return(DeliveryRetCode::NOT_DELIVERED)));
    writer2.getMutex().lock();
    ASSERT_FALSE(pure_sync.add_new_sample(&writer2, &change_writer2,
            std::chrono::steady_clock::now() + std::chrono::hours(24)));
    writer2.getMutex().unlock();
    EXPECT_EQ(std::this_thread::get_id(), this->last_thread_delivering_sample);

    // Testing add_old_sample. Writers never be called.
    writer1.getMutex().lock();
    ASSERT_FALSE(pure_sync.add_old_sample(&writer1, &change_writer1));
    writer1.getMutex().unlock();
    writer2.getMutex().lock();
    ASSERT_FALSE(pure_sync.add_old_sample(&writer2, &change_writer2));
    writer2.getMutex().unlock();

    pure_sync.unregister_writer(&writer1);
}