1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
package agent
import (
"context"
"net/http"
"time"
notificationv1 "github.com/fluxcd/notification-controller/api/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/flux"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/flux/rpc"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modagent"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
apiextensionsv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
)
const (
// resyncDuration defines the duration for the shared informer cache resync interval.
resyncDuration = 10 * time.Minute
reconcileProjectsInitBackoff = 10 * time.Second
reconcileProjectsMaxBackoff = 5 * time.Minute
reconcileProjectsResetDuration = 10 * time.Minute
reconcileProjectsBackoffFactor = 2.0
reconcileProjectsJitter = 1.0
)
type Factory struct {
}
func (f *Factory) New(config *modagent.Config) (modagent.Module, error) {
restConfig, err := config.K8sUtilFactory.ToRESTConfig()
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
extApiClient, err := apiextensionsv1client.NewForConfig(restConfig)
if err != nil {
return nil, err
}
receiverClient := dynamicClient.Resource(notificationv1.GroupVersion.WithResource("receivers"))
kubeApiUrl, _, err := defaultServerUrlFor(restConfig)
if err != nil {
return nil, err
}
transportCfg, err := restConfig.TransportConfig()
if err != nil {
return nil, err
}
kubeApiRoundTripper, err := transport.New(transportCfg)
if err != nil {
return nil, err
}
return &module{
log: config.Log,
k8sExtApiClient: extApiClient,
informersFactory: func() (informers.GenericInformer, informers.GenericInformer, cache.Indexer) {
informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, resyncDuration)
gitRepositoryInformer := informerFactory.ForResource(sourcev1.GroupVersion.WithResource("gitrepositories"))
receiverInformer := informerFactory.ForResource(notificationv1.GroupVersion.WithResource("receivers"))
receiverIndexer := receiverInformer.Informer().GetIndexer()
return gitRepositoryInformer, receiverInformer, receiverIndexer
},
clientFactory: func(ctx context.Context, cfgUrl string, receiverIndexer cache.Indexer) (*client, error) {
agentId, err := config.Api.GetAgentId(ctx)
if err != nil {
return nil, err
}
rt, err := newGitRepositoryReconcileTrigger(cfgUrl, kubeApiUrl, kubeApiRoundTripper, http.DefaultTransport)
if err != nil {
return nil, err
}
return newClient(
config.Log,
config.Api,
agentId,
rpc.NewGitLabFluxClient(config.KasConn),
retry.NewPollConfigFactory(0, retry.NewExponentialBackoffFactory(
reconcileProjectsInitBackoff, reconcileProjectsMaxBackoff, reconcileProjectsResetDuration, reconcileProjectsBackoffFactor, reconcileProjectsJitter),
),
receiverIndexer,
rt,
)
},
controllerFactory: func(ctx context.Context, gitRepositoryInformer informers.GenericInformer, receiverInformer informers.GenericInformer, projectReconciler projectReconciler) (controller, error) {
agentId, err := config.Api.GetAgentId(ctx)
if err != nil {
return nil, err
}
gitLabExternalUrl, err := config.Api.GetGitLabExternalUrl(ctx)
if err != nil {
return nil, err
}
return newGitRepositoryController(ctx, config.Log, config.Api, agentId, gitLabExternalUrl, gitRepositoryInformer, receiverInformer, projectReconciler, receiverClient, clientset.CoreV1())
},
}, nil
}
func (f *Factory) Name() string {
return flux.ModuleName
}
func (f *Factory) StartStopPhase() modshared.ModuleStartStopPhase {
return modshared.ModuleStartBeforeServers
}
|