File: requests_in_flight_stats_handler.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (87 lines) | stat: -rw-r--r-- 2,564 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package grpctool

import (
	"context"
	"strings"
	"sync"

	"go.opentelemetry.io/otel/attribute"
	otelmetric "go.opentelemetry.io/otel/metric"
	"google.golang.org/grpc/stats"
)

const (
	grpcServiceAttr attribute.Key = "grpc_service"
	grpcMethodAttr  attribute.Key = "grpc_method"
	namespace                     = "grpc_"
)

type rpcTagCtxKey struct{}

type RequestsInFlightStatsHandler struct {
	mu      sync.Mutex               // protects attrs only
	attrs   map[string]attribute.Set // full method name -> attribute set
	counter otelmetric.Int64UpDownCounter
}

func NewClientRequestsInFlightStatsHandler(m otelmetric.Meter) (*RequestsInFlightStatsHandler, error) {
	return NewRequestsInFlightStatsHandler("client", m)
}

func NewServerRequestsInFlightStatsHandler(m otelmetric.Meter) (*RequestsInFlightStatsHandler, error) {
	return NewRequestsInFlightStatsHandler("server", m)
}

func NewRequestsInFlightStatsHandler(sub string, m otelmetric.Meter) (*RequestsInFlightStatsHandler, error) {
	c, err := m.Int64UpDownCounter(namespace+sub+"_requests_in_flight",
		otelmetric.WithDescription("Number of requests in flight"),
	)
	if err != nil {
		return nil, err
	}
	return &RequestsInFlightStatsHandler{
		counter: c,
	}, nil
}

func (h *RequestsInFlightStatsHandler) TagRPC(ctx context.Context, inf *stats.RPCTagInfo) context.Context {
	h.mu.Lock()
	defer h.mu.Unlock()
	attrs, ok := h.attrs[inf.FullMethodName]
	if !ok {
		service, method := parseMethod(inf.FullMethodName)
		attrs = attribute.NewSet(grpcServiceAttr.String(service), grpcMethodAttr.String(method))
	}
	return context.WithValue(ctx, rpcTagCtxKey{}, attrs)
}

func (h *RequestsInFlightStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}

func (h *RequestsInFlightStatsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {
}

func (h *RequestsInFlightStatsHandler) HandleRPC(ctx context.Context, stat stats.RPCStats) {
	switch stat.(type) {
	case *stats.Begin:
		attrs := ctx.Value(rpcTagCtxKey{}).(attribute.Set)
		h.counter.Add(context.Background(), 1, otelmetric.WithAttributeSet(attrs)) //nolint:contextcheck
	case *stats.End:
		attrs := ctx.Value(rpcTagCtxKey{}).(attribute.Set)
		h.counter.Add(context.Background(), -1, otelmetric.WithAttributeSet(attrs)) //nolint:contextcheck
	}
}

func parseMethod(name string) (string, string) {
	if !strings.HasPrefix(name, "/") {
		return "unknown", "unknown"
	}
	name = name[1:]

	pos := strings.LastIndex(name, "/")
	if pos < 0 {
		return "unknown", "unknown"
	}
	return name[:pos], name[pos+1:]
}