1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
package server
import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/api"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab"
gapi "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab/api"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/flux/rpc"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modserver"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/cache"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
)
const (
maxBufferedNotifications = 10
)
type server struct {
rpc.UnimplementedGitLabFluxServer
serverApi modserver.Api
droppedCounter prometheus.Counter
pollCfgFactory retry.PollConfigFactory
projectAccessClient *projectAccessClient
}
func (s *server) ReconcileProjects(req *rpc.ReconcileProjectsRequest, server rpc.GitLabFlux_ReconcileProjectsServer) error {
ctx := server.Context()
rpcApi := modserver.AgentRpcApiFromContext(ctx)
log := rpcApi.Log()
var agentInfo *api.AgentInfo
var err error
err = rpcApi.PollWithBackoff(s.pollCfgFactory(), func() (error, retry.AttemptResult) {
agentInfo, err = rpcApi.AgentInfo(ctx, log)
if err != nil {
if status.Code(err) == codes.Unavailable {
return nil, retry.Backoff
}
return err, retry.Done // no wrap
}
return nil, retry.Done
})
if agentInfo == nil {
return err // ctx done, err may be nil but must return
}
log = log.With(logz.AgentId(agentInfo.Id))
pipe := make(chan *modserver.Project, maxBufferedNotifications)
var wg wait.Group
defer wg.Wait()
defer close(pipe)
wg.Start(func() {
for project := range pipe {
hasAccess, err := s.verifyProjectAccess(ctx, log, rpcApi, agentInfo.Id, project.FullPath)
if err != nil {
rpcApi.HandleProcessingError(log, agentInfo.Id, fmt.Sprintf("Failed to check if project %s is accessible by agent", project.FullPath), err)
continue
}
if !hasAccess {
continue
}
err = server.Send(&rpc.ReconcileProjectsResponse{
Project: &rpc.Project{Id: project.FullPath},
})
if err != nil {
_ = rpcApi.HandleIoError(log, fmt.Sprintf("Failed to send reconcile message for project %s", project.FullPath), err)
}
}
})
log.Debug("Started reconcile projects ...")
defer log.Debug("Stopped reconcile projects ...")
projects := req.ToProjectSet()
s.serverApi.OnGitPushEvent(ctx, func(ctx context.Context, project *modserver.Project) {
if _, ok := projects[project.FullPath]; !ok {
// NOTE: it's probably not a good idea to log here as we'd get one for every event,
// which on GitLab.com is thousands per minute.
return
}
select {
case pipe <- project:
default:
s.droppedCounter.Inc()
// NOTE: if for whatever reason the other goroutine isn't able to keep up with the events,
// we just drop them for now.
log.Debug("Dropping Git push event", logz.ProjectId(project.FullPath))
}
})
return nil
}
// verifyProjectAccess verifies if the given agent has access to the given project.
// If this is not the case `false` is returned, otherwise `true`.
// If the error has the code Unavailable a caller my retry.
func (s *server) verifyProjectAccess(ctx context.Context, log *zap.Logger, rpcApi modserver.AgentRpcApi, agentId int64,
projectId string) (bool, error) {
hasAccess, err := s.projectAccessClient.VerifyProjectAccess(ctx, rpcApi.AgentToken(), projectId)
switch {
case err == nil:
return hasAccess, nil
case errors.Is(err, context.Canceled):
err = status.Error(codes.Canceled, err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.Error(codes.DeadlineExceeded, err.Error())
default:
rpcApi.HandleProcessingError(log, agentId, "VerifyProjectAccess()", err)
err = status.Error(codes.Unavailable, "unavailable")
}
return false, err
}
type projectAccessClient struct {
gitLabClient gitlab.ClientInterface
projectAccessCache *cache.CacheWithErr[projectAccessCacheKey, bool]
}
func (c *projectAccessClient) VerifyProjectAccess(ctx context.Context, agentToken api.AgentToken, projectId string) (bool, error) {
key := projectAccessCacheKey{agentToken: agentToken, projectId: projectId}
return c.projectAccessCache.GetItem(ctx, key, func() (bool, error) {
return gapi.VerifyProjectAccess(ctx, c.gitLabClient, agentToken, projectId, gitlab.WithoutRetries())
})
}
type projectAccessCacheKey struct {
agentToken api.AgentToken
projectId string
}
|