File: connection_manager_test.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (126 lines) | stat: -rw-r--r-- 3,436 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package tunclient

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tunnel/info"
)

// Compile-time check that *mockConnection satisfies ConnectionInterface.
var _ ConnectionInterface = (*mockConnection)(nil)

// TestConnManager_StartsMinIdleConnectionsOnRun runs the manager, polls until
// the connection factory has produced exactly minIdleConnections connections,
// and then checks that the count is unchanged once the manager has been
// cancelled and its goroutines have finished.
func TestConnManager_StartsMinIdleConnectionsOnRun(t *testing.T) {
	cm, conns, mu := setupConnManager(t)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go cm.Run(ctx)

	// The manager's lock is held for the whole comparison; the factory's lock
	// is only needed while reading the slice it appends to.
	atMinIdle := func() bool {
		cm.mu.Lock()
		defer cm.mu.Unlock()
		mu.Lock()
		created := len(*conns)
		mu.Unlock()
		return created == int(cm.minIdleConnections)
	}
	require.Eventually(t, atMinIdle, time.Minute, 10*time.Millisecond)

	// Stop the manager and wait for it before the final, lock-free read.
	cancel()
	cm.wg.Wait()
	require.Len(t, *conns, int(cm.minIdleConnections))
}

// TestConnManager_ScalesIdleConnectionsToMaxAndThenToMin drives the manager
// through a full scale-up / scale-down cycle:
//
//  1. every connection the factory creates is reported as active until
//     maxConnections connections exist;
//  2. every connection the manager tracks as active is then reported as idle,
//     twice, with a pause slightly longer than maxIdleTime in between;
//  3. the manager must settle at minIdleConnections idle connections and no
//     active ones.
func TestConnManager_ScalesIdleConnectionsToMaxAndThenToMin(t *testing.T) {
	cm, conns, mu := setupConnManager(t)
	cm.maxIdleTime = 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go cm.Run(ctx)

	// Phase 1: scale to max.
	activated := 0 // number of connections already reported as active
	activateUntilMax := func() bool {
		mu.Lock()
		total := len(*conns)
		assert.LessOrEqual(t, total, int(cm.maxConnections))
		// Snapshot the not-yet-activated tail so the callbacks can run unlocked.
		fresh := append([]*mockConnection(nil), (*conns)[activated:]...)
		mu.Unlock()
		// The factory's mutex must not be held while invoking the callbacks.
		for _, mc := range fresh {
			activated++
			mc.onActive(mc)
		}
		return total == int(cm.maxConnections)
	}
	require.Eventually(t, activateUntilMax, time.Minute, 10*time.Millisecond)

	// Phase 2: scale to min.
	cm.mu.Lock()
	wasActive := make([]ConnectionInterface, 0, len(cm.connections))
	for conn, connInfo := range cm.connections {
		if connInfo.state != active {
			continue
		}
		wasActive = append(wasActive, conn)
	}
	cm.mu.Unlock()

	signalIdle := func() {
		for _, conn := range wasActive {
			conn.(*mockConnection).onIdle(conn)
		}
	}
	signalIdle()
	// NOTE(review): the second signal after maxIdleTime presumably lets the
	// manager see these connections as idle for too long - confirm against the
	// manager's idle handling.
	time.Sleep(cm.maxIdleTime + 10*time.Millisecond)
	signalIdle()

	// Poll while goroutines are shutting down.
	scaledDown := func() bool {
		cm.mu.Lock()
		defer cm.mu.Unlock()
		if cm.idleConnections != cm.minIdleConnections {
			return false
		}
		if cm.activeConnections != 0 {
			return false
		}
		return len(cm.connections) == int(cm.minIdleConnections)
	}
	require.Eventually(t, scaledDown, time.Minute, 10*time.Millisecond)

	cancel()
	cm.wg.Wait()
	require.Len(t, *conns, int(cm.maxConnections))
}

// setupConnManager marks the calling test as parallel and builds a
// ConnectionManager whose connection factory hands out mockConnection values
// and records each one.
//
// It returns the manager, a pointer to the slice of every mock created so far,
// and the mutex that the factory holds while appending to that slice.
//
// A cleanup is registered that waits for the manager's goroutines and then
// asserts that no idle, active or tracked connections remain and that every
// mock has runCalled and stopped equal to 1.
func setupConnManager(t *testing.T) (*ConnectionManager, *[]*mockConnection, *sync.Mutex) {
	t.Parallel()

	var (
		conns []*mockConnection
		mu    sync.Mutex
	)

	// recordingFactory creates a mock wired to the supplied callbacks and
	// remembers it so tests can inspect and drive it later.
	recordingFactory := func(_ *info.APIDescriptor, onActive, onIdle func(ConnectionInterface)) ConnectionInterface {
		mc := &mockConnection{onActive: onActive, onIdle: onIdle}
		mu.Lock()
		conns = append(conns, mc)
		mu.Unlock()
		return mc
	}

	cm := &ConnectionManager{
		connections:        map[ConnectionInterface]connectionInfo{},
		minIdleConnections: 1,
		maxConnections:     100,
		scaleUpStep:        2,
		maxIdleTime:        time.Minute,
		connectionFactory:  recordingFactory,
	}

	t.Cleanup(func() {
		// Wait first so the checks below run after the manager's goroutines
		// have finished.
		cm.wg.Wait()
		assert.Zero(t, cm.idleConnections)
		assert.Zero(t, cm.activeConnections)
		assert.Empty(t, cm.connections)
		for _, mc := range conns {
			assert.EqualValues(t, 1, mc.runCalled)
			assert.EqualValues(t, 1, mc.stopped)
		}
	})

	return cm, &conns, &mu
}

// mockConnection is a test double for ConnectionInterface. It counts how many
// times Run is entered and left, and keeps the state-change callbacks so tests
// can invoke them directly.
type mockConnection struct {
	// runCalled is incremented atomically each time Run is entered.
	runCalled int32
	// stopped is incremented atomically each time Run returns.
	stopped int32
	// onActive is the callback tests call to report this connection as active.
	onActive func(ConnectionInterface)
	// onIdle is the callback tests call to report this connection as idle.
	onIdle func(ConnectionInterface)
}

// Run records that it was entered, blocks until pollCtx is done, and records
// that it has returned. attemptCtx is not used by the mock.
func (m *mockConnection) Run(attemptCtx, pollCtx context.Context) {
	atomic.AddInt32(&m.runCalled, 1)
	defer func() {
		atomic.AddInt32(&m.stopped, 1)
	}()
	<-pollCtx.Done()
}