File: configuration_watcher.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (75 lines) | stat: -rw-r--r-- 2,404 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package rpc

import (
	"context"
	"io"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/pkg/agentcfg"
	"go.uber.org/zap"
)

type ConfigurationData struct {
	CommitId string
	Config   *agentcfg.AgentConfiguration
}

type ConfigurationCallback func(context.Context, ConfigurationData)

// ConfigurationWatcherInterface abstracts ConfigurationWatcher.
type ConfigurationWatcherInterface interface {
	Watch(context.Context, ConfigurationCallback)
}

type ConfigurationWatcher struct {
	Log                *zap.Logger
	AgentMeta          *modshared.AgentMeta
	Client             AgentConfigurationClient
	PollConfig         retry.PollConfigFactory
	ConfigPreProcessor func(ConfigurationData) error
}

func (w *ConfigurationWatcher) Watch(ctx context.Context, callback ConfigurationCallback) {
	var lastProcessedCommitId string
	_ = retry.PollWithBackoff(ctx, w.PollConfig(), func(ctx context.Context) (error, retry.AttemptResult) {
		ctx, cancel := context.WithCancel(ctx) // nolint:govet
		defer cancel()                         // ensure streaming call is canceled
		res, err := w.Client.GetConfiguration(ctx, &ConfigurationRequest{
			CommitId:  lastProcessedCommitId,
			AgentMeta: w.AgentMeta,
		})
		if err != nil {
			if !grpctool.RequestCanceledOrTimedOut(err) {
				w.Log.Warn("GetConfiguration failed", logz.Error(err))
			}
			return nil, retry.Backoff
		}
		for {
			config, err := res.Recv()
			if err != nil {
				switch {
				case err == io.EOF: // nolint:errorlint
					return nil, retry.ContinueImmediately // immediately reconnect after a clean close
				case grpctool.RequestCanceledOrTimedOut(err):
				default:
					w.Log.Warn("GetConfiguration.Recv failed", logz.Error(err))
				}
				return nil, retry.Backoff
			}
			data := ConfigurationData{
				CommitId: config.CommitId,
				Config:   config.Configuration,
			}
			err = w.ConfigPreProcessor(data)
			if err != nil {
				w.Log.Error("Failed to preprocess configuration", logz.Error(err))
				continue
			}
			callback(ctx, data)
			lastProcessedCommitId = config.CommitId
		}
	})
}