File: aggregating_querier_test.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (186 lines) | stat: -rw-r--r-- 6,703 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package tracker_test

import (
	"context"
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/assert"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/reverse_tunnel/tracker"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/testing/mock_modserver"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/testing/mock_reverse_tunnel_tracker"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/testing/testhelpers"
	"go.uber.org/zap/zaptest"
	"k8s.io/apimachinery/pkg/util/wait"
)

var (
	_ tracker.PollingQuerier = (*tracker.AggregatingQuerier)(nil)
)

func TestPollKasUrlsByAgentId_OnlyStartsSinglePoll(t *testing.T) {
	ctrl := gomock.NewController(t)
	q := mock_reverse_tunnel_tracker.NewMockQuerier(ctrl)
	q.EXPECT().
		KasUrlsByAgentId(gomock.Any(), testhelpers.AgentId, gomock.Any())
	api := mock_modserver.NewMockApi(ctrl)
	aq := tracker.NewAggregatingQuerier(zaptest.NewLogger(t), q, api, testhelpers.NewPollConfig(time.Minute), time.Minute)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go aq.PollKasUrlsByAgentId(ctx, testhelpers.AgentId, func(kasUrls []string) {
		assert.Fail(t, "unexpected call")
	})
	aq.PollKasUrlsByAgentId(ctx, testhelpers.AgentId, func(kasUrls []string) {
		assert.Fail(t, "unexpected call")
	})
}

func TestPollKasUrlsByAgentId_PollingCycle(t *testing.T) {
	ctrl := gomock.NewController(t)
	q := mock_reverse_tunnel_tracker.NewMockQuerier(ctrl)
	q.EXPECT().
		KasUrlsByAgentId(gomock.Any(), testhelpers.AgentId, gomock.Any()).
		Do(func(ctx context.Context, agentId int64, cb tracker.KasUrlsByAgentIdCallback) {
			done, err := cb("url1")
			assert.NoError(t, err)
			assert.False(t, done)
			done, err = cb("url2")
			assert.NoError(t, err)
			assert.False(t, done)
		})
	api := mock_modserver.NewMockApi(ctrl)
	aq := tracker.NewAggregatingQuerier(zaptest.NewLogger(t), q, api, testhelpers.NewPollConfig(time.Minute), time.Minute)
	call := 0
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	aq.PollKasUrlsByAgentId(ctx, testhelpers.AgentId, func(kasUrls []string) {
		switch call {
		case 0:
			assert.Equal(t, []string{"url1", "url2"}, kasUrls)
		default:
			assert.FailNow(t, "unexpected invocation")
		}
		call++
	})
}

func TestPollKasUrlsByAgentId_CacheAfterStopped(t *testing.T) {
	ctrl := gomock.NewController(t)
	q := mock_reverse_tunnel_tracker.NewMockQuerier(ctrl)
	gomock.InOrder(
		q.EXPECT().
			KasUrlsByAgentId(gomock.Any(), testhelpers.AgentId, gomock.Any()).
			Do(func(ctx context.Context, agentId int64, cb tracker.KasUrlsByAgentIdCallback) {
				_, err := cb("url1")
				assert.NoError(t, err)
			}),
		q.EXPECT().
			KasUrlsByAgentId(gomock.Any(), testhelpers.AgentId, gomock.Any()).
			Do(func(ctx context.Context, agentId int64, cb tracker.KasUrlsByAgentIdCallback) {
				_, err := cb("url2")
				assert.NoError(t, err)
			}),
	)
	api := mock_modserver.NewMockApi(ctrl)
	aq := tracker.NewAggregatingQuerier(zaptest.NewLogger(t), q, api, testhelpers.NewPollConfig(time.Minute), time.Minute)
	ctx, cancel := context.WithCancel(context.Background())
	aq.PollKasUrlsByAgentId(ctx, testhelpers.AgentId, func(kasUrls []string) {
		assert.Equal(t, []string{"url1"}, kasUrls)
		cancel()
	})
	kasUrls := aq.CachedKasUrlsByAgentId(testhelpers.AgentId) // from cache
	assert.Equal(t, []string{"url1"}, kasUrls)
	ctx, cancel = context.WithCancel(context.Background())
	aq.PollKasUrlsByAgentId(ctx, testhelpers.AgentId, func(kasUrls []string) {
		assert.Equal(t, []string{"url2"}, kasUrls) // from redis
		cancel()
	})
}

func TestPollKasUrlsByAgentId_CacheWhenRunning(t *testing.T) {
	ctrl := gomock.NewController(t)
	q := mock_reverse_tunnel_tracker.NewMockQuerier(ctrl)
	start1 := make(chan struct{})
	start2 := make(chan struct{})
	gomock.InOrder(
		q.EXPECT().
			KasUrlsByAgentId(gomock.Any(), testhelpers.AgentId, gomock.Any()).
			Do(func(ctx context.Context, agentId int64, cb tracker.KasUrlsByAgentIdCallback) {
				_, err := cb("url1")
				assert.NoError(t, err)
			}),
		q.EXPECT().
			KasUrlsByAgentId(gomock.Any(), testhelpers.AgentId, gomock.Any()).
			Do(func(ctx context.Context, agentId int64, cb tracker.KasUrlsByAgentIdCallback) {
				close(start1)                      // start concurrent query
				<-start2                           // wait for the concurrent query to consume item from cache
				time.Sleep(100 * time.Millisecond) // wait for aq.PollKasUrlsByAgentId() to register second callback
				_, err := cb("url2")
				assert.NoError(t, err)
			}),
	)
	api := mock_modserver.NewMockApi(ctrl)
	aq := tracker.NewAggregatingQuerier(zaptest.NewLogger(t), q, api, testhelpers.NewPollConfig(time.Second), time.Minute)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	count1 := 0
	go aq.PollKasUrlsByAgentId(ctx, testhelpers.AgentId, func(kasUrls []string) {
		switch count1 {
		case 0:
			assert.Equal(t, []string{"url1"}, kasUrls) // first call
		case 1:
			assert.Equal(t, []string{"url2"}, kasUrls) // second call
		default:
			assert.FailNow(t, "unexpected invocation")
		}
		count1++
	})
	<-start1
	kasUrls := aq.CachedKasUrlsByAgentId(testhelpers.AgentId)
	assert.Equal(t, []string{"url1"}, kasUrls) // from cache
	close(start2)
	count2 := 0
	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()
	aq.PollKasUrlsByAgentId(ctx2, testhelpers.AgentId, func(kasUrls []string) {
		switch count2 {
		case 0:
			assert.Equal(t, []string{"url2"}, kasUrls) // from redis
			cancel2()
		default:
			assert.FailNow(t, "unexpected invocation")
		}
		count2++
	})
	assert.EqualValues(t, 1, count2)
}

func TestPollKasUrlsByAgentId_GcRemovesExpiredCache(t *testing.T) {
	ctrl := gomock.NewController(t)
	q := mock_reverse_tunnel_tracker.NewMockQuerier(ctrl)
	q.EXPECT().
		KasUrlsByAgentId(gomock.Any(), testhelpers.AgentId, gomock.Any()).
		Do(func(ctx context.Context, agentId int64, cb tracker.KasUrlsByAgentIdCallback) {
			_, err := cb("url1")
			assert.NoError(t, err)
		})
	api := mock_modserver.NewMockApi(ctrl)
	gcPeriod := time.Second
	aq := tracker.NewAggregatingQuerier(zaptest.NewLogger(t), q, api, testhelpers.NewPollConfig(time.Minute), gcPeriod)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	aq.PollKasUrlsByAgentId(ctx, testhelpers.AgentId, func(kasUrls []string) {
		cancel()
	})
	ctx, cancel = context.WithCancel(context.Background())
	var wg wait.Group
	defer wg.Wait()
	defer cancel()
	wg.Start(func() {
		_ = aq.Run(ctx)
	})
	time.Sleep(gcPeriod * 2)
	kasUrls := aq.CachedKasUrlsByAgentId(testhelpers.AgentId)
	assert.Empty(t, kasUrls)
}