File: max_conn_age.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (129 lines) | stat: -rw-r--r-- 5,309 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package grpctool

import (
	"context"
	"time"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/mathz"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/stats"
)

// Load balancer in front of kas typically has a timeout parameter, limiting the maximum connection age.
// After it elapses, the load balancer aborts the connection and the client sees a connection reset.
// This is not good. To mitigate the problem all long-running server-side poll loops should stop beforehand.
//
// After the gRPC max connection age elapses, gRPC sends an HTTP/2 GOAWAY frame to the client to let it know that it
// should not use this TCP connection for any new RPCs and, after all in-flight requests have finished, close it.
// At that same moment the max connection grace period starts, allowing in-flight requests to finish cleanly.
// After this second time interval elapses, gRPC aborts the connection.
// https://gitlab.com/gitlab-org/cluster-integration/gitlab-agent/-/issues/138 is the example of the above
// behavior.
// We tag each TCP connection with a timeout context set to expire after the gRPC max connection age duration, to be
// able to return from the request handler just after the gRPC max connection age duration has elapsed. Returning after,
// and not before, is important so that the client receives GOAWAY first and makes no further requests on this TCP connection.

// https://github.com/grpc/grpc/issues/26703 feature request to help get rid of the messy stuff below.

// maxConnAgeCtxKeyType is the unexported type of the context keys declared in this file.
// Using a dedicated type keeps these keys from colliding with context keys defined by other packages.
type maxConnAgeCtxKeyType int

const (
	// maxConnAgeCtxKey is the context key under which AddMaxConnectionAgeContext stores the
	// max connection age context and from which MaxConnectionAgeContextFromStreamContext reads it.
	maxConnAgeCtxKey maxConnAgeCtxKeyType = iota
	// maxConnAgeConnStartKey is the context key under which serverMaxConnAgeStatsHandler.TagConn
	// stores the time.Time at which the connection was tagged.
	maxConnAgeConnStartKey
)

// Percentages of the configured max connection age used to split it into the individual gRPC keepalive
// settings and the age of the per-connection context.
const (
	// maxConnectionAgeJitterPercent is the jitter percentage this package passes to
	// mathz.DurationWithPositiveJitter when computing the per-RPC age context timeout.
	// It is also reserved (subtracted) when the durations are derived from the configured max connection age.
	maxConnectionAgeJitterPercent = 5
	// gRPC applies +/- 10% jitter to MaxConnectionAge parameter.
	// https://github.com/grpc/grpc-go/blob/v1.39.0/internal/transport/http2_server.go#L1339-L1347
	maxConnectionAgeGrpcJitterPercent = 10
	maxConnectionAgeGracePercent      = 20 // percent of the configured max connection age used as MaxConnectionAgeGrace
)

// MaxConnectionAge2GrpcKeepalive converts the configured maximum connection age into a gRPC keepalive
// server option and the stats handler that tags connections and RPCs with the max connection age context.
//
// auxCtx is the parent of every max connection age context created by the returned handler.
// maxConnectionAge is the total connection lifetime budget; see maxConnectionAge2GrpcKeepalive for how it is split.
func MaxConnectionAge2GrpcKeepalive(auxCtx context.Context, maxConnectionAge time.Duration) (grpc.ServerOption, stats.Handler) {
	params, handler := maxConnectionAge2GrpcKeepalive(auxCtx, maxConnectionAge)
	keepaliveOpt := grpc.KeepaliveParams(params)
	return keepaliveOpt, handler
}

// maxConnectionAge2GrpcKeepalive derives the gRPC keepalive server parameters and the matching stats handler
// from the configured maximum connection age.
//
// maxConnectionAge is split by percentage:
//   - MaxConnectionAge gets what remains after reserving the grace period, gRPC's own jitter and this package's jitter.
//   - MaxConnectionAgeGrace gets maxConnectionAgeGracePercent.
//   - The stats handler's age gets what remains after reserving the grace period and this package's jitter,
//     i.e. it is larger than MaxConnectionAge by the gRPC jitter share.
func maxConnectionAge2GrpcKeepalive(auxCtx context.Context, maxConnectionAge time.Duration) (keepalive.ServerParameters, stats.Handler) {
	const (
		// Share of maxConnectionAge handed to gRPC as MaxConnectionAge.
		grpcAgePercent = 100 - maxConnectionAgeGracePercent - maxConnectionAgeGrpcJitterPercent - maxConnectionAgeJitterPercent
		// Share of maxConnectionAge used as the age of the per-connection context.
		handlerAgePercent = 100 - maxConnectionAgeGracePercent - maxConnectionAgeJitterPercent
	)
	// Multiply first, divide second: keeps the integer arithmetic exact for the percentages.
	grpcAge := maxConnectionAge * grpcAgePercent / 100 // nolint: durationcheck
	gracePeriod := maxConnectionAge * maxConnectionAgeGracePercent / 100
	handlerAge := maxConnectionAge * handlerAgePercent / 100 // nolint: durationcheck

	// See https://github.com/grpc/grpc-go/blob/v1.33.1/internal/transport/http2_server.go#L949-L1047
	// to better understand how keepalive works.
	params := keepalive.ServerParameters{
		MaxConnectionAge: grpcAge,
		// Give pending RPCs some time to complete.
		MaxConnectionAgeGrace: gracePeriod,
		// Trying to stay below 60 seconds (typical load-balancer timeout)
		Time: 50 * time.Second,
	}
	return params, NewServerMaxConnAgeStatsHandler(auxCtx, handlerAge)
}

// MaxConnectionAgeContextFromStreamContext returns the max connection age context that was attached
// to streamCtx via AddMaxConnectionAgeContext.
//
// It panics if streamCtx carries no such value, because the type assertion is unchecked.
func MaxConnectionAgeContextFromStreamContext(streamCtx context.Context) context.Context {
	stored := streamCtx.Value(maxConnAgeCtxKey)
	return stored.(context.Context)
}

// AddMaxConnectionAgeContext returns a copy of ctx that carries ageCtx as its max connection age context.
// The stored context can be read back with MaxConnectionAgeContextFromStreamContext.
func AddMaxConnectionAgeContext(ctx, ageCtx context.Context) context.Context {
	tagged := context.WithValue(ctx, maxConnAgeCtxKey, ageCtx)
	return tagged
}

// serverMaxConnAgeStatsHandler is a stats handler that tags every connection with its start time and every RPC
// with a max connection age context.
//
// auxCtx is the parent of each max connection age context created in TagRPC.
// maxConnectionAge is the connection age after which those contexts time out; zero means no timeout is set
// and the context is only canceled together with auxCtx or when the RPC's context is done.
type serverMaxConnAgeStatsHandler struct {
	auxCtx           context.Context
	maxConnectionAge time.Duration
}

// NewServerMaxConnAgeStatsHandler constructs a stats handler that attaches a max connection age context to every RPC.
//
// auxCtx becomes the parent of each per-RPC age context. maxConnectionAge is the connection age after which
// those contexts expire; when it is zero the handler creates contexts without a timeout.
func NewServerMaxConnAgeStatsHandler(auxCtx context.Context, maxConnectionAge time.Duration) stats.Handler {
	var handler serverMaxConnAgeStatsHandler
	handler.auxCtx = auxCtx
	handler.maxConnectionAge = maxConnectionAge
	return handler
}

// TagRPC attaches a max connection age context to the RPC's context and returns the result.
//
// With a zero maxConnectionAge the age context is a plain cancelable child of auxCtx. Otherwise it is a
// timeout child of auxCtx whose timeout is the connection's remaining age (configured age minus the time elapsed
// since the start time stored by TagConn), passed through mathz.DurationWithPositiveJitter.
//
// It panics if ctx does not carry the connection start time, because the type assertion is unchecked.
func (m serverMaxConnAgeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	var (
		ageCtx    context.Context
		ageCancel context.CancelFunc
	)
	switch m.maxConnectionAge {
	case 0:
		// No age limit configured: the context only ends with auxCtx or with the RPC.
		ageCtx, ageCancel = context.WithCancel(m.auxCtx)
	default:
		connStart := ctx.Value(maxConnAgeConnStartKey).(time.Time)
		remaining := m.maxConnectionAge - time.Since(connStart)
		// NOTE(review): remaining can be zero or negative if an RPC starts after the configured age has elapsed;
		// confirm that mathz.DurationWithPositiveJitter tolerates non-positive durations.
		timeout := mathz.DurationWithPositiveJitter(remaining, maxConnectionAgeJitterPercent)
		ageCtx, ageCancel = context.WithTimeout(m.auxCtx, timeout)
	}
	// Release the age context as soon as the RPC's context is done. The goroutine exits when either context is done.
	go func() {
		select {
		case <-ctx.Done():
			ageCancel()
		case <-ageCtx.Done():
			// Already expired or canceled, nothing left to release.
		}
	}()
	return AddMaxConnectionAgeContext(ctx, ageCtx)
}

// HandleRPC is a no-op; this handler does not process RPC stats.
func (m serverMaxConnAgeStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
}

// TagConn records the current time in the connection's context as the connection start time.
// TagRPC reads it back to compute how much of the max connection age remains.
func (m serverMaxConnAgeStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
	connStart := time.Now()
	return context.WithValue(ctx, maxConnAgeConnStartKey, connStart)
}

// HandleConn is a no-op; this handler does not process connection stats.
func (m serverMaxConnAgeStatsHandler) HandleConn(ctx context.Context, connStats stats.ConnStats) {
}

// ServerNoopMaxConnAgeStatsHandler is a stats handler that tags every RPC with context.Background() as its
// max connection age context, so MaxConnectionAgeContextFromStreamContext still finds a value but no age limit
// is ever applied.
type ServerNoopMaxConnAgeStatsHandler struct {
}

// TagRPC attaches context.Background() as the max connection age context of the RPC,
// i.e. an age context that is never canceled and has no deadline.
func (m ServerNoopMaxConnAgeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	neverExpires := context.Background()
	return AddMaxConnectionAgeContext(ctx, neverExpires)
}

// HandleRPC is a no-op; this handler does not process RPC stats.
func (m ServerNoopMaxConnAgeStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
}

// TagConn returns ctx unchanged; the no-op handler does not record a connection start time.
func (m ServerNoopMaxConnAgeStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
	return ctx
}

// HandleConn is a no-op; this handler does not process connection stats.
func (m ServerNoopMaxConnAgeStatsHandler) HandleConn(ctx context.Context, connStats stats.ConnStats) {
}