1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
package manifestops
import (
"context"
"fmt"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/gitops/rpc"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
)
func (w *worker) decode(desiredState <-chan rpc.ObjectsToSynchronizeData, jobs chan<- applyJob) {
var jobCancel context.CancelFunc
defer func() {
if jobCancel != nil {
jobCancel()
}
}()
d := syncDecoder{
restClientGetter: w.restClientGetter,
defaultNamespace: w.project.DefaultNamespace,
}
p := retryPipeline[rpc.ObjectsToSynchronizeData, applyJob]{
inputCh: desiredState,
outputCh: jobs,
retryBackoff: w.decodeRetryPolicy,
process: func(input rpc.ObjectsToSynchronizeData) (applyJob, processResult) {
objs, err := d.Decode(input.Sources)
if err != nil {
w.log.Error("Failed to decode manifest objects", logz.Error(err), logz.CommitId(input.CommitId))
return applyJob{}, backoff
}
invObj, objs, err := w.splitObjects(input.ProjectId, objs)
if err != nil {
w.log.Error("Failed to locate inventory object in manifest objects", logz.Error(err), logz.CommitId(input.CommitId))
return applyJob{}, done
}
if jobCancel != nil {
jobCancel() // Cancel running/pending job ASAP
}
newJob := applyJob{
commitId: input.CommitId,
invInfo: inventory.WrapInventoryInfoObj(invObj),
objects: objs,
}
newJob.ctx, jobCancel = context.WithCancel(context.Background()) // nolint: govet
return newJob, success
},
}
p.run()
}
func (w *worker) splitObjects(projectId int64, objs []*unstructured.Unstructured) (*unstructured.Unstructured, []*unstructured.Unstructured, error) {
invs := make([]*unstructured.Unstructured, 0, 1)
resources := make([]*unstructured.Unstructured, 0, len(objs))
for _, obj := range objs {
if inventory.IsInventoryObject(obj) {
invs = append(invs, obj)
} else {
resources = append(resources, obj)
}
}
switch len(invs) {
case 0:
return w.defaultInventoryObjTemplate(projectId), resources, nil
case 1:
return invs[0], resources, nil
default:
return nil, nil, fmt.Errorf("expecting zero or one inventory object, found %d", len(invs))
}
}
func (w *worker) defaultInventoryObjTemplate(projectId int64) *unstructured.Unstructured {
id := inventoryId(w.agentId, projectId)
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": map[string]interface{}{
"name": "inventory-" + id,
"namespace": w.project.DefaultNamespace,
"labels": map[string]interface{}{
common.InventoryLabel: id,
},
},
},
}
}
func inventoryId(agentId, projectId int64) string {
return fmt.Sprintf("%d-%d", agentId, projectId)
}
|