1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
#include <gtest/gtest.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "common.h"
#include "opentelemetry/common/key_value_iterable_view.h"
#include "opentelemetry/context/context.h"
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/attributemap_hash.h"
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/metrics/data/point_data.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
#include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h"
#include "opentelemetry/sdk/metrics/state/metric_collector.h"
#include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"
#include "opentelemetry/sdk/metrics/view/attributes_processor.h"
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
# include "opentelemetry/sdk/metrics/exemplar/filter_type.h"
#endif
using namespace opentelemetry::sdk::metrics;
using namespace opentelemetry::common;
namespace nostd = opentelemetry::nostd;
TEST(CardinalityLimit, AttributesHashMapBasicTests)
{
AttributesHashMap hash_map(10);
std::function<std::unique_ptr<Aggregation>()> aggregation_callback =
[]() -> std::unique_ptr<Aggregation> {
return std::unique_ptr<Aggregation>(new LongSumAggregation(true));
};
// add 10 unique metric points. 9 should be added to hashmap, 10th should be overflow.
int64_t record_value = 100;
for (auto i = 0; i < 10; i++)
{
FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10);
// add 5 unique metric points above limit, they all should get consolidated as single
// overflowmetric point.
for (auto i = 10; i < 15; i++)
{
FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10); // only one more metric point should be added as overflow.
// record 5 more measurements to already existing (and not-overflow) metric points. They
// should get aggregated to these existing metric points.
for (auto i = 0; i < 5; i++)
{
FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10); // no new metric point added
// get the overflow metric point
auto agg1 = hash_map.GetOrSetDefault(
FilteredOrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}),
aggregation_callback, kOverflowAttributesHash);
EXPECT_NE(agg1, nullptr);
auto sum_agg1 = static_cast<LongSumAggregation *>(agg1);
EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg1->ToPoint()).value_),
record_value * 6); // 1 from previous 10, 5 from current 5.
// get remaining metric points
for (auto i = 0; i < 9; i++)
{
FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
auto agg2 = hash_map.GetOrSetDefault(
FilteredOrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}),
aggregation_callback, hash);
EXPECT_NE(agg2, nullptr);
auto sum_agg2 = static_cast<LongSumAggregation *>(agg2);
if (i < 5)
{
EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg2->ToPoint()).value_),
record_value * 2); // 1 from first recording, 1 from third recording
}
else
{
EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg2->ToPoint()).value_),
record_value); // 1 from first recording
}
}
}
class WritableMetricStorageCardinalityLimitTestFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};
TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregation)
{
auto sdk_start_ts = std::chrono::system_clock::now();
const size_t attributes_limit = 10;
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
std::unique_ptr<DefaultAttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
ExemplarFilterType::kAlwaysOff,
ExemplarReservoir::GetNoExemplarReservoir(),
#endif
nullptr, attributes_limit);
int64_t record_value = 100;
// add 9 unique metric points, and 6 more above limit.
for (auto i = 0; i < 15; i++)
{
std::map<std::string, std::string> attributes = {{"key", std::to_string(i)}};
storage.RecordLong(record_value,
KeyValueIterableView<std::map<std::string, std::string>>(attributes),
opentelemetry::context::Context{});
}
AggregationTemporality temporality = GetParam();
std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);
//... Some computation here
auto collection_ts = std::chrono::system_clock::now();
size_t count_attributes = 0;
bool overflow_present = false;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
for (const auto &data_attr : metric_data.point_data_attr_)
{
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
count_attributes++;
if (data_attr.attributes.begin()->first == kAttributesLimitOverflowKey)
{
EXPECT_EQ(nostd::get<int64_t>(data.value_), record_value * 6);
overflow_present = true;
}
}
return true;
});
EXPECT_EQ(count_attributes, attributes_limit);
EXPECT_EQ(overflow_present, true);
}
INSTANTIATE_TEST_SUITE_P(All,
WritableMetricStorageCardinalityLimitTestFixture,
::testing::Values(AggregationTemporality::kDelta));
|