1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
package tunserver
import (
"context"
"errors"
"time"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// routingStatusAttributeName is the attribute key that tags routing duration
// measurements with the outcome of the routing attempt.
const routingStatusAttributeName = attribute.Key("status")

// Attribute sets for the routing duration metric, built once at package init
// rather than on every measurement.
var (
	// routingStatusSuccessAttrSet tags a routing attempt that found a ready gateway.
	routingStatusSuccessAttrSet = attribute.NewSet(routingStatusAttributeName.String("success"))
	// routingStatusAbortedAttrSet tags a routing attempt abandoned because the
	// incoming stream's context was done before a gateway was found.
	routingStatusAbortedAttrSet = attribute.NewSet(routingStatusAttributeName.String("aborted"))
)
// routeToGatewayTunserver is a gRPC stream handler that forwards the incoming
// stream through a ready gateway tunserver instance.
//
// It first locates a suitable gateway via findReadyGateway, then hands both
// streams to a kasStreamForwarder whose logger is tagged with the gateway's URL.
// The srv argument is unused; it is part of the gRPC stream handler signature.
//
// Must return a gRPC status-compatible error.
func (r *Router) routeToGatewayTunserver(srv interface{}, stream grpc.ServerStream) error {
	ctx := stream.Context()
	rpcAPI := modshared.RPCAPIFromContext(ctx)

	gateway, log, err := r.findReadyGateway(ctx, stream, rpcAPI)
	if err != nil {
		// findReadyGateway already produces a status-compatible error.
		return err
	}
	// Done runs only after forwarding below has returned.
	defer gateway.Done()

	forwarder := kasStreamForwarder{
		log:    log.With(logz.KASURL(gateway.URL)),
		rpcAPI: rpcAPI,
	}
	return forwarder.ForwardStream(gateway.Stream, stream)
}
// findReadyGateway locates a ready gateway tunserver that can serve the incoming stream.
//
// The search is traced with a "router.findReadyGateway" span and bounded by
// r.tunnelFindTimeout. On success it returns the gateway together with the
// logger supplied by the plugin's gateway finder, and records the routing
// duration tagged as "success".
//
// On failure the returned ReadyGateway and logger are zero values, and the error is one of:
//   - the error from r.plugin.GatewayFinderForStream, returned as-is;
//   - the stream context's status (grpctool.StatusErrorFromContext) when the
//     incoming stream is done first; routing duration is recorded as "aborted";
//   - codes.DeadlineExceeded when no gateway was found within r.tunnelFindTimeout;
//     the routingTimeout counter is incremented and the error is reported via
//     rpcAPI.HandleProcessingError;
//   - codes.Unavailable for any other Find failure.
func (r *Router) findReadyGateway(ctx context.Context, stream grpc.ServerStream, rpcAPI modshared.RPCAPI) (ReadyGateway, *zap.Logger, error) {
	startRouting := time.Now()
	// recordRoutingDuration reports the time spent routing so far, tagged with the given outcome.
	// The request ctx is deliberately not used (on the aborted path it is already done).
	recordRoutingDuration := func(attrs attribute.Set) {
		r.routingDuration.Record( //nolint: contextcheck
			context.Background(),
			time.Since(startRouting).Seconds(),
			otelmetric.WithAttributeSet(attrs),
		)
	}

	findCtx, span := r.tracer.Start(ctx, "router.findReadyGateway", trace.WithSpanKind(trace.SpanKindInternal))
	defer span.End()

	gf, log, agentID, err := r.plugin.GatewayFinderForStream(stream, rpcAPI)
	if err != nil {
		// Mark the span as failed, consistent with every other error path below,
		// instead of ending it with an unset status.
		span.SetStatus(otelcodes.Error, "Failed")
		span.RecordError(err)
		return ReadyGateway{}, nil, err
	}

	findCtx, findCancel := context.WithTimeout(findCtx, r.tunnelFindTimeout)
	defer findCancel()

	rg, err := gf.Find(findCtx)
	if err != nil {
		// Order is important here: findCtx derives from ctx, so when the incoming
		// stream is done findCtx.Err() is non-nil as well. Checking ctx first is
		// what distinguishes "client went away" from "search timed out".
		switch {
		case ctx.Err() != nil: // Incoming stream canceled.
			recordRoutingDuration(routingStatusAbortedAttrSet)
			span.SetStatus(otelcodes.Error, "Aborted")
			span.RecordError(ctx.Err())
			return ReadyGateway{}, nil, grpctool.StatusErrorFromContext(ctx, "request aborted")
		case findCtx.Err() != nil: // Find tunnel timed out.
			r.routingTimeout.Add(context.Background(), 1) //nolint: contextcheck
			findCtxErr := findCtx.Err()
			span.SetStatus(otelcodes.Error, "Timed out")
			span.RecordError(findCtxErr)
			// The context error is re-created as a plain error so errors.Is no longer
			// matches it against context errors.
			// NOTE(review): presumably so HandleProcessingError does not discard it as a
			// routine cancellation/timeout — confirm against its implementation.
			rpcAPI.HandleProcessingError(log, agentID, "Agent connection not found", errors.New(findCtxErr.Error()))
			return ReadyGateway{}, nil, status.Error(codes.DeadlineExceeded, "agent connection not found. Is agent up to date and connected?")
		default: // This should never happen, but let's handle a non-ctx error for completeness and future-proofing.
			span.SetStatus(otelcodes.Error, "Failed")
			span.RecordError(err)
			return ReadyGateway{}, nil, status.Errorf(codes.Unavailable, "find tunnel failed: %v", err)
		}
	}

	recordRoutingDuration(routingStatusSuccessAttrSet)
	span.SetStatus(otelcodes.Ok, "")
	return rg, log, nil
}
|