File: module.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (141 lines) | stat: -rw-r--r-- 4,630 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package agent

import (
	"context"
	"fmt"

	notificationv1 "github.com/fluxcd/notification-controller/api/v1"
	sourcev1 "github.com/fluxcd/source-controller/api/v1"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/flux"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/prototool"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/syncz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/pkg/agentcfg"
	"go.uber.org/zap"
	apiextensionsv1api "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apiextensionsv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

var (
	requiredFluxCrds = [...]schema.GroupResource{
		sourcev1.GroupVersion.WithResource("gitrepositories").GroupResource(),
		notificationv1.GroupVersion.WithResource("receivers").GroupResource(),
	}
)

type module struct {
	log               *zap.Logger
	k8sExtApiClient   apiextensionsv1client.ApiextensionsV1Interface
	informersFactory  func() (informers.GenericInformer, informers.GenericInformer, cache.Indexer)
	clientFactory     clientFactory
	controllerFactory controllerFactory
}

func (m *module) IsRunnableConfiguration(cfg *agentcfg.AgentConfiguration) bool {
	// NOTE: always running Flux module for now, but check in `Run()` if Flux is installed
	return true
}

func (m *module) Run(ctx context.Context, cfg <-chan *agentcfg.AgentConfiguration) error {
	if !m.isFluxInstalled(ctx) {
		m.log.Debug("Flux is not installed, skipping module")
		<-ctx.Done()
		return nil
	}

	wh := syncz.NewProtoWorkerHolder[*agentcfg.FluxCF](
		func(config *agentcfg.FluxCF) syncz.Worker {
			return syncz.WorkerFunc(func(ctx context.Context) {
				if err := m.run(ctx, config); err != nil {
					m.log.Error("Failed to run module", logz.Error(err))
				}
			})
		},
	)
	defer wh.StopAndWait()

	for config := range cfg {
		wh.ApplyConfig(ctx, config.Flux)
	}
	return nil
}

func (m *module) run(ctx context.Context, cfg *agentcfg.FluxCF) error {
	var wg wait.Group
	defer wg.Wait()

	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	gitRepositoryInformer, receiverInformer, receiverIndexer := m.informersFactory()

	cl, err := m.clientFactory(runCtx, cfg.WebhookReceiverUrl, receiverIndexer)
	if err != nil {
		return fmt.Errorf("unable to create receiver: %w", err)
	}
	c, err := m.controllerFactory(runCtx, gitRepositoryInformer, receiverInformer, cl)
	if err != nil {
		return fmt.Errorf("unable to start controller: %w", err)
	}

	wg.StartWithChannel(runCtx.Done(), gitRepositoryInformer.Informer().Run)
	wg.StartWithChannel(runCtx.Done(), receiverInformer.Informer().Run)
	wg.StartWithContext(runCtx, cl.RunProjectReconciliation)

	c.Run(runCtx)
	return nil
}

func (m *module) DefaultAndValidateConfiguration(cfg *agentcfg.AgentConfiguration) error {
	prototool.NotNil(&cfg.Flux)
	prototool.String(&cfg.Flux.WebhookReceiverUrl, defaultServiceApiBaseUrl)
	return nil
}

func (m *module) Name() string {
	return flux.ModuleName
}

func (m *module) isFluxInstalled(ctx context.Context) bool {
	for _, crd := range requiredFluxCrds {
		ok, err := checkCRDExistsAndEstablished(ctx, m.k8sExtApiClient, crd)
		if err != nil {
			m.log.Error("Unable to check if CRD is installed", logz.K8sGroup(crd.Group), logz.Error(err))
			return false
		}
		if !ok {
			m.log.Debug("Required Flux CRD is not established", logz.K8sResource(crd.Resource))
			return false
		}
	}
	return true
}

func checkCRDExistsAndEstablished(ctx context.Context, client apiextensionsv1client.ApiextensionsV1Interface, crd schema.GroupResource) (bool, error) {
	obj, err := client.CustomResourceDefinitions().Get(ctx, crd.String(), metav1.GetOptions{})
	if err != nil {
		if kubeerrors.IsNotFound(err) {
			return false, nil
		}
		return false, fmt.Errorf("unable to get CRD %s: %w", crd.String(), err)
	}

	established := false
	for _, cond := range obj.Status.Conditions {
		switch cond.Type { // nolint:exhaustive
		case apiextensionsv1api.Established:
			if cond.Status == apiextensionsv1api.ConditionTrue {
				established = true
			}
			// we don't really care about any other conditions for now, because we don't own this CRD
			// and expect the owner to make sure it becomes established.
		}
	}
	return established, nil
}