File: k8s_informer.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (103 lines) | stat: -rw-r--r-- 2,737 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package agent

import (
	"context"
	"errors"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/client-go/tools/cache"
)

// Background reading on the Kubernetes dynamic client:
// https://caiorcferreira.github.io/post/the-kubernetes-dynamic-client/

// k8sInformer wraps a shared index informer together with the logger and the
// background task used to run it. Instances are created with newK8sInformer,
// started with Start and shut down with Stop.
type k8sInformer struct {
	// informer is the underlying shared informer; it is run by Start and
	// its indexer is read by List.
	informer       cache.SharedIndexInformer
	// log is used to report lifecycle messages such as the informer stopping.
	log            *zap.Logger
	// backgroundTask is set by Start and runs the informer; it stays at its
	// zero value until Start has been called, which Stop checks for.
	backgroundTask stoppableTask
}

// newK8sInformer wraps the given shared informer in a k8sInformer and
// registers event handlers on it that emit a debug log line for every add,
// update and delete notification.
//
// The handlers never panic on an unexpected payload type: anything that is not
// an *unstructured.Unstructured is logged as an "unknown" event instead.
//
// It returns the error reported by AddEventHandler when the handlers cannot be
// registered; in that case the returned *k8sInformer is nil.
func newK8sInformer(log *zap.Logger, informer cache.SharedIndexInformer) (*k8sInformer, error) {
	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			u, ok := obj.(*unstructured.Unstructured)
			if !ok {
				// Mirror the delete handler: log instead of panicking
				// inside the informer's handler goroutine.
				log.Debug("Received unknown add event")
				return
			}
			log.Debug("Received add event", extractEventFields(u)...)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			newU, newOk := newObj.(*unstructured.Unstructured)
			oldU, oldOk := oldObj.(*unstructured.Unstructured)
			if !newOk || !oldOk {
				log.Debug("Received unknown update event")
				return
			}
			// Updates are also delivered when nothing changed (e.g. on
			// resync); skip those to avoid noisy logs.
			if equality.Semantic.DeepEqual(oldU, newU) {
				return
			}
			log.Debug("Received update event", extractEventFields(newU)...)
		},
		DeleteFunc: func(obj interface{}) {
			// When a delete was missed by the watch and only noticed on a
			// relist, the informer delivers a DeletedFinalStateUnknown
			// tombstone wrapping the last known object. Unwrap it so the
			// workspace is still identified in the log.
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}
			switch u := obj.(type) {
			case *unstructured.Unstructured:
				log.Debug("Received delete event", extractEventFields(u)...)
			default:
				log.Debug("Received unknown delete event")
			}
		},
	})
	if err != nil {
		return nil, err
	}
	return &k8sInformer{
		informer: informer,
		log:      log,
	}, nil
}

// extractEventFields builds the structured log fields that identify the
// workspace an event refers to: its namespace first, then its name.
func extractEventFields(event *unstructured.Unstructured) []zap.Field {
	namespaceField := logz.WorkspaceNamespace(event.GetNamespace())
	nameField := logz.WorkspaceName(event.GetName())
	return []zap.Field{namespaceField, nameField}
}

// Start runs the informer in a background task and then waits for its cache
// to sync, using ctx.Done() as the stop signal for the wait.
//
// It returns nil once the cache has synced, or an error if the wait ended
// without the cache being synced.
func (i *k8sInformer) Start(ctx context.Context) error {
	runInformer := func(ctx context.Context) {
		i.informer.Run(ctx.Done())
	}
	i.backgroundTask = newStoppableTask(ctx, runInformer)

	if cache.WaitForCacheSync(ctx.Done(), i.informer.HasSynced) {
		return nil
	}

	return errors.New("failed to sync informer during init")
}

// List returns one parsedWorkspace for every object currently held in the
// informer's indexer. The result is never nil; it is empty when the indexer
// holds no objects.
//
// Every stored item is expected to be an *unstructured.Unstructured; the type
// assertion is unchecked and panics otherwise.
func (i *k8sInformer) List() []*parsedWorkspace {
	items := i.informer.GetIndexer().List()
	workspaces := make([]*parsedWorkspace, len(items))

	for idx := range items {
		u := items[idx].(*unstructured.Unstructured)
		workspaces[idx] = i.parseUnstructuredToWorkspace(u)
	}

	return workspaces
}

// parseUnstructuredToWorkspace converts an unstructured Kubernetes object into
// a parsedWorkspace carrying its name, namespace and resource version.
//
// K8sDeploymentInfo is assigned rawWorkspace.Object directly, without a deep
// copy.
func (i *k8sInformer) parseUnstructuredToWorkspace(rawWorkspace *unstructured.Unstructured) *parsedWorkspace {
	ws := new(parsedWorkspace)
	ws.Name = rawWorkspace.GetName()
	ws.Namespace = rawWorkspace.GetNamespace()
	ws.ResourceVersion = rawWorkspace.GetResourceVersion()
	ws.K8sDeploymentInfo = rawWorkspace.Object
	return ws
}

// Stop stops the background task created by Start, waits for it to finish and
// then logs that the informer has stopped. It does nothing when no background
// task has been set.
func (i *k8sInformer) Stop() {
	if i.backgroundTask == nil {
		return
	}
	i.backgroundTask.StopAndWait()
	i.log.Info("informer stopped")
}