package grpctool

import (
	"context"
	"time"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/mathz"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/stats"
)

// A load balancer in front of kas typically has a timeout parameter that limits the maximum connection age.
// After it elapses, the load balancer aborts the connection and the client sees a connection reset.
// This is not good. To mitigate the problem, all long-running server-side poll loops should stop beforehand.
//
// After the gRPC max connection age elapses, gRPC sends an HTTP/2 GOAWAY frame to the client to let it know that it
// should not use this TCP connection for any new RPCs and should close it once all in-flight requests have finished.
// At that point the max connection age grace period starts, allowing in-flight requests to finish cleanly.
// After the grace period elapses, gRPC aborts the connection.
// https://gitlab.com/gitlab-org/cluster-integration/gitlab-agent/-/issues/138 is an example of the above
// behavior.
//
// We record when each TCP connection was established and tag each RPC on it with a timeout context set to expire
// after the gRPC max connection age, so that the request handler can return just after that duration has elapsed.
// Returning after, and not before, is important so that the client receives GOAWAY first and makes no further
// requests on this TCP connection.
// https://github.com/grpc/grpc/issues/26703 is a feature request that would help get rid of the messy code below.
type maxConnAgeCtxKeyType int

const (
	maxConnAgeCtxKey maxConnAgeCtxKeyType = iota
	maxConnAgeConnStartKey
)

const (
	// Positive jitter added to the deadline of each RPC's max connection age context (see TagRPC).
	maxConnectionAgeJitterPercent = 5
	// gRPC applies +/- 10% jitter to the MaxConnectionAge parameter.
	// https://github.com/grpc/grpc-go/blob/v1.39.0/internal/transport/http2_server.go#L1339-L1347
	maxConnectionAgeGrpcJitterPercent = 10
	maxConnectionAgeGracePercent      = 20
)
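
// MaxConnectionAge2GrpcKeepalive returns a gRPC server option with keepalive parameters derived from maxConnectionAge,
// and a stats handler that tags each RPC with a context that is cancelled as the connection approaches
// maxConnectionAge. auxCtx is the parent of every such context.
func MaxConnectionAge2GrpcKeepalive(auxCtx context.Context, maxConnectionAge time.Duration) (grpc.ServerOption, stats.Handler) {
	kp, sh := maxConnectionAge2GrpcKeepalive(auxCtx, maxConnectionAge)
	return grpc.KeepaliveParams(kp), sh
}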

func maxConnectionAge2GrpcKeepalive(auxCtx context.Context, maxConnectionAge time.Duration) (keepalive.ServerParameters, stats.Handler) {
	// See https://github.com/grpc/grpc-go/blob/v1.33.1/internal/transport/http2_server.go#L949-L1047
	// to better understand how keepalive works.
	kp := keepalive.ServerParameters{
		// 65% of maxConnectionAge: leaves room for the grace period, gRPC's jitter and our own jitter.
		MaxConnectionAge: maxConnectionAge * (100 - maxConnectionAgeGracePercent - maxConnectionAgeGrpcJitterPercent - maxConnectionAgeJitterPercent) / 100, // nolint: durationcheck
		// Give pending RPCs some time to complete.
		MaxConnectionAgeGrace: maxConnectionAge * maxConnectionAgeGracePercent / 100,
		// Try to stay below 60 seconds (a typical load balancer timeout).
		Time: 50 * time.Second,
	}
	// 75% of maxConnectionAge: later than the latest possible GOAWAY, even with gRPC's jitter applied.
	sh := NewServerMaxConnAgeStatsHandler(auxCtx, maxConnectionAge*(100-maxConnectionAgeGracePercent-maxConnectionAgeJitterPercent)/100) // nolint: durationcheck
	return kp, sh
}
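
// MaxConnectionAgeContextFromStreamContext returns the max connection age context stored in streamCtx by the stats handler.
// It panics if streamCtx does not carry one.
func MaxConnectionAgeContextFromStreamContext(streamCtx context.Context) context.Context {
	return streamCtx.Value(maxConnAgeCtxKey).(context.Context)
}

// AddMaxConnectionAgeContext returns a copy of ctx that carries ageCtx as its max connection age context.
func AddMaxConnectionAgeContext(ctx, ageCtx context.Context) context.Context {
	return context.WithValue(ctx, maxConnAgeCtxKey, ageCtx)
}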
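
type serverMaxConnAgeStatsHandler struct {
	auxCtx           context.Context
	maxConnectionAge time.Duration
}

// NewServerMaxConnAgeStatsHandler returns a stats handler that tags each RPC with a context that expires
// maxConnectionAge (plus jitter) after its connection was established. A zero maxConnectionAge disables the timeout.
func NewServerMaxConnAgeStatsHandler(auxCtx context.Context, maxConnectionAge time.Duration) stats.Handler {
	return serverMaxConnAgeStatsHandler{
		auxCtx:           auxCtx,
		maxConnectionAge: maxConnectionAge,
	}
}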
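
// TagRPC attaches a max connection age context, derived from auxCtx, to the RPC context.
// The age context is also cancelled when the RPC finishes, so the helper goroutine does not outlive the RPC.
func (m serverMaxConnAgeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	var (
		ageCtx    context.Context
		ageCancel context.CancelFunc
	)
	if m.maxConnectionAge == 0 {
		// No max connection age: the context ends only with auxCtx or the RPC.
		ageCtx, ageCancel = context.WithCancel(m.auxCtx)
	} else {
		// The connection start time was stored by TagConn.
		remainingConnAge := m.maxConnectionAge - time.Since(ctx.Value(maxConnAgeConnStartKey).(time.Time))
		ageCtx, ageCancel = context.WithTimeout(m.auxCtx, mathz.DurationWithPositiveJitter(remainingConnAge, maxConnectionAgeJitterPercent))
	}
	go func() {
		select {
		case <-ageCtx.Done():
		case <-ctx.Done():
			// The RPC finished first: release the age context's resources.
			ageCancel()
		}
	}()
	return AddMaxConnectionAgeContext(ctx, ageCtx)
}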

func (m serverMaxConnAgeStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
}

// TagConn records when the connection was established so that TagRPC can compute its remaining age.
func (m serverMaxConnAgeStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
	return context.WithValue(ctx, maxConnAgeConnStartKey, time.Now())
}

func (m serverMaxConnAgeStatsHandler) HandleConn(ctx context.Context, connStats stats.ConnStats) {
}

// ServerNoopMaxConnAgeStatsHandler is a stats handler that attaches a max connection age context that never expires
// (context.Background()) to each RPC.
type ServerNoopMaxConnAgeStatsHandler struct {
}

func (m ServerNoopMaxConnAgeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	return AddMaxConnectionAgeContext(ctx, context.Background())
}

func (m ServerNoopMaxConnAgeStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
}

func (m ServerNoopMaxConnAgeStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
	return ctx
}

func (m ServerNoopMaxConnAgeStatsHandler) HandleConn(ctx context.Context, connStats stats.ConnStats) {
}