File: cardinality_limit_test.cc

package info (click to toggle)
opentelemetry-cpp 1.19.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,744 kB
  • sloc: cpp: 79,029; sh: 1,640; makefile: 43; python: 31
file content (169 lines) | stat: -rw-r--r-- 7,250 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include <gtest/gtest.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "common.h"

#include "opentelemetry/common/key_value_iterable_view.h"
#include "opentelemetry/context/context.h"
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/attributemap_hash.h"
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/metrics/data/point_data.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
#include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h"
#include "opentelemetry/sdk/metrics/state/metric_collector.h"
#include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"
#include "opentelemetry/sdk/metrics/view/attributes_processor.h"

#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
#  include "opentelemetry/sdk/metrics/exemplar/filter_type.h"
#endif

using namespace opentelemetry::sdk::metrics;
using namespace opentelemetry::common;
namespace nostd = opentelemetry::nostd;

TEST(CardinalityLimit, AttributesHashMapBasicTests)
{
  AttributesHashMap hash_map(10);
  std::function<std::unique_ptr<Aggregation>()> aggregation_callback =
      []() -> std::unique_ptr<Aggregation> {
    return std::unique_ptr<Aggregation>(new LongSumAggregation(true));
  };
  // add 10 unique metric points. 9 should be added to hashmap, 10th should be overflow.
  int64_t record_value = 100;
  for (auto i = 0; i < 10; i++)
  {
    FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
    auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
    static_cast<LongSumAggregation *>(
        hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
        ->Aggregate(record_value);
  }
  EXPECT_EQ(hash_map.Size(), 10);
  // add 5 unique metric points above limit, they all should get consolidated as single
  // overflowmetric point.
  for (auto i = 10; i < 15; i++)
  {
    FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
    auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
    static_cast<LongSumAggregation *>(
        hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
        ->Aggregate(record_value);
  }
  EXPECT_EQ(hash_map.Size(), 10);  // only one more metric point should be added as overflow.
  // record 5 more measurements to already existing (and not-overflow) metric points. They
  // should get aggregated to these existing metric points.
  for (auto i = 0; i < 5; i++)
  {
    FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
    auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
    static_cast<LongSumAggregation *>(
        hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
        ->Aggregate(record_value);
  }
  EXPECT_EQ(hash_map.Size(), 10);  // no new metric point added

  // get the overflow metric point
  auto agg1 = hash_map.GetOrSetDefault(
      FilteredOrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}),
      aggregation_callback, kOverflowAttributesHash);
  EXPECT_NE(agg1, nullptr);
  auto sum_agg1 = static_cast<LongSumAggregation *>(agg1);
  EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg1->ToPoint()).value_),
            record_value * 6);  // 1 from previous 10, 5 from current 5.
  // get remaining metric points
  for (auto i = 0; i < 9; i++)
  {
    FilteredOrderedAttributeMap attributes = {{"key", std::to_string(i)}};
    auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
    auto agg2 = hash_map.GetOrSetDefault(
        FilteredOrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}),
        aggregation_callback, hash);
    EXPECT_NE(agg2, nullptr);
    auto sum_agg2 = static_cast<LongSumAggregation *>(agg2);
    if (i < 5)
    {
      EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg2->ToPoint()).value_),
                record_value * 2);  // 1 from first recording, 1 from third recording
    }
    else
    {
      EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg2->ToPoint()).value_),
                record_value);  // 1 from first recording
    }
  }
}

class WritableMetricStorageCardinalityLimitTestFixture
    : public ::testing::TestWithParam<AggregationTemporality>
{};

TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregation)
{
  auto sdk_start_ts               = std::chrono::system_clock::now();
  const size_t attributes_limit   = 10;
  InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
                                     InstrumentValueType::kLong};
  std::unique_ptr<DefaultAttributesProcessor> default_attributes_processor{
      new DefaultAttributesProcessor{}};
  SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
                            ExemplarFilterType::kAlwaysOff,
                            ExemplarReservoir::GetNoExemplarReservoir(),
#endif
                            nullptr, attributes_limit);

  int64_t record_value = 100;
  // add 9 unique metric points, and 6 more above limit.
  for (auto i = 0; i < 15; i++)
  {
    std::map<std::string, std::string> attributes = {{"key", std::to_string(i)}};
    storage.RecordLong(record_value,
                       KeyValueIterableView<std::map<std::string, std::string>>(attributes),
                       opentelemetry::context::Context{});
  }
  AggregationTemporality temporality = GetParam();
  std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
  std::vector<std::shared_ptr<CollectorHandle>> collectors;
  collectors.push_back(collector);
  //... Some computation here
  auto collection_ts      = std::chrono::system_clock::now();
  size_t count_attributes = 0;
  bool overflow_present   = false;
  storage.Collect(
      collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
        for (const auto &data_attr : metric_data.point_data_attr_)
        {
          const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
          count_attributes++;
          if (data_attr.attributes.begin()->first == kAttributesLimitOverflowKey)
          {
            EXPECT_EQ(nostd::get<int64_t>(data.value_), record_value * 6);
            overflow_present = true;
          }
        }
        return true;
      });
  EXPECT_EQ(count_attributes, attributes_limit);
  EXPECT_EQ(overflow_present, true);
}
INSTANTIATE_TEST_SUITE_P(All,
                         WritableMetricStorageCardinalityLimitTestFixture,
                         ::testing::Values(AggregationTemporality::kDelta));