1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
package grpctool
import (
"context"
"strings"
"sync"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"google.golang.org/grpc/stats"
)
const (
grpcServiceAttr attribute.Key = "grpc_service"
grpcMethodAttr attribute.Key = "grpc_method"
namespace = "grpc_"
)
type rpcTagCtxKey struct{}
type RequestsInFlightStatsHandler struct {
mu sync.Mutex // protects attrs only
attrs map[string]attribute.Set // full method name -> attribute set
counter otelmetric.Int64UpDownCounter
}
func NewClientRequestsInFlightStatsHandler(m otelmetric.Meter) (*RequestsInFlightStatsHandler, error) {
return NewRequestsInFlightStatsHandler("client", m)
}
func NewServerRequestsInFlightStatsHandler(m otelmetric.Meter) (*RequestsInFlightStatsHandler, error) {
return NewRequestsInFlightStatsHandler("server", m)
}
func NewRequestsInFlightStatsHandler(sub string, m otelmetric.Meter) (*RequestsInFlightStatsHandler, error) {
c, err := m.Int64UpDownCounter(namespace+sub+"_requests_in_flight",
otelmetric.WithDescription("Number of requests in flight"),
)
if err != nil {
return nil, err
}
return &RequestsInFlightStatsHandler{
counter: c,
}, nil
}
func (h *RequestsInFlightStatsHandler) TagRPC(ctx context.Context, inf *stats.RPCTagInfo) context.Context {
h.mu.Lock()
defer h.mu.Unlock()
attrs, ok := h.attrs[inf.FullMethodName]
if !ok {
service, method := parseMethod(inf.FullMethodName)
attrs = attribute.NewSet(grpcServiceAttr.String(service), grpcMethodAttr.String(method))
}
return context.WithValue(ctx, rpcTagCtxKey{}, attrs)
}
func (h *RequestsInFlightStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (h *RequestsInFlightStatsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {
}
func (h *RequestsInFlightStatsHandler) HandleRPC(ctx context.Context, stat stats.RPCStats) {
switch stat.(type) {
case *stats.Begin:
attrs := ctx.Value(rpcTagCtxKey{}).(attribute.Set)
h.counter.Add(context.Background(), 1, otelmetric.WithAttributeSet(attrs)) //nolint:contextcheck
case *stats.End:
attrs := ctx.Value(rpcTagCtxKey{}).(attribute.Set)
h.counter.Add(context.Background(), -1, otelmetric.WithAttributeSet(attrs)) //nolint:contextcheck
}
}
func parseMethod(name string) (string, string) {
if !strings.HasPrefix(name, "/") {
return "unknown", "unknown"
}
name = name[1:]
pos := strings.LastIndex(name, "/")
if pos < 0 {
return "unknown", "unknown"
}
return name[:pos], name[pos+1:]
}
|