File: server.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (156 lines) | stat: -rw-r--r-- 5,678 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package server

import (
	"context"
	"errors"
	"fmt"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/api"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab"
	gapi "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab/api"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/flux/rpc"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modserver"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/usage_metrics"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/cache"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/pkg/event"
	otelmetric "go.opentelemetry.io/otel/metric"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"k8s.io/apimachinery/pkg/util/wait"
)

// maxBufferedNotifications is the buffer capacity of the channel that hands
// matching git push events from the event callback to the goroutine that
// notifies the agent in ReconcileProjects.
const maxBufferedNotifications = 10

// server provides the ReconcileProjects RPC of the GitLabFlux service. It embeds
// rpc.UnimplementedGitLabFluxServer alongside the dependencies ReconcileProjects uses.
type server struct {
	rpc.UnimplementedGitLabFluxServer
	serverAPI               modserver.API               // used to subscribe to git push events (OnGitPushEvent)
	notifiedCounter         otelmetric.Int64Counter     // incremented for each push event that passed the access check, just before it is sent
	notifiedUsageCounter    usage_metrics.Counter       // incremented together with notifiedCounter
	notifiedProjectsCounter usage_metrics.UniqueCounter // receives the Id of each project a notification is sent for
	droppedCounter          otelmetric.Int64Counter     // incremented when a push event is dropped because the notification buffer is full
	pollCfgFactory          retry.PollConfigFactory     // builds the poll configuration for the agent info lookup
	projectAccessClient     *projectAccessClient        // checks whether an agent may access a project
}

// ReconcileProjects subscribes to git push events and, for every event whose project
// is part of the set requested in req and is accessible by the calling agent, sends a
// ReconcileProjectsResponse carrying the project's full path on the stream.
//
// If the agent info cannot be resolved, the result of the polling is returned (which
// may be nil when the context is done). Otherwise nil is returned once
// OnGitPushEvent returns.
func (s *server) ReconcileProjects(req *rpc.ReconcileProjectsRequest, server rpc.GitLabFlux_ReconcileProjectsServer) error {
	ctx := server.Context()
	rpcAPI := modserver.AgentRPCAPIFromContext(ctx)
	log := rpcAPI.Log()

	// Resolve the agent info, backing off for as long as the lookup reports Unavailable.
	var agentInfo *api.AgentInfo
	pollErr := rpcAPI.PollWithBackoff(s.pollCfgFactory(), func() (error, retry.AttemptResult) {
		var infoErr error
		agentInfo, infoErr = rpcAPI.AgentInfo(ctx, log)
		switch {
		case infoErr == nil:
			return nil, retry.Done
		case status.Code(infoErr) == codes.Unavailable:
			return nil, retry.Backoff
		default:
			return infoErr, retry.Done // no wrap
		}
	})
	if agentInfo == nil {
		return pollErr // ctx done, pollErr may be nil but must return
	}

	agentID := agentInfo.ID
	log = log.With(logz.AgentID(agentID))

	pipe := make(chan *event.Project, maxBufferedNotifications)
	var wg wait.Group
	// Deferred calls run in reverse order: the channel is closed first, which ends the
	// worker's range loop, and only afterwards the worker is awaited.
	defer wg.Wait()
	defer close(pipe)
	wg.Start(func() {
		for project := range pipe {
			path := project.FullPath

			allowed, accessErr := s.verifyProjectAccess(ctx, log, rpcAPI, agentID, path)
			if accessErr != nil {
				rpcAPI.HandleProcessingError(log, agentID, fmt.Sprintf("Failed to check if project %s is accessible by agent", path), accessErr)
				continue
			}
			if !allowed {
				continue
			}

			// increase Flux git push event notification counter
			s.notifiedCounter.Add(context.Background(), 1) //nolint: contextcheck
			s.notifiedUsageCounter.Inc()
			s.notifiedProjectsCounter.Add(project.Id)

			response := &rpc.ReconcileProjectsResponse{
				Project: &rpc.Project{Id: path},
			}
			if sendErr := server.Send(response); sendErr != nil {
				// A failed send is reported, but the worker keeps consuming events.
				_ = rpcAPI.HandleIOError(log, fmt.Sprintf("failed to send reconcile message for project %s", path), sendErr)
			}
		}
	})

	log.Debug("Started reconcile projects ...")
	defer log.Debug("Stopped reconcile projects ...")
	projects := req.ToProjectSet()
	// Stop listening for push events when:
	// - server is shutting down
	// - the RPC connection is done
	// - max connection age is reached
	ageCtx := grpctool.MaxConnectionAgeContextFromStreamContext(ctx)
	s.serverAPI.OnGitPushEvent(ageCtx, func(_ context.Context, e *event.GitPushEvent) {
		if _, watched := projects[e.Project.FullPath]; !watched {
			// NOTE: it's probably not a good idea to log here as we'd get one for every event,
			// which on GitLab.com is thousands per minute.
			return
		}

		// Non-blocking hand-off: never stall the event callback on a full buffer.
		select {
		case pipe <- e.Project:
		default:
			s.droppedCounter.Add(context.Background(), 1) //nolint: contextcheck
			// NOTE: if for whatever reason the other goroutine isn't able to keep up with the events,
			// we just drop them for now.
			log.Debug("Dropping git push event", logz.ProjectID(e.Project.FullPath))
		}
	})

	return nil
}

// verifyProjectAccess reports whether the agent identified by rpcAPI's agent token
// has access to the project projectID.
//
// On success the access decision is returned with a nil error. On failure `false` is
// returned together with a gRPC status error:
//   - Canceled if the underlying error is context.Canceled,
//   - DeadlineExceeded if it is context.DeadlineExceeded,
//   - Unavailable for anything else; the original error is passed to
//     rpcAPI.HandleProcessingError first, and a caller may retry.
func (s *server) verifyProjectAccess(ctx context.Context, log *zap.Logger, rpcAPI modserver.AgentRPCAPI, agentID int64,
	projectID string) (bool, error) {
	allowed, err := s.projectAccessClient.VerifyProjectAccess(ctx, rpcAPI.AgentToken(), projectID)
	if err == nil {
		return allowed, nil
	}
	if errors.Is(err, context.Canceled) {
		return false, status.Error(codes.Canceled, err.Error())
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return false, status.Error(codes.DeadlineExceeded, err.Error())
	}
	// The returned status carries only a generic message, so report the original
	// error here before it is replaced.
	rpcAPI.HandleProcessingError(log, agentID, "VerifyProjectAccess()", err)
	return false, status.Error(codes.Unavailable, "unavailable")
}

// projectAccessClient answers "may this agent access this project?" questions by
// going through projectAccessCache and querying GitLab via gitLabClient.
type projectAccessClient struct {
	// gitLabClient is the client handed to gapi.VerifyProjectAccess.
	gitLabClient       gitlab.ClientInterface
	// projectAccessCache stores access results (or errors) keyed by projectAccessCacheKey.
	projectAccessCache *cache.CacheWithErr[projectAccessCacheKey, bool]
}

// VerifyProjectAccess returns whether the agent owning agentToken has access to the
// project projectID. The answer is obtained through projectAccessCache under a key
// made of the token and the project ID; the loader handed to the cache asks GitLab
// with retries disabled (presumably it is only invoked when the cache has no usable
// entry - confirm against cache.CacheWithErr).
func (c *projectAccessClient) VerifyProjectAccess(ctx context.Context, agentToken api.AgentToken, projectID string) (bool, error) {
	lookup := func() (bool, error) {
		return gapi.VerifyProjectAccess(ctx, c.gitLabClient, agentToken, projectID, gitlab.WithoutRetries())
	}
	cacheKey := projectAccessCacheKey{
		agentToken: agentToken,
		projectID:  projectID,
	}
	return c.projectAccessCache.GetItem(ctx, cacheKey, lookup)
}

// projectAccessCacheKey is the key type of the project access cache: one entry per
// combination of agent token and project.
type projectAccessCacheKey struct {
	agentToken api.AgentToken // token of the agent whose access is checked
	projectID  string         // ID of the project as passed to VerifyProjectAccess
}