File: router.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (139 lines) | stat: -rw-r--r-- 5,320 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package tunserver

import (
	"context"
	"time"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
	otelmetric "go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

const (
	// RoutingHopPrefix is a metadata key prefix that is used for metadata keys that should be consumed by
	// the gateway kas instances and not passed along to agentk.
	RoutingHopPrefix = "kas-hop-"

	// Names of the metric instruments created by constructKASRoutingMetrics:
	// a histogram of how long tunnel routing takes and a counter of routing timeouts.
	routingDurationMetricName = "tunnel_routing_duration"
	routingTimeoutMetricName  = "tunnel_routing_timeout_total"

	// routerTracerName is the name NewRouter passes to the trace.TracerProvider to obtain the Router's tracer.
	routerTracerName = "tunnel-router"
)

// FindHandle represents a search for a tunnel to an agentk.
// An instance is returned by RouterPlugin.FindTunnel; callers obtain the tunnel with Get and
// must release the handle with Done.
type FindHandle interface {
	// Get finds a tunnel to an agentk.
	// It waits for a matching tunnel to proxy a connection through. When a matching tunnel is found, it is returned.
	// It returns gRPC status errors only, ready to return from RPC handler.
	Get(ctx context.Context) (Tunnel, error)
	// Done must be called to free resources of this FindHandle instance.
	// ctx is used for tracing only.
	Done(ctx context.Context)
}

// RouterPlugin supplies the Router with the lookups it needs to route a stream:
// finding a gateway for a stream and finding a tunnel for a stream.
type RouterPlugin interface {
	// GatewayFinderForStream returns a GatewayFinder for the given stream, together with a logger
	// and the agent ID.
	// NOTE(review): how the GatewayFinder, logger and agent ID are derived from the stream is up to
	// the implementation, which is not visible in this file - confirm against implementers.
	// It returns an error, compatible with gRPC status package.
	GatewayFinderForStream(grpc.ServerStream, modshared.RPCAPI) (GatewayFinder, *zap.Logger, int64 /* agentID */, error)
	// FindTunnel starts searching for a tunnel to a matching tunclient.
	// Tunnel found boolean indicates whether a suitable tunnel is immediately available from the
	// returned FindHandle object.
	FindTunnel(grpc.ServerStream, modshared.RPCAPI) (bool, *zap.Logger, FindHandle, error)
}

// Router routes traffic from kas to another kas to agentk.
// The routing path is: routing kas -> gateway kas -> agentk.
//
// Fields without their own comment below:
//   - plugin is the RouterPlugin given to NewRouter.
//   - tracer is obtained in NewRouter from the trace.TracerProvider using routerTracerName.
//   - routingDuration and routingTimeout are the metric instruments created by
//     constructKASRoutingMetrics (a duration histogram and a timeout counter).
//   - tunnelFindTimeout is the duration given to NewRouter; presumably it bounds how long the
//     search for a tunnel may take - its use is outside this part of the file, TODO confirm.
type Router struct {
	plugin RouterPlugin
	// internalServer is the internal gRPC server for use inside of kas.
	// Request handlers can obtain the per-request logger using grpctool.LoggerFromContext(requestContext).
	internalServer grpc.ServiceRegistrar
	// privateAPIServer is the gRPC server that other kas instances can talk to.
	// Request handlers can obtain the per-request logger using grpctool.LoggerFromContext(requestContext).
	privateAPIServer  grpc.ServiceRegistrar
	tracer            trace.Tracer
	routingDuration   otelmetric.Float64Histogram
	routingTimeout    otelmetric.Int64Counter
	tunnelFindTimeout time.Duration
}

// NewRouter assembles a Router from its dependencies.
//
// The routing metric instruments are created on dm first; if that fails, the error is
// returned and no Router is built. The tracer is then obtained from tp under routerTracerName.
func NewRouter(plugin RouterPlugin,
	internalServer, privateAPIServer grpc.ServiceRegistrar,
	tunnelFindTimeout time.Duration, tp trace.TracerProvider, dm otelmetric.Meter) (*Router, error) {
	durationHist, timeouts, err := constructKASRoutingMetrics(dm)
	if err != nil {
		return nil, err
	}

	router := &Router{
		plugin:            plugin,
		internalServer:    internalServer,
		privateAPIServer:  privateAPIServer,
		tracer:            tp.Tracer(routerTracerName),
		routingDuration:   durationHist,
		routingTimeout:    timeouts,
		tunnelFindTimeout: tunnelFindTimeout,
	}
	return router, nil
}

// constructKASRoutingMetrics creates the two instruments used to observe tunnel routing:
// a histogram (in seconds) of how long it takes to find a suitable tunnel, and a counter
// of routing timeouts. On any error both instruments are returned as nil.
func constructKASRoutingMetrics(dm otelmetric.Meter) (otelmetric.Float64Histogram, otelmetric.Int64Counter, error) {
	durationOpts := []otelmetric.Float64HistogramOption{
		otelmetric.WithUnit("s"),
		otelmetric.WithDescription("The time it takes the tunnel router to find a suitable tunnel in seconds"),
		otelmetric.WithExplicitBucketBoundaries(0.001, 0.004, 0.016, 0.064, 0.256, 1.024, 4.096, 16.384),
	}
	durationHist, err := dm.Float64Histogram(routingDurationMetricName, durationOpts...)
	if err != nil {
		return nil, nil, err
	}

	timeoutOpts := []otelmetric.Int64CounterOption{
		otelmetric.WithDescription("The total number of times routing timed out i.e. didn't find a suitable tunnel within allocated time"),
	}
	timeouts, err := dm.Int64Counter(routingTimeoutMetricName, timeoutOpts...)
	if err != nil {
		return nil, nil, err
	}

	return durationHist, timeouts, nil
}

// RegisterTunclientAPI registers the service described by desc on both gRPC servers of the Router.
//
// The descriptor is not registered as-is: mungeDescriptor turns every unary method into a
// streaming one, and every stream is served by a routing handler instead of a real implementation.
func (r *Router) RegisterTunclientAPI(desc *grpc.ServiceDesc) {
	// Internal server: lets ReverseTunnelClient be used inside kas to send data to this API within
	// this kas instance, which then routes the stream to the gateway kas instance.
	r.internalServer.RegisterService(mungeDescriptor(desc, r.routeToGatewayTunserver), nil)

	// Private API server: lets this kas instance act as the gateway kas instance and route the
	// stream to one of the matching connected agentk instances.
	r.privateAPIServer.RegisterService(mungeDescriptor(desc, r.routeToTunclient), nil)
}

// mungeDescriptor derives a new service descriptor from in where every stream and every unary
// method is exposed as a bidirectional stream served by handler.
// The service name and metadata are carried over; in itself is not modified.
func mungeDescriptor(in *grpc.ServiceDesc, handler grpc.StreamHandler) *grpc.ServiceDesc {
	// bidi builds the stream descriptor shared by all entries; only the name differs.
	bidi := func(name string) grpc.StreamDesc {
		return grpc.StreamDesc{
			StreamName:    name,
			Handler:       handler,
			ServerStreams: true,
			ClientStreams: true,
		}
	}

	out := &grpc.ServiceDesc{
		ServiceName: in.ServiceName,
		Streams:     make([]grpc.StreamDesc, 0, len(in.Streams)+len(in.Methods)),
		Metadata:    in.Metadata,
	}
	// Existing streams come first, in their original order.
	for i := range in.Streams {
		out.Streams = append(out.Streams, bidi(in.Streams[i].StreamName))
	}
	// Unary methods follow, each turned into a stream.
	for i := range in.Methods {
		out.Streams = append(out.Streams, bidi(in.Methods[i].MethodName))
	}
	return out
}