1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
package kasapp
import (
"context"
"net"
"github.com/ash2k/stager"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modserver"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/observability"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type internalServer struct {
server *grpc.Server
inMemConn *grpc.ClientConn
inMemListener net.Listener
ready func()
}
func newInternalServer(tp trace.TracerProvider, p propagation.TextMapPropagator,
factory modserver.RpcApiFactory, probeRegistry *observability.ProbeRegistry) (*internalServer, error) {
// In-memory gRPC client->listener pipe
listener := grpctool.NewDialListener()
// Construct connection to internal gRPC server
conn, err := grpc.DialContext(context.Background(), "pipe", // nolint: contextcheck
grpc.WithContextDialer(listener.DialContext),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainStreamInterceptor(
otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(tp), otelgrpc.WithPropagators(p)),
grpctool.StreamClientValidatingInterceptor,
),
grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tp), otelgrpc.WithPropagators(p)),
grpctool.UnaryClientValidatingInterceptor,
),
)
if err != nil {
return nil, err
}
return &internalServer{
server: grpc.NewServer(
grpc.StatsHandler(grpctool.ServerNoopMaxConnAgeStatsHandler{}),
grpc.ChainStreamInterceptor(
otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(tp), otelgrpc.WithPropagators(p)), // 1. trace
modserver.StreamRpcApiInterceptor(factory), // 2. inject RPC API
),
grpc.ChainUnaryInterceptor(
otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp), otelgrpc.WithPropagators(p)), // 1. trace
modserver.UnaryRpcApiInterceptor(factory), // 2. inject RPC API
),
grpc.ForceServerCodec(grpctool.RawCodec{}),
),
inMemConn: conn,
inMemListener: listener,
ready: probeRegistry.RegisterReadinessToggle("internalServer"),
}, nil
}
func (s *internalServer) Start(stage stager.Stage) {
grpctool.StartServer(stage, s.server, func() (net.Listener, error) {
s.ready()
return s.inMemListener, nil
}, func() {})
}
|