File: worker_manager.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (127 lines) | stat: -rw-r--r-- 3,753 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package modagent

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/pkg/agentcfg"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
	"k8s.io/apimachinery/pkg/util/wait"
)

type WorkSource[C proto.Message] interface {
	ID() string
	Configuration() C
}

type WorkerFactory[C proto.Message] interface {
	New(agentId int64, source WorkSource[C]) Worker
	SourcesFromConfiguration(*agentcfg.AgentConfiguration) []WorkSource[C]
}

type Worker interface {
	Run(context.Context)
}

type WorkerManager[C proto.Message] struct {
	log           *zap.Logger
	workerFactory WorkerFactory[C]
	workers       map[string]*workerHolder[C] // source id -> worker holder instance
}

func NewWorkerManager[C proto.Message](log *zap.Logger, workerFactory WorkerFactory[C]) *WorkerManager[C] {
	return &WorkerManager[C]{
		log:           log,
		workerFactory: workerFactory,
		workers:       map[string]*workerHolder[C]{},
	}
}

func (m *WorkerManager[C]) startNewWorker(agentId int64, source WorkSource[C]) {
	id := source.ID()
	m.log.Info("Starting worker", logz.WorkerId(id))
	worker := m.workerFactory.New(agentId, source)
	ctx, cancel := context.WithCancel(context.Background())
	holder := &workerHolder[C]{
		sourceId: id,
		config:   source.Configuration(),
		stop:     cancel,
	}
	holder.wg.StartWithContext(ctx, worker.Run)
	m.workers[id] = holder
}

func (m *WorkerManager[C]) ApplyConfiguration(agentId int64, cfg *agentcfg.AgentConfiguration) error {
	sources := m.workerFactory.SourcesFromConfiguration(cfg)
	newSetOfSources := make(map[string]struct{}, len(sources))
	var sourcesToStartWorkersFor []WorkSource[C]
	var workersToStop []*workerHolder[C] // nolint:prealloc

	// Collect sources without workers or with updated configuration.
	for _, source := range sources {
		id := source.ID()
		if _, ok := newSetOfSources[id]; ok {
			return fmt.Errorf("duplicate source id: %s", id)
		}
		newSetOfSources[id] = struct{}{}
		holder := m.workers[id]
		if holder == nil { // New source added
			sourcesToStartWorkersFor = append(sourcesToStartWorkersFor, source)
		} else { // We have a worker for this source already
			if proto.Equal(source.Configuration(), holder.config) {
				// Worker's configuration hasn't changed, nothing to do here
				continue
			}
			m.log.Info("Configuration has been updated, restarting worker", logz.WorkerId(id))
			workersToStop = append(workersToStop, holder)
			sourcesToStartWorkersFor = append(sourcesToStartWorkersFor, source)
		}
	}

	// Stop workers for sources which have been removed from the list.
	for sourceId, holder := range m.workers {
		if _, ok := newSetOfSources[sourceId]; ok {
			continue
		}
		workersToStop = append(workersToStop, holder)
	}

	// Tell workers that should be stopped to stop.
	for _, holder := range workersToStop {
		m.log.Info("Stopping worker", logz.WorkerId(holder.sourceId))
		holder.stop()
		delete(m.workers, holder.sourceId)
	}

	// Wait for stopped workers to finish.
	for _, holder := range workersToStop {
		m.log.Info("Waiting for worker to stop", logz.WorkerId(holder.sourceId))
		holder.wg.Wait()
	}

	// Start new workers for new sources or because of updated configuration.
	for _, source := range sourcesToStartWorkersFor {
		m.startNewWorker(agentId, source) // nolint: contextcheck
	}
	return nil
}

func (m *WorkerManager[C]) StopAllWorkers() {
	// Tell all workers to stop
	for _, holder := range m.workers {
		holder.stop()
	}
	// Wait for all workers to stop
	for _, holder := range m.workers {
		holder.wg.Wait()
	}
}

type workerHolder[C proto.Message] struct {
	sourceId string
	config   C
	wg       wait.Group
	stop     context.CancelFunc
}