1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
#include <gtest/gtest.h>
#include <stddef.h>
#include <algorithm>
#include <chrono>
#include <memory>
#include <ratio>
#include <thread>
#include <utility>
#include <vector>
#include "opentelemetry/sdk/common/exporter_utils.h"
#include "opentelemetry/sdk/metrics/export/metric_producer.h"
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationscope;
using namespace opentelemetry::sdk::metrics;
// Test double for PushMetricExporter.
//
// Keeps a copy of every ResourceMetrics passed to Export() so a test can count
// how many exports happened, and can optionally sleep inside Export() for a
// fixed duration before recording.
class MockPushMetricExporter : public PushMetricExporter
{
public:
  // wait: duration each Export() call sleeps before recording the batch.
  //       A zero (or negative) duration means Export() does not sleep.
  MockPushMetricExporter(std::chrono::milliseconds wait) : wait_(wait) {}

  // Optionally sleeps for `wait_`, stores a copy of `record`, and always
  // reports kSuccess.
  opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &record) noexcept override
  {
    const bool delay_requested = std::chrono::milliseconds::zero() < wait_;
    if (delay_requested)
    {
      std::this_thread::sleep_for(wait_);
    }

    records_.push_back(record);

    const auto outcome = opentelemetry::sdk::common::ExportResult::kSuccess;
    return outcome;
  }

  // Does nothing and always returns false; the timeout is ignored.
  bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override
  {
    return false;
  }

  // Reports cumulative temporality regardless of the instrument type.
  sdk::metrics::AggregationTemporality GetAggregationTemporality(
      sdk::metrics::InstrumentType /* instrument_type */) const noexcept override
  {
    return sdk::metrics::AggregationTemporality::kCumulative;
  }

  // Does nothing and always returns true; the timeout is ignored.
  bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override
  {
    return true;
  }

  // Number of batches recorded by Export() so far.
  size_t GetDataCount()
  {
    return records_.size();
  }

private:
  // Copies of everything handed to Export(), in call order.
  std::vector<ResourceMetrics> records_;
  // Per-call delay applied at the start of Export().
  std::chrono::milliseconds wait_;
};
// Test double for MetricProducer.
//
// Each Produce() call sleeps for a configurable duration, bumps a call counter,
// and hands back a default-constructed ResourceMetrics with a success status.
class MockMetricProducer : public MetricProducer
{
public:
  // sleep_ms: delay applied at the start of every Produce() call. Despite the
  //           parameter name, the value is expressed in microseconds.
  //           Defaults to zero.
  MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero())
      : produce_delay_{sleep_ms}
  {}

  // Sleeps for the configured delay, counts the call, and returns a
  // default-constructed ResourceMetrics paired with Status::kSuccess.
  MetricProducer::Result Produce() noexcept override
  {
    std::this_thread::sleep_for(produce_delay_);
    ++produce_calls_;

    ResourceMetrics empty_metrics;
    return {empty_metrics, MetricProducer::Status::kSuccess};
  }

  // Number of times Produce() has been called.
  size_t GetDataCount()
  {
    return produce_calls_;
  }

private:
  // Delay applied by Produce() before it returns (microseconds).
  std::chrono::microseconds produce_delay_;
  // Incremented once per Produce() call.
  size_t produce_calls_{0};
};
// Runs a reader with a 500 ms export interval for 2000 ms against a
// non-delaying mock exporter, then checks that the exporter recorded exactly as
// many batches as the producer produced.
//
// NOTE(review): the suite name is spelled "Exporing"; it is left unchanged here
// so existing test filters keep matching.
TEST(PeriodicExporingMetricReader, BasicTests)
{
  // Keep a typed raw pointer to the mock so its counter can be read after
  // ownership has been transferred to the reader.
  auto *mock_exporter = new MockPushMetricExporter(std::chrono::milliseconds{0});
  std::unique_ptr<PushMetricExporter> owned_exporter(mock_exporter);

  PeriodicExportingMetricReaderOptions reader_options;
  reader_options.export_timeout_millis  = std::chrono::milliseconds(200);
  reader_options.export_interval_millis = std::chrono::milliseconds(500);

  auto metric_reader =
      std::make_shared<PeriodicExportingMetricReader>(std::move(owned_exporter), reader_options);

  MockMetricProducer mock_producer;
  metric_reader->SetMetricProducer(&mock_producer);

  // Let several export intervals elapse before flushing and shutting down.
  const std::chrono::milliseconds run_time(2000);
  std::this_thread::sleep_for(run_time);

  metric_reader->ForceFlush();
  metric_reader->Shutdown();

  // Every batch the producer produced must have been recorded by the exporter.
  EXPECT_EQ(mock_exporter->GetDataCount(), mock_producer.GetDataCount());
}
// Exercises the reader with an exporter whose Export() sleeps for 2000 ms, far
// longer than the configured 200 ms export timeout. There are no assertions:
// the test only requires that the reader can be run for 1000 ms and shut down.
TEST(PeriodicExporingMetricReader, Timeout)
{
  // Each Export() call on this mock sleeps for 2000 ms.
  const std::chrono::milliseconds export_delay{2000};
  std::unique_ptr<PushMetricExporter> slow_exporter(new MockPushMetricExporter(export_delay));

  PeriodicExportingMetricReaderOptions reader_options;
  reader_options.export_timeout_millis  = std::chrono::milliseconds(200);
  reader_options.export_interval_millis = std::chrono::milliseconds(500);

  auto metric_reader =
      std::make_shared<PeriodicExportingMetricReader>(std::move(slow_exporter), reader_options);

  MockMetricProducer mock_producer;
  metric_reader->SetMetricProducer(&mock_producer);

  // Give the reader time to start at least one export before shutting down.
  const std::chrono::milliseconds run_time(1000);
  std::this_thread::sleep_for(run_time);

  metric_reader->Shutdown();
}
|