File: router_to_gateway_tunserver.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (96 lines) | stat: -rw-r--r-- 3,568 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package tunserver

import (
	"context"
	"errors"
	"time"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"go.opentelemetry.io/otel/attribute"
	otelcodes "go.opentelemetry.io/otel/codes"
	otelmetric "go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// routingStatusAttributeName is the attribute key under which the outcome of
// a routing attempt is reported on routing metrics.
const routingStatusAttributeName attribute.Key = "status"

// Attribute sets built once at package initialization and reused for every
// routing duration measurement.
var (
	// routingStatusAbortedAttrSet labels a measurement taken when the incoming
	// stream's context ended before a ready gateway was found.
	routingStatusAbortedAttrSet = attribute.NewSet(routingStatusAttributeName.String("aborted"))
	// routingStatusSuccessAttrSet labels a measurement taken when a ready
	// gateway was found.
	routingStatusSuccessAttrSet = attribute.NewSet(routingStatusAttributeName.String("success"))
)

// routeToGatewayTunserver is the gRPC stream handler that forwards an incoming
// stream to a gateway tunserver instance.
//
// It looks up a ready gateway for the stream and then hands both streams to a
// kasStreamForwarder. The returned error must be gRPC status-compatible.
func (r *Router) routeToGatewayTunserver(srv interface{}, stream grpc.ServerStream) error {
	// Everything below is scoped to the incoming stream's context.
	streamCtx := stream.Context()
	api := modshared.RPCAPIFromContext(streamCtx)

	// Step 1: locate a ready, suitable gateway tunserver.
	gateway, logger, findErr := r.findReadyGateway(streamCtx, stream, api)
	if findErr != nil {
		return findErr
	}
	// Done is deferred so that it runs only after forwarding has returned.
	defer gateway.Done()

	// Step 2: forward the incoming stream via the gateway that was found.
	forwarder := kasStreamForwarder{
		rpcAPI: api,
		log:    logger.With(logz.KASURL(gateway.URL)),
	}
	return forwarder.ForwardStream(gateway.Stream, stream)
}

// findReadyGateway looks for a gateway tunserver that is ready to serve the
// given stream.
//
// The lookup runs inside a "router.findReadyGateway" tracing span and the call
// to Find is bounded by r.tunnelFindTimeout. On success it returns the ready
// gateway together with the logger obtained from the plugin. On failure it
// returns a zero ReadyGateway, a nil logger and one of:
//   - the plugin's error, unchanged, when no gateway finder could be obtained;
//   - the error built by grpctool.StatusErrorFromContext when ctx is done;
//   - a codes.DeadlineExceeded status when the bounded lookup context is done;
//   - a codes.Unavailable status for any other Find error.
//
// Every failure path marks the span with an error status and records the
// error on it. The routing duration is recorded for the success and aborted
// outcomes; a timeout increments the routingTimeout counter instead.
func (r *Router) findReadyGateway(ctx context.Context, stream grpc.ServerStream, rpcAPI modshared.RPCAPI) (ReadyGateway, *zap.Logger, error) {
	startRouting := time.Now()
	findCtx, span := r.tracer.Start(ctx, "router.findReadyGateway", trace.WithSpanKind(trace.SpanKindInternal))
	defer span.End()

	// recordDuration reports the time elapsed since startRouting, labeled with
	// the given outcome. context.Background() is used on purpose so that the
	// measurement is taken even if ctx is already done.
	recordDuration := func(outcome attribute.Set) {
		r.routingDuration.Record( //nolint: contextcheck
			context.Background(),
			float64(time.Since(startRouting))/float64(time.Second),
			otelmetric.WithAttributeSet(outcome),
		)
	}

	gf, log, agentID, err := r.plugin.GatewayFinderForStream(stream, rpcAPI)
	if err != nil {
		// Mark the span as failed here too, so that this early exit is visible
		// in traces like every other failure path of this function.
		span.SetStatus(otelcodes.Error, "Failed to get gateway finder")
		span.RecordError(err)
		return ReadyGateway{}, nil, err
	}
	findCtx, findCancel := context.WithTimeout(findCtx, r.tunnelFindTimeout)
	defer findCancel()

	rg, err := gf.Find(findCtx)
	if err != nil {
		// Order is important here: findCtx is derived from ctx, so a done ctx
		// must be checked first to tell an aborted request from a timeout.
		switch {
		case ctx.Err() != nil: // Incoming stream canceled.
			recordDuration(routingStatusAbortedAttrSet)
			span.SetStatus(otelcodes.Error, "Aborted")
			span.RecordError(ctx.Err())
			return ReadyGateway{}, nil, grpctool.StatusErrorFromContext(ctx, "request aborted")
		case findCtx.Err() != nil: // Find tunnel timed out.
			r.routingTimeout.Add(context.Background(), 1) //nolint: contextcheck
			findCtxErr := findCtx.Err()
			span.SetStatus(otelcodes.Error, "Timed out")
			span.RecordError(findCtxErr)
			// NOTE(review): the context error is re-created as a plain error,
			// presumably so that HandleProcessingError does not treat it as a
			// context cancellation/timeout - confirm before changing.
			rpcAPI.HandleProcessingError(log, agentID, "Agent connection not found", errors.New(findCtxErr.Error()))
			return ReadyGateway{}, nil, status.Error(codes.DeadlineExceeded, "agent connection not found. Is agent up to date and connected?")
		default: // This should never happen, but let's handle a non-ctx error for completeness and future-proofing.
			span.SetStatus(otelcodes.Error, "Failed")
			span.RecordError(err)
			return ReadyGateway{}, nil, status.Errorf(codes.Unavailable, "find tunnel failed: %v", err)
		}
	}
	recordDuration(routingStatusSuccessAttrSet)
	span.SetStatus(otelcodes.Ok, "")
	return rg, log, nil
}