1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"sync/atomic"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
)
// MeterProvider handles the creation and coordination of Meters. All Meters
// created by a MeterProvider will be associated with the same Resource, have
// the same Views applied to them, and have their produced metric telemetry
// passed to the configured Readers.
type MeterProvider struct {
embedded.MeterProvider
pipes pipelines
meters cache[instrumentation.Scope, *meter]
forceFlush, shutdown func(context.Context) error
stopped atomic.Bool
}
// Compile-time check MeterProvider implements metric.MeterProvider.
var _ metric.MeterProvider = (*MeterProvider)(nil)
// NewMeterProvider returns a new and configured MeterProvider.
//
// By default, the returned MeterProvider is configured with the default
// Resource and no Readers. Readers cannot be added after a MeterProvider is
// created. This means the returned MeterProvider, one created with no
// Readers, will perform no operations.
func NewMeterProvider(options ...Option) *MeterProvider {
conf := newConfig(options)
flush, sdown := conf.readerSignals()
mp := &MeterProvider{
pipes: newPipelines(conf.res, conf.readers, conf.views),
forceFlush: flush,
shutdown: sdown,
}
// Log after creation so all readers show correctly they are registered.
global.Info("MeterProvider created",
"Resource", conf.res,
"Readers", conf.readers,
"Views", len(conf.views),
)
return mp
}
// Meter returns a Meter with the given name and configured with options.
//
// The name should be the name of the instrumentation scope creating
// telemetry. This name may be the same as the instrumented code only if that
// code provides built-in instrumentation.
//
// Calls to the Meter method after Shutdown has been called will return Meters
// that perform no operations.
//
// This method is safe to call concurrently.
func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metric.Meter {
if name == "" {
global.Warn("Invalid Meter name.", "name", name)
}
if mp.stopped.Load() {
return noop.Meter{}
}
c := metric.NewMeterConfig(options...)
s := instrumentation.Scope{
Name: name,
Version: c.InstrumentationVersion(),
SchemaURL: c.SchemaURL(),
}
global.Info("Meter created",
"Name", s.Name,
"Version", s.Version,
"SchemaURL", s.SchemaURL,
)
return mp.meters.Lookup(s, func() *meter {
return newMeter(s, mp.pipes)
})
}
// ForceFlush flushes all pending telemetry.
//
// This method honors the deadline or cancellation of ctx. An appropriate
// error will be returned in these situations. There is no guaranteed that all
// telemetry be flushed or all resources have been released in these
// situations.
//
// ForceFlush calls ForceFlush(context.Context) error
// on all Readers that implements this method.
//
// This method is safe to call concurrently.
func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
if mp.forceFlush != nil {
return mp.forceFlush(ctx)
}
return nil
}
// Shutdown shuts down the MeterProvider flushing all pending telemetry and
// releasing any held computational resources.
//
// This call is idempotent. The first call will perform all flush and
// releasing operations. Subsequent calls will perform no action and will
// return an error stating this.
//
// Measurements made by instruments from meters this MeterProvider created
// will not be exported after Shutdown is called.
//
// This method honors the deadline or cancellation of ctx. An appropriate
// error will be returned in these situations. There is no guaranteed that all
// telemetry be flushed or all resources have been released in these
// situations.
//
// This method is safe to call concurrently.
func (mp *MeterProvider) Shutdown(ctx context.Context) error {
// Even though it may seem like there is a synchronization issue between the
// call to `Store` and checking `shutdown`, the Go concurrency model ensures
// that is not the case, as all the atomic operations executed in a program
// behave as though executed in some sequentially consistent order. This
// definition provides the same semantics as C++'s sequentially consistent
// atomics and Java's volatile variables.
// See https://go.dev/ref/mem#atomic and https://pkg.go.dev/sync/atomic.
mp.stopped.Store(true)
if mp.shutdown != nil {
return mp.shutdown(ctx)
}
return nil
}
|