1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
package tunserver
import (
"context"
"time"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
)
const (
// RoutingHopPrefix is a metadata key prefix that is used for metadata keys that should be consumed by
// the gateway kas instances and not passed along to agentk.
RoutingHopPrefix = "kas-hop-"
// routingDurationMetricName is the name of the histogram that records how long the tunnel router
// takes to find a suitable tunnel.
routingDurationMetricName = "tunnel_routing_duration"
// routingTimeoutMetricName is the name of the counter that counts routing attempts that timed out.
routingTimeoutMetricName = "tunnel_routing_timeout_total"
// routerTracerName is the name passed to the tracer provider to obtain the Router's tracer.
routerTracerName = "tunnel-router"
)
// FindHandle represents a search for a tunnel, as returned by RouterPlugin.FindTunnel.
// Callers obtain the tunnel with Get and must release the handle with Done.
type FindHandle interface {
// Get finds a tunnel to an agentk.
// It waits for a matching tunnel to proxy a connection through. When a matching tunnel is found, it is returned.
// It returns gRPC status errors only, ready to return from RPC handler.
Get(ctx context.Context) (Tunnel, error)
// Done must be called to free resources of this FindHandle instance.
// ctx is used for tracing only.
Done(ctx context.Context)
}
// RouterPlugin provides the lookups that Router delegates to: locating a gateway finder for a stream
// and locating a tunnel for a stream.
// NOTE(review): the call sites (the routing handlers) are not visible in this part of the file.
type RouterPlugin interface {
// GatewayFinderForStream returns a GatewayFinder for the given server stream, together with
// a logger and the agent ID.
// NOTE(review): presumably used on the routing kas side to locate the gateway kas - confirm against the handler.
// It returns an error, compatible with gRPC status package.
GatewayFinderForStream(grpc.ServerStream, modshared.RPCAPI) (GatewayFinder, *zap.Logger, int64 /* agentID */, error)
// FindTunnel starts searching for a tunnel to a matching tunclient.
// Tunnel found boolean indicates whether a suitable tunnel is immediately available from the
// returned FindHandle object.
// The returned FindHandle must be released with Done once it is no longer needed.
FindTunnel(grpc.ServerStream, modshared.RPCAPI) (bool, *zap.Logger, FindHandle, error)
}
// Router routes traffic from kas to another kas to agentk.
// routing kas -> gateway kas -> agentk
//
// Instances are created with NewRouter.
type Router struct {
// plugin supplies the gateway finder and tunnel lookups.
plugin RouterPlugin
// internalServer is the internal gRPC server for use inside of kas.
// Request handlers can obtain the per-request logger using grpctool.LoggerFromContext(requestContext).
internalServer grpc.ServiceRegistrar
// privateAPIServer is the gRPC server that other kas instances can talk to.
// Request handlers can obtain the per-request logger using grpctool.LoggerFromContext(requestContext).
privateAPIServer grpc.ServiceRegistrar
// tracer is obtained from the tracer provider under routerTracerName.
tracer trace.Tracer
// routingDuration is the histogram registered under routingDurationMetricName.
routingDuration otelmetric.Float64Histogram
// routingTimeout is the counter registered under routingTimeoutMetricName.
routingTimeout otelmetric.Int64Counter
// tunnelFindTimeout is the value passed to NewRouter.
// NOTE(review): presumably the maximum time to wait for a tunnel to be found - confirm against the handlers.
tunnelFindTimeout time.Duration
}
// NewRouter assembles a Router from its collaborators.
//
// The routing metrics are created on dm first; if that fails the error is returned and no Router
// is constructed. The tracer is obtained from tp under routerTracerName. The two gRPC registrars
// and the timeout are stored as given; nothing is registered on the servers here.
func NewRouter(plugin RouterPlugin,
	internalServer, privateAPIServer grpc.ServiceRegistrar,
	tunnelFindTimeout time.Duration, tp trace.TracerProvider, dm otelmetric.Meter) (*Router, error) {
	durationHist, timeouts, metricsErr := constructKASRoutingMetrics(dm)
	if metricsErr != nil {
		return nil, metricsErr
	}

	router := new(Router)
	router.plugin = plugin
	router.internalServer = internalServer
	router.privateAPIServer = privateAPIServer
	router.tracer = tp.Tracer(routerTracerName)
	router.routingDuration = durationHist
	router.routingTimeout = timeouts
	router.tunnelFindTimeout = tunnelFindTimeout
	return router, nil
}
// constructKASRoutingMetrics creates the two instruments the router reports to:
// a histogram of routing durations (unit: seconds) and a counter of routing timeouts.
// If either instrument cannot be created, both results are nil and the error is returned.
func constructKASRoutingMetrics(dm otelmetric.Meter) (otelmetric.Float64Histogram, otelmetric.Int64Counter, error) {
	// Bucket upper bounds in seconds; each one is four times the previous.
	bucketBounds := []float64{0.001, 0.004, 0.016, 0.064, 0.256, 1.024, 4.096, 16.384}

	durationHist, histErr := dm.Float64Histogram(
		routingDurationMetricName,
		otelmetric.WithUnit("s"),
		otelmetric.WithDescription("The time it takes the tunnel router to find a suitable tunnel in seconds"),
		otelmetric.WithExplicitBucketBoundaries(bucketBounds...),
	)
	if histErr != nil {
		return nil, nil, histErr
	}

	timeouts, counterErr := dm.Int64Counter(
		routingTimeoutMetricName,
		otelmetric.WithDescription("The total number of times routing timed out i.e. didn't find a suitable tunnel within allocated time"),
	)
	if counterErr != nil {
		return nil, nil, counterErr
	}

	return durationHist, timeouts, nil
}
// RegisterTunclientAPI exposes the service described by desc on both gRPC servers held by the Router.
//
// The descriptor is not registered as-is. It is first reshaped by mungeDescriptor so that every
// unary method becomes a streaming call and every stream is served by a routing handler.
func (r *Router) RegisterTunclientAPI(desc *grpc.ServiceDesc) {
	// Internal server: lets ReverseTunnelClient be used in kas to send data to this API within
	// this kas instance, which then routes the stream to the gateway kas instance.
	toGateway := mungeDescriptor(desc, r.routeToGatewayTunserver)
	// Private API server: lets this kas instance act as the gateway kas instance and route to
	// one of the matching connected agentk instances.
	toTunclient := mungeDescriptor(desc, r.routeToTunclient)

	r.internalServer.RegisterService(toGateway, nil)
	r.privateAPIServer.RegisterService(toTunclient, nil)
}
// mungeDescriptor derives a new service descriptor from in where every RPC is a bidirectional
// stream served by handler.
//
// Existing streams keep their names but get handler and both streaming flags set. Unary methods
// are converted into streams named after the method and are placed after the existing streams.
// The service name and metadata are carried over; in itself is not modified.
func mungeDescriptor(in *grpc.ServiceDesc, handler grpc.StreamHandler) *grpc.ServiceDesc {
	// routed builds the stream descriptor shared by every entry: same handler, fully streaming.
	routed := func(name string) grpc.StreamDesc {
		return grpc.StreamDesc{
			StreamName:    name,
			Handler:       handler,
			ServerStreams: true,
			ClientStreams: true,
		}
	}

	streamCount := len(in.Streams)
	all := make([]grpc.StreamDesc, streamCount+len(in.Methods))
	for i := range in.Streams {
		all[i] = routed(in.Streams[i].StreamName)
	}
	// Unary methods follow the original streams.
	for i := range in.Methods {
		all[streamCount+i] = routed(in.Methods[i].MethodName)
	}

	out := new(grpc.ServiceDesc)
	out.ServiceName = in.ServiceName
	out.Streams = all
	out.Metadata = in.Metadata
	return out
}
|