File: tracker.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (419 lines) | stat: -rw-r--r-- 14,697 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
package agent_tracker //nolint:stylecheck

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/redis/rueidis"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/errz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/redistool"
	otelmetric "go.opentelemetry.io/otel/metric"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/protobuf/proto"
	"k8s.io/apimachinery/pkg/util/wait"
)

// refreshOverlap is subtracted from the refresh period when Run computes the
// nextRefresh instant (now + refreshPeriod - refreshOverlap) that it hands to
// refreshRegistrations.
const refreshOverlap = 5 * time.Second

// Names of the Redis-backed hashes. They are passed to
// redistool.NewRedisExpiringHash in NewRedisTracker and used to label GC work
// in runGC.
const (
	connectionsByAgentIDHashName      = "connections_by_agent_id"
	connectionsByProjectIDHashName    = "connections_by_project_id"
	connectedAgentsHashName           = "connected_agents"
	agentVersionsHashName             = "agent_versions"
	connectionsByAgentVersionHashName = "connections_by_agent_version"
)

// Fixed keys for the hashes that are not partitioned by a real key; the
// matching KeyToRedisKey functions ignore their argument.
const (
	// connectedAgentsKey is the single key under which connectedAgents is stored.
	connectedAgentsKey int64 = 0
	// agentVersionKey is not actually used as a key. See `agentVersionsHashKey` function.
	agentVersionKey int64 = 0
)

// ConnectedAgentInfoCallback receives one decoded connection record per call.
// Its (done, err) result is returned as-is to the underlying redistool hash
// Scan callback; presumably done=true stops the scan early (confirm against
// redistool).
type ConnectedAgentInfoCallback func(info *ConnectedAgentInfo) (done bool, err error)

// Registerer adds connection records to the tracker and removes them again.
type Registerer interface {
	// RegisterConnection records the given connection in the tracker.
	RegisterConnection(ctx context.Context, info *ConnectedAgentInfo) error
	// UnregisterConnection removes the given connection from the tracker.
	UnregisterConnection(ctx context.Context, info *ConnectedAgentInfo) error
}

// ExpiringRegisterer registers connections whose records lapse on their own.
type ExpiringRegisterer interface {
	// RegisterExpiring records the given connection in the tracker.
	// The registration expires unless it is renewed by calling this method again.
	RegisterExpiring(ctx context.Context, info *ConnectedAgentInfo) error
}

// Querier reads connection data back out of the tracker.
type Querier interface {
	// GetConnectionsByAgentID passes each connection recorded for agentID to cb.
	GetConnectionsByAgentID(ctx context.Context, agentID int64, cb ConnectedAgentInfoCallback) error
	// GetConnectionsByProjectID passes each connection recorded for projectID to cb.
	GetConnectionsByProjectID(ctx context.Context, projectID int64, cb ConnectedAgentInfoCallback) error
	// GetConnectedAgentsCount returns how many agents are currently recorded as connected.
	GetConnectedAgentsCount(ctx context.Context) (int64, error)
	// CountAgentsByAgentVersions returns, per agent version, the number of recorded connections.
	CountAgentsByAgentVersions(ctx context.Context) (map[string]int64, error)
}

// Tracker is the full agent-connection tracker. It combines plain and expiring
// registration with querying, and adds Run for background work.
type Tracker interface {
	Registerer
	ExpiringRegisterer
	Querier
	// Run performs the tracker's background work until ctx is done
	// (for RedisTracker: periodic refresh and GC).
	Run(ctx context.Context) error
}

// RedisTracker implements Tracker on top of Redis hashes managed via redistool.
type RedisTracker struct {
	log           *zap.Logger
	errRep        errz.ErrReporter
	ttl           time.Duration
	refreshPeriod time.Duration
	gcPeriod      time.Duration

	// mu is held by RegisterConnection, UnregisterConnection,
	// refreshRegistrations and while runGC snapshots the GC functions of the
	// fields below. RegisterExpiring and the query methods use these fields
	// without taking mu.
	mu                     sync.Mutex
	connectionsByAgentID   redistool.ExpiringHash[int64, int64] // agentID -> connectionId -> marshaled ConnectedAgentInfo
	connectionsByProjectID redistool.ExpiringHash[int64, int64] // projectID -> connectionId -> marshaled ConnectedAgentInfo
	connectedAgents        redistool.ExpiringHash[int64, int64] // connectedAgentsKey (single key) -> agentID -> nil

	// agentVersions keeps track of the agent versions that have active connections.
	agentVersions redistool.ExpiringHash[int64, string] // agentVersionKey (single key) -> agentVersion -> nil
	// connectionsByAgentVersion stores one record per connection for each agent version.
	connectionsByAgentVersion redistool.ExpiringHash[string, int64] // agentVersion -> connectionId -> marshaled AgentPodInfo
}

// NewRedisTracker builds a RedisTracker whose hashes are keyed under
// agentKeyPrefix and share the given ttl and meter m. The hashes are created
// in a fixed order, and the first construction error is returned unchanged
// with a nil tracker.
func NewRedisTracker(log *zap.Logger, errRep errz.ErrReporter, client rueidis.Client, agentKeyPrefix string,
	ttl, refreshPeriod, gcPeriod time.Duration, m otelmetric.Meter) (*RedisTracker, error) {
	t := &RedisTracker{
		log:           log,
		errRep:        errRep,
		ttl:           ttl,
		refreshPeriod: refreshPeriod,
		gcPeriod:      gcPeriod,
	}
	var err error

	t.connectionsByAgentID, err = redistool.NewRedisExpiringHash(
		connectionsByAgentIDHashName, client, connectionsByAgentIDHashKey(agentKeyPrefix), int64ToStr, ttl, m, true)
	if err != nil {
		return nil, err
	}
	t.connectionsByProjectID, err = redistool.NewRedisExpiringHash(
		connectionsByProjectIDHashName, client, connectionsByProjectIDHashKey(agentKeyPrefix), int64ToStr, ttl, m, true)
	if err != nil {
		return nil, err
	}
	t.connectedAgents, err = redistool.NewRedisExpiringHash(
		connectedAgentsHashName, client, connectedAgentsHashKey(agentKeyPrefix), int64ToStr, ttl, m, false)
	if err != nil {
		return nil, err
	}
	t.agentVersions, err = redistool.NewRedisExpiringHash(
		agentVersionsHashName, client, agentVersionsHashKey(agentKeyPrefix), strToStr, ttl, m, false)
	if err != nil {
		return nil, err
	}
	t.connectionsByAgentVersion, err = redistool.NewRedisExpiringHash(
		connectionsByAgentVersionHashName, client, connectionsByAgentVersionHashKey(agentKeyPrefix), int64ToStr, ttl, m, false)
	if err != nil {
		return nil, err
	}
	return t, nil
}

// Run performs periodic maintenance until ctx is done, then returns nil.
// Every refreshPeriod it calls refreshRegistrations with
// now+refreshPeriod-refreshOverlap. Every gcPeriod it runs GC and logs the
// number of removed hash keys when that number is positive.
func (t *RedisTracker) Run(ctx context.Context) error {
	refresh := time.NewTicker(t.refreshPeriod)
	defer refresh.Stop()
	gc := time.NewTicker(t.gcPeriod)
	defer gc.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-refresh.C:
			nextRefresh := time.Now().Add(t.refreshPeriod - refreshOverlap)
			t.refreshRegistrations(ctx, nextRefresh)
		case <-gc.C:
			if removed := t.runGC(ctx); removed > 0 {
				t.log.Info("Deleted expired agent connections records", logz.RemovedHashKeys(removed))
			}
		}
	}
}

// RegisterConnection records a connection in every tracker hash.
//
// The project, agent and connected-agents hashes are written with Set. These
// are the hashes refreshRegistrations refreshes. The agent-version hashes are
// written with SetEX and an expiry of now+ttl.
//
// Both payloads are marshaled before mu is taken, so a marshaling failure
// returns before anything is written to Redis. The writes run concurrently,
// and the first error from any of them is returned once all have completed.
func (t *RedisTracker) RegisterConnection(ctx context.Context, info *ConnectedAgentInfo) error {
	infoBytes, err := proto.Marshal(info)
	if err != nil {
		// This should never happen
		return fmt.Errorf("failed to marshal object: %w", err)
	}
	agentPodInfoBytes, err := proto.Marshal(&AgentPodInfo{
		AgentId: info.AgentId,
		PodId:   info.ConnectionId,
	})
	if err != nil {
		// This should never happen
		return fmt.Errorf("failed to marshal AgentPodInfo object: %w", err)
	}
	// Nil-safe protobuf getters: with a nil AgentMeta, info.AgentMeta.Version
	// would panic inside an errgroup goroutine.
	agentVersion := info.GetAgentMeta().GetVersion()
	exp := time.Now().Add(t.ttl)

	t.mu.Lock()
	defer t.mu.Unlock()
	var wg errgroup.Group
	wg.Go(func() error {
		return t.connectionsByProjectID.Set(ctx, info.ProjectId, info.ConnectionId, infoBytes)
	})
	wg.Go(func() error {
		return t.connectionsByAgentID.Set(ctx, info.AgentId, info.ConnectionId, infoBytes)
	})
	wg.Go(func() error {
		return t.connectedAgents.Set(ctx, connectedAgentsKey, info.AgentId, nil)
	})
	wg.Go(func() error {
		return t.connectionsByAgentVersion.SetEX(ctx, agentVersion, info.ConnectionId, agentPodInfoBytes, exp)
	})
	wg.Go(func() error {
		return t.agentVersions.SetEX(ctx, agentVersionKey, agentVersion, nil, exp)
	})
	return wg.Wait()
}

// UnregisterConnection removes a connection from the project, agent and
// agent-version hashes via Unset. For the connected-agents hash it calls
// Forget for the agent synchronously; Forget returns no error. The
// agentVersions hash is not touched here; presumably its entries are left to
// expire and be removed by runGC.
// The Unset calls run concurrently, and the first error is returned once all
// have completed.
//
// NOTE(review): connectedAgents is keyed by agent ID only, so Forget also
// affects any other live connections of the same agent. Confirm this is
// intended.
func (t *RedisTracker) UnregisterConnection(ctx context.Context, info *ConnectedAgentInfo) error {
	// Nil-safe protobuf getters: with a nil AgentMeta, info.AgentMeta.Version
	// would panic inside an errgroup goroutine.
	agentVersion := info.GetAgentMeta().GetVersion()

	t.mu.Lock()
	defer t.mu.Unlock()
	var wg errgroup.Group
	wg.Go(func() error {
		return t.connectionsByProjectID.Unset(ctx, info.ProjectId, info.ConnectionId)
	})
	wg.Go(func() error {
		return t.connectionsByAgentID.Unset(ctx, info.AgentId, info.ConnectionId)
	})
	t.connectedAgents.Forget(connectedAgentsKey, info.AgentId)
	wg.Go(func() error {
		return t.connectionsByAgentVersion.Unset(ctx, agentVersion, info.ConnectionId)
	})
	return wg.Wait()
}

// RegisterExpiring records a connection in every tracker hash with SetEX and a
// shared expiry of now+ttl. The registration lapses unless this method is
// called again before then. Unlike RegisterConnection, it does not take mu.
//
// Both payloads are marshaled up front, so a marshaling failure returns before
// anything is written to Redis. The writes run concurrently, and the first
// error from any of them is returned once all have completed.
func (t *RedisTracker) RegisterExpiring(ctx context.Context, info *ConnectedAgentInfo) error {
	infoBytes, err := proto.Marshal(info)
	if err != nil {
		// This should never happen
		return fmt.Errorf("failed to marshal object: %w", err)
	}
	agentPodInfoBytes, err := proto.Marshal(&AgentPodInfo{
		AgentId: info.AgentId,
		PodId:   info.ConnectionId,
	})
	if err != nil {
		// This should never happen
		return fmt.Errorf("failed to marshal AgentPodInfo object: %w", err)
	}
	// Nil-safe protobuf getters: with a nil AgentMeta, info.AgentMeta.Version
	// would panic inside an errgroup goroutine.
	agentVersion := info.GetAgentMeta().GetVersion()
	exp := time.Now().Add(t.ttl)

	var wg errgroup.Group
	wg.Go(func() error {
		return t.connectionsByProjectID.SetEX(ctx, info.ProjectId, info.ConnectionId, infoBytes, exp)
	})
	wg.Go(func() error {
		return t.connectionsByAgentID.SetEX(ctx, info.AgentId, info.ConnectionId, infoBytes, exp)
	})
	wg.Go(func() error {
		return t.connectedAgents.SetEX(ctx, connectedAgentsKey, info.AgentId, nil, exp)
	})
	wg.Go(func() error {
		return t.connectionsByAgentVersion.SetEX(ctx, agentVersion, info.ConnectionId, agentPodInfoBytes, exp)
	})
	wg.Go(func() error {
		return t.agentVersions.SetEX(ctx, agentVersionKey, agentVersion, nil, exp)
	})
	return wg.Wait()
}

// GetConnectionsByAgentID scans the connectionsByAgentID hash under agentID
// and passes each decodable ConnectedAgentInfo to cb.
func (t *RedisTracker) GetConnectionsByAgentID(ctx context.Context, agentID int64, cb ConnectedAgentInfoCallback) error {
	byAgent := t.connectionsByAgentID
	return t.getConnectionsByKey(ctx, byAgent, agentID, cb)
}

// GetConnectionsByProjectID scans the connectionsByProjectID hash under
// projectID and passes each decodable ConnectedAgentInfo to cb.
func (t *RedisTracker) GetConnectionsByProjectID(ctx context.Context, projectID int64, cb ConnectedAgentInfoCallback) error {
	byProject := t.connectionsByProjectID
	return t.getConnectionsByKey(ctx, byProject, projectID, cb)
}

// GetConnectedAgentsCount returns Len of the single connectedAgents hash,
// whose fields are agent IDs.
func (t *RedisTracker) GetConnectedAgentsCount(ctx context.Context) (int64, error) {
	count, err := t.connectedAgents.Len(ctx, connectedAgentsKey)
	return count, err
}

// CountAgentsByAgentVersions maps each version listed in the agentVersions
// hash to the Len of that version's connectionsByAgentVersion hash. Any Redis
// error aborts the whole count and is returned wrapped.
func (t *RedisTracker) CountAgentsByAgentVersions(ctx context.Context) (map[string]int64, error) {
	versions, err := t.getAgentVersions(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get agent versions from Redis: %w", err)
	}

	result := make(map[string]int64, len(versions))
	for i := range versions {
		v := versions[i]
		n, lenErr := t.connectionsByAgentVersion.Len(ctx, v)
		if lenErr != nil {
			return nil, fmt.Errorf("failed to get hash length from connectionsByAgentVersion in Redis: %w", lenErr)
		}
		result[v] = n
	}
	return result, nil
}

// refreshRegistrations refreshes, concurrently and under mu, the three hashes
// that RegisterConnection writes with Set. It passes nextRefresh to each and
// waits for all of them. Failures are reported by refreshHash, not returned.
func (t *RedisTracker) refreshRegistrations(ctx context.Context, nextRefresh time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()
	hashes := [...]redistool.ExpiringHash[int64, int64]{
		t.connectionsByProjectID,
		t.connectionsByAgentID,
		t.connectedAgents,
	}
	// Run refreshes concurrently to release mu ASAP.
	var wg wait.Group
	for _, h := range hashes {
		t.refreshHash(ctx, &wg, h, nextRefresh)
	}
	wg.Wait()
}

// refreshHash starts a goroutine on wg that calls h.Refresh(ctx, nextRefresh).
// Any error is reported through errRep rather than returned.
func (t *RedisTracker) refreshHash(ctx context.Context, wg *wait.Group, h redistool.ExpiringHash[int64, int64], nextRefresh time.Time) {
	refresh := func() {
		if err := h.Refresh(ctx, nextRefresh); err != nil {
			msg := fmt.Sprintf("Failed to refresh %s hash data in Redis", h.GetName())
			t.errRep.HandleProcessingError(ctx, t.log, msg, err)
		}
	}
	wg.Start(refresh)
}

// runGC garbage-collects every tracker hash, one after another, and returns
// the total number of keys deleted. The GC functions are captured while
// holding mu; the GC work itself runs without it. A context-done error stops
// the sequence early. Any other error is reported and GC moves on to the next
// hash.
func (t *RedisTracker) runGC(ctx context.Context) int {
	type namedGC struct {
		name string
		run  func(context.Context) (int, error)
	}
	// A slice (not a map) keeps the order deterministic. That matters here:
	// agentVersions is collected first, and connectionsByAgentVersion is then
	// collected for the versions that remain listed.
	steps := func() []namedGC {
		t.mu.Lock()
		defer t.mu.Unlock()
		return []namedGC{
			{name: connectionsByProjectIDHashName, run: t.connectionsByProjectID.GC()},
			{name: connectionsByAgentIDHashName, run: t.connectionsByAgentID.GC()},
			{name: connectedAgentsHashName, run: t.connectedAgents.GC()},
			{name: agentVersionsHashName, run: t.agentVersions.GC()},
			{name: connectionsByAgentVersionHashName, run: t.gcConnectionsByAgentVersion},
		}
	}()

	total := 0
	// No rush so run GC sequentially to not stress RAM/CPU/Redis/network.
	// We have more important work to do that we shouldn't impact.
	for _, step := range steps {
		n, err := step.run(ctx)
		total += n
		if err == nil {
			continue
		}
		if errz.ContextDone(err) {
			t.log.Debug("Redis GC interrupted", logz.Error(err))
			return total
		}
		t.errRep.HandleProcessingError(ctx, t.log, fmt.Sprintf("Failed to GC data in %s Redis hash", step.name), err)
		// continue anyway
	}
	return total
}

// getConnectionsByKey scans the entries stored under key in hash and decodes
// each value as a ConnectedAgentInfo before handing it to cb.
//
// Scan errors and undecodable values are reported through errRep, and the
// visitor returns (false, nil) for them, so presumably scanning continues.
// cb's own (done, err) result is returned to Scan unchanged.
func (t *RedisTracker) getConnectionsByKey(ctx context.Context, hash redistool.ExpiringHash[int64, int64], key int64, cb ConnectedAgentInfoCallback) error {
	visit := func(_ string, value []byte, scanErr error) (bool, error) {
		if scanErr != nil {
			t.errRep.HandleProcessingError(ctx, t.log, fmt.Sprintf("Redis %s hash scan", hash.GetName()), scanErr)
			return false, nil
		}
		info := &ConnectedAgentInfo{}
		if err := proto.Unmarshal(value, info); err != nil {
			t.errRep.HandleProcessingError(ctx, t.log, fmt.Sprintf("Redis %s hash scan: proto.Unmarshal(ConnectedAgentInfo)", hash.GetName()), err)
			return false, nil
		}
		return cb(info)
	}
	return hash.Scan(ctx, key, visit)
}

// gcConnectionsByAgentVersion garbage-collects connectionsByAgentVersion for
// the versions currently listed in agentVersions. GC still runs over whatever
// versions were collected even if listing them failed. The listing error and
// the GC error are joined.
func (t *RedisTracker) gcConnectionsByAgentVersion(ctx context.Context) (int, error) {
	versions, listErr := t.getAgentVersions(ctx)
	gcForVersions := t.connectionsByAgentVersion.GCFor(versions)
	deleted, gcErr := gcForVersions(ctx)
	return deleted, errors.Join(listErr, gcErr)
}

// getAgentVersions returns the field names (agent versions) of the single
// agentVersions hash. Per-entry scan errors are reported through errRep and
// skipped. Only the error returned by Scan itself is propagated, alongside
// whatever versions were collected.
func (t *RedisTracker) getAgentVersions(ctx context.Context) ([]string, error) {
	var versions []string
	collect := func(field string, _ []byte, scanErr error) (bool, error) {
		if scanErr != nil {
			t.errRep.HandleProcessingError(ctx, t.log, "getAgentVersions: failed to scan redis hash", scanErr)
			return false, nil
		}
		versions = append(versions, field)
		return false, nil
	}
	err := t.agentVersions.Scan(ctx, agentVersionKey, collect)
	return versions, err
}

// connectionsByAgentIDHashKey returns a key for agentID -> (connectionId -> marshaled ConnectedAgentInfo).
// The Redis key is produced by redistool.PrefixedInt64Key from
// agentKeyPrefix + ":conn_by_agent_id:" and the agent ID.
func connectionsByAgentIDHashKey(agentKeyPrefix string) redistool.KeyToRedisKey[int64] {
	const segment = ":conn_by_agent_id:"
	keyPrefix := agentKeyPrefix + segment
	return func(id int64) string {
		return redistool.PrefixedInt64Key(keyPrefix, id)
	}
}

// connectionsByProjectIDHashKey returns a key for projectID -> (connectionId -> marshaled ConnectedAgentInfo).
// The Redis key is produced by redistool.PrefixedInt64Key from
// agentKeyPrefix + ":conn_by_project_id:" and the project ID.
func connectionsByProjectIDHashKey(agentKeyPrefix string) redistool.KeyToRedisKey[int64] {
	prefix := agentKeyPrefix + ":conn_by_project_id:"
	return func(projectID int64) string {
		return redistool.PrefixedInt64Key(prefix, projectID)
	}
}

// connectedAgentsHashKey returns the key for the hash of connected agents.
// All connected agents share one hash, so the returned function ignores its
// argument and always yields agentKeyPrefix + ":connected_agents".
func connectedAgentsHashKey(agentKeyPrefix string) redistool.KeyToRedisKey[int64] {
	key := agentKeyPrefix + ":connected_agents"
	return func(int64) string { return key }
}

// connectionsByAgentVersionHashKey returns a key for
// agentVersion -> (connectionId -> marshaled AgentPodInfo). The Redis key is
// agentKeyPrefix + ":conn_by_agent_version:" followed by the version string
// verbatim.
func connectionsByAgentVersionHashKey(agentKeyPrefix string) redistool.KeyToRedisKey[string] {
	return func(agentVersion string) string {
		return agentKeyPrefix + ":conn_by_agent_version:" + agentVersion
	}
}

// agentVersionsHashKey returns the key for the single hash listing agent
// versions. The returned function ignores its argument (see agentVersionKey)
// and always yields agentKeyPrefix + ":agent_versions".
func agentVersionsHashKey(agentKeyPrefix string) redistool.KeyToRedisKey[int64] {
	key := agentKeyPrefix + ":agent_versions"
	return func(int64) string { return key }
}

// ConnectedAgentInfoCollector accumulates the ConnectedAgentInfo values it is
// given. Its Collect method matches the ConnectedAgentInfoCallback signature.
type ConnectedAgentInfoCollector []*ConnectedAgentInfo

// Collect appends info to the collector and always reports done=false with a
// nil error.
func (c *ConnectedAgentInfoCollector) Collect(info *ConnectedAgentInfo) (bool, error) {
	collected := append(*c, info)
	*c = collected
	return false, nil
}

// int64ToStr renders key in base 10. It is passed to
// redistool.NewRedisExpiringHash to stringify int64 keys.
func int64ToStr(key int64) string {
	return string(strconv.AppendInt(nil, key, 10))
}

// strToStr is the identity conversion. It is passed to
// redistool.NewRedisExpiringHash for hashes whose keys are already strings.
func strToStr(v string) string { return v }