1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
package kasapp
import (
"context"
"errors"
"sync"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/api"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab"
gapi "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab/api"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modserver"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/cache"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// serverAgentRpcApi is the per-call RPC API object handed to handlers of
// agent-facing RPCs. It embeds the generic modserver.RpcApi and adds
// agent-specific lookups based on the agent token carried by the request.
type serverAgentRpcApi struct {
	// RpcApi is the embedded generic RPC API; its methods (for example
	// HandleProcessingError, used by AgentInfo) are promoted onto this type.
	modserver.RpcApi
	// Token is the agent token for this call. It is returned by AgentToken and
	// used as the cache key and the GitLab credential in getAgentInfoCached.
	Token api.AgentToken
	// GitLabClient is passed to gapi.GetAgentInfo when agent info is fetched.
	GitLabClient gitlab.ClientInterface
	// AgentInfoCache caches agent info lookups (including errors), keyed by agent token.
	AgentInfoCache *cache.CacheWithErr[api.AgentToken, *api.AgentInfo]
	// agentIdAttrOnce ensures the agent id trace attribute is set at most once
	// per serverAgentRpcApi instance, however many times AgentInfo succeeds.
	agentIdAttrOnce sync.Once
}
// AgentToken returns the agent token this RPC API object was created with.
func (a *serverAgentRpcApi) AgentToken() api.AgentToken {
	return a.Token
}
// AgentInfo resolves the agent's info for the token of this call, going
// through the agent info cache.
//
// On success the agent id is recorded as an attribute on the span found in
// ctx (only once per serverAgentRpcApi instance) and the info is returned.
//
// On failure the returned info is nil and the error is a gRPC status error:
//   - context cancellation      -> codes.Canceled (original error text kept)
//   - context deadline exceeded -> codes.DeadlineExceeded (original error text kept)
//   - GitLab "forbidden"        -> codes.PermissionDenied
//   - GitLab "unauthorized"     -> codes.Unauthenticated
//   - GitLab "not found"        -> codes.NotFound
//   - anything else             -> codes.Unavailable, after the underlying
//     error has been passed to HandleProcessingError with log.
func (a *serverAgentRpcApi) AgentInfo(ctx context.Context, log *zap.Logger) (*api.AgentInfo, error) {
	info, err := a.getAgentInfoCached(ctx)
	if err == nil {
		// Tag the span with the agent id the first time it becomes known.
		a.agentIdAttrOnce.Do(func() {
			trace.SpanFromContext(ctx).SetAttributes(api.TraceAgentIdAttr.Int64(info.Id))
		})
		return info, nil
	}

	// Translate the failure into a gRPC status. Case order matters: context
	// errors are checked before the GitLab-specific classifications.
	switch {
	case errors.Is(err, context.Canceled):
		return nil, status.Error(codes.Canceled, err.Error())
	case errors.Is(err, context.DeadlineExceeded):
		return nil, status.Error(codes.DeadlineExceeded, err.Error())
	case gitlab.IsForbidden(err):
		return nil, status.Error(codes.PermissionDenied, "forbidden")
	case gitlab.IsUnauthorized(err):
		return nil, status.Error(codes.Unauthenticated, "unauthenticated")
	case gitlab.IsNotFound(err):
		return nil, status.Error(codes.NotFound, "agent not found")
	default:
		// Unexpected failure: hand the real error to the processing-error
		// handler and return only a generic message to the caller. The agent
		// id is not known here, hence modshared.NoAgentId.
		a.HandleProcessingError(log, modshared.NoAgentId, "AgentInfo()", err)
		return nil, status.Error(codes.Unavailable, "unavailable")
	}
}
// getAgentInfoCached looks up agent info in AgentInfoCache using the agent
// token as the key. The cache is given a loader that asks GitLab for the agent
// info with retries disabled; presumably the cache invokes it only when it has
// no usable entry for the token (verify against cache.CacheWithErr.GetItem).
func (a *serverAgentRpcApi) getAgentInfoCached(ctx context.Context) (*api.AgentInfo, error) {
	fetchFromGitLab := func() (*api.AgentInfo, error) {
		return gapi.GetAgentInfo(ctx, a.GitLabClient, a.Token, gitlab.WithoutRetries())
	}
	return a.AgentInfoCache.GetItem(ctx, a.Token, fetchFromGitLab)
}
// serverAgentRpcApiFactory builds serverAgentRpcApi values for incoming RPCs.
// It holds the dependencies that are shared by every instance it creates.
type serverAgentRpcApiFactory struct {
	// rpcApiFactory produces the generic modserver.RpcApi embedded in each
	// serverAgentRpcApi; New calls it with the request context and method name.
	rpcApiFactory modserver.RpcApiFactory
	// gitLabClient is handed to every created serverAgentRpcApi as GitLabClient.
	gitLabClient gitlab.ClientInterface
	// agentInfoCache is handed to every created serverAgentRpcApi as
	// AgentInfoCache, so all instances from this factory use the same cache.
	agentInfoCache *cache.CacheWithErr[api.AgentToken, *api.AgentInfo]
}
// New creates the agent RPC API object for a single call.
//
// The agent token is taken from the "bearer" authorization value in the
// metadata carried by ctx. If it cannot be extracted, the error from
// grpc_auth.AuthFromMD is returned unchanged and no RPC API object is built.
// fullMethodName is forwarded to the underlying RpcApi factory.
func (f *serverAgentRpcApiFactory) New(ctx context.Context, fullMethodName string) (modserver.AgentRpcApi, error) {
	bearer, err := grpc_auth.AuthFromMD(ctx, "bearer")
	if err != nil {
		return nil, err
	}

	// The generic RpcApi is only constructed once the token is known to be present.
	agentRpcApi := &serverAgentRpcApi{
		RpcApi:         f.rpcApiFactory(ctx, fullMethodName),
		Token:          api.AgentToken(bearer),
		GitLabClient:   f.gitLabClient,
		AgentInfoCache: f.agentInfoCache,
	}
	return agentRpcApi, nil
}
|