1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
package server
import (
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/api"
gapi "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab/api"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/flux"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/flux/rpc"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modserver"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/cache"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/prototool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/redistool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
)
const (
	// Parameters handed to retry.NewExponentialBackoffFactory in Factory.New,
	// in the order: initial backoff, maximum backoff, reset duration,
	// backoff factor, jitter.
	reconcileProjectsInitBackoff   = 10 * time.Second
	reconcileProjectsMaxBackoff    = 5 * time.Minute
	reconcileProjectsResetDuration = 10 * time.Minute
	reconcileProjectsBackoffFactor = 2.0
	reconcileProjectsJitter        = 1.0

	// TTLs handed to cache.NewWithError for the project access cache:
	// the first is the regular TTL, the second the TTL used for errors.
	projectAccessCacheTtl    = 5 * time.Minute
	projectAccessCacheErrTtl = 1 * time.Minute

	// Name of the Prometheus counter tracking dropped Git push notifications.
	fluxDroppedNotificationsCounterMetricName = "flux_dropped_git_push_notifications_total"
)
// Factory creates the server-side Flux module. It carries no state; the zero
// value is ready to use.
type Factory struct{}
// New wires up the Flux module on the server side.
//
// It registers the dropped-notifications counter with config.Registerer,
// builds the GitLabFlux gRPC server together with its project access client
// and cache, registers that server on config.AgentServer, and returns the
// module. If registering the counter fails, that error is returned as-is and
// nothing is registered on the agent server.
func (f *Factory) New(config *modserver.Config) (modserver.Module, error) {
	droppedCounter := prometheus.NewCounter(prometheus.CounterOpts{
		Name: fluxDroppedNotificationsCounterMetricName,
		Help: "The total number of dropped Git push notifications in Flux module",
	})
	if err := config.Registerer.Register(droppedCounter); err != nil {
		return nil, err
	}

	// Exponential backoff driven by the reconcileProjects* constants.
	backoffFactory := retry.NewExponentialBackoffFactory(
		reconcileProjectsInitBackoff,
		reconcileProjectsMaxBackoff,
		reconcileProjectsResetDuration,
		reconcileProjectsBackoffFactor,
		reconcileProjectsJitter,
	)
	pollCfgFactory := retry.NewPollConfigFactory(0, backoffFactory)

	// Cached errors are stored via Redis under
	// <KeyPrefix>:verify_project_access_errs:<agent token key>:<project id>.
	errCacher := &redistool.ErrCacher[projectAccessCacheKey]{
		Log:          config.Log,
		ErrRep:       modshared.ApiToErrReporter(config.Api),
		Client:       config.RedisClient,
		ErrMarshaler: prototool.ProtoErrMarshaler{},
		KeyToRedisKey: func(k projectAccessCacheKey) string {
			var key strings.Builder
			key.WriteString(config.Config.Redis.KeyPrefix)
			key.WriteString(":verify_project_access_errs:")
			key.Write(api.AgentToken2key(k.agentToken))
			key.WriteByte(':')
			key.WriteString(k.projectId)
			return key.String()
		},
	}
	accessCache := cache.NewWithError[projectAccessCacheKey, bool](
		projectAccessCacheTtl,
		projectAccessCacheErrTtl,
		errCacher,
		config.TraceProvider.Tracer(flux.ModuleName),
		gapi.IsCacheableError,
	)

	srv := &server{
		serverApi:      config.Api,
		droppedCounter: droppedCounter,
		pollCfgFactory: pollCfgFactory,
		projectAccessClient: &projectAccessClient{
			gitLabClient:       config.GitLabClient,
			projectAccessCache: accessCache,
		},
	}
	rpc.RegisterGitLabFluxServer(config.AgentServer, srv)

	return &module{}, nil
}
// Name returns the Flux module's name, flux.ModuleName.
func (f *Factory) Name() string {
	return flux.ModuleName
}
// StartStopPhase reports the start/stop phase this module belongs to, which
// is always modshared.ModuleStartBeforeServers.
func (f *Factory) StartStopPhase() modshared.ModuleStartStopPhase {
	return modshared.ModuleStartBeforeServers
}
|