1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
package rpc
import (
"context"
"io"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/pkg/agentcfg"
"go.uber.org/zap"
)
// ConfigurationData is the value ConfigurationWatcher builds from every
// message received on the GetConfiguration stream. It is handed first to
// ConfigurationWatcher.ConfigPreProcessor and then to the ConfigurationCallback.
type ConfigurationData struct {
// CommitId is copied from the CommitId field of the received message.
// The watcher sends the last processed value back in the next request.
CommitId string
// Config is copied from the Configuration field of the received message.
Config *agentcfg.AgentConfiguration
}
// ConfigurationCallback is invoked by ConfigurationWatcher.Watch for every
// received configuration that ConfigPreProcessor accepted (returned nil for).
// The context passed in is the per-attempt context of the streaming call; it
// is canceled once that attempt finishes.
type ConfigurationCallback func(context.Context, ConfigurationData)
// ConfigurationWatcherInterface abstracts ConfigurationWatcher so that callers
// can depend on the Watch method rather than on the concrete type.
type ConfigurationWatcherInterface interface {
// Watch has the same signature as (*ConfigurationWatcher).Watch: it takes
// a context and the callback to invoke with configuration data.
Watch(context.Context, ConfigurationCallback)
}
// ConfigurationWatcher receives agent configuration over the GetConfiguration
// streaming call and forwards each accepted configuration to a callback.
// See Watch for the retry and reconnect behavior.
type ConfigurationWatcher struct {
// Log receives warnings about failed stream calls and errors about
// configurations rejected by ConfigPreProcessor.
Log *zap.Logger
// AgentMeta is sent with every ConfigurationRequest.
AgentMeta *modshared.AgentMeta
// Client is used to open the GetConfiguration stream.
Client AgentConfigurationClient
// PollConfig is called once per Watch call; its result is passed to
// retry.PollWithBackoff.
PollConfig retry.PollConfigFactory
// ConfigPreProcessor is called on every received configuration before the
// callback. A non-nil error causes the configuration to be logged and
// skipped. Watch calls it unconditionally, so it must not be nil.
ConfigPreProcessor func(ConfigurationData) error
}
// Watch streams agent configuration from w.Client and hands every accepted
// configuration to callback.
//
// The work is driven by retry.PollWithBackoff using the poll configuration
// produced by w.PollConfig(). Each attempt opens a GetConfiguration stream,
// sending the commit id of the last configuration that was delivered to
// callback (empty on the first attempt) together with w.AgentMeta, and then
// reads messages until the stream fails or ends:
//
//   - a clean end of stream (io.EOF) asks for an immediate reconnect;
//   - any other failure, including failing to open the stream, asks for a
//     backoff. It is logged as a warning unless the request was canceled or
//     timed out.
//
// Every received configuration is passed to w.ConfigPreProcessor first. If it
// returns an error, the error is logged, the configuration is skipped and the
// last processed commit id is left unchanged. Otherwise callback is invoked
// with the per-attempt context, which is canceled when the attempt returns.
//
// The value returned by retry.PollWithBackoff is discarded. Watch returns when
// that call returns (presumably once ctx is done - confirm against the retry
// package).
func (w *ConfigurationWatcher) Watch(ctx context.Context, callback ConfigurationCallback) {
	// Commit id of the most recent configuration delivered to callback.
	// It outlives individual attempts so that reconnects can report it.
	lastCommit := ""

	attempt := func(pollCtx context.Context) (error, retry.AttemptResult) {
		// A dedicated context guarantees the streaming call is torn down
		// whenever this attempt returns.
		streamCtx, stop := context.WithCancel(pollCtx)
		defer stop()

		req := &ConfigurationRequest{
			CommitId:  lastCommit,
			AgentMeta: w.AgentMeta,
		}
		stream, err := w.Client.GetConfiguration(streamCtx, req)
		if err != nil {
			if !grpctool.RequestCanceledOrTimedOut(err) {
				w.Log.Warn("GetConfiguration failed", logz.Error(err))
			}
			return nil, retry.Backoff
		}

		for {
			msg, recvErr := stream.Recv()
			if recvErr == io.EOF { // nolint:errorlint
				// The stream was closed cleanly: reconnect right away.
				return nil, retry.ContinueImmediately
			}
			if recvErr != nil {
				// Cancellation and timeouts are expected; stay quiet about them.
				if !grpctool.RequestCanceledOrTimedOut(recvErr) {
					w.Log.Warn("GetConfiguration.Recv failed", logz.Error(recvErr))
				}
				return nil, retry.Backoff
			}

			data := ConfigurationData{
				CommitId: msg.CommitId,
				Config:   msg.Configuration,
			}
			if procErr := w.ConfigPreProcessor(data); procErr != nil {
				// Rejected configuration: skip it and keep the previous commit id.
				w.Log.Error("Failed to preprocess configuration", logz.Error(procErr))
				continue
			}

			callback(streamCtx, data)
			lastCommit = msg.CommitId
		}
	}

	_ = retry.PollWithBackoff(ctx, w.PollConfig(), attempt)
}
|