File: retry_pipeline_test.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (91 lines) | stat: -rw-r--r-- 2,328 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package manifestops

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/gitops/rpc"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/clock"
)

// TestRetryPipeline_LastInputOnly pushes two inputs into the pipeline. The
// process callback reports backoff for the first input and success for the
// second; the test asserts that the value read from the output channel is the
// job produced for the second input.
func TestRetryPipeline_LastInputOnly(t *testing.T) {
	var (
		failingIn = rpc.ObjectsToSynchronizeData{ProjectId: 1}
		passingIn = rpc.ObjectsToSynchronizeData{ProjectId: 2}
		want      = applyJob{commitId: "2"}
		inputs    = make(chan rpc.ObjectsToSynchronizeData)
		outputs   = make(chan applyJob)
	)
	// Both inputs are handled here: per the original author's note, either
	// value may be received because `select` is not deterministic.
	handle := func(in rpc.ObjectsToSynchronizeData) (applyJob, processResult) {
		if in.ProjectId == failingIn.ProjectId {
			// Pretend processing hit a problem so the pipeline has to back off.
			return applyJob{}, backoff
		}
		if in.ProjectId == passingIn.ProjectId {
			return want, success
		}
		// Any other input means the test itself is broken.
		panic(in)
	}
	pipeline := retryPipeline[rpc.ObjectsToSynchronizeData, applyJob]{
		inputCh:      inputs,
		outputCh:     outputs,
		retryBackoff: backoffMgr(),
		process:      handle,
	}
	go pipeline.run()

	inputs <- failingIn
	inputs <- passingIn
	got := <-outputs
	close(inputs) // closing the input channel stops the pipeline goroutine

	assert.Equal(t, want, got)
}

// TestRetryPipeline_LastOutputOnly pushes two inputs that both process
// successfully, but reads from the output channel only after the process
// callback has been invoked for the second input. The test asserts that the
// value read is the job produced for the second input, not the first.
func TestRetryPipeline_LastOutputOnly(t *testing.T) {
	var (
		firstIn    = rpc.ObjectsToSynchronizeData{ProjectId: 1}
		secondIn   = rpc.ObjectsToSynchronizeData{ProjectId: 2}
		firstOut   = applyJob{commitId: "1"}
		want       = applyJob{commitId: "2"}
		inputs     = make(chan rpc.ObjectsToSynchronizeData)
		outputs    = make(chan applyJob)
		secondSeen = make(chan struct{}) // closed once process runs for secondIn
	)
	// Both inputs are handled here: per the original author's note, either
	// value may be received because `select` is not deterministic.
	handle := func(in rpc.ObjectsToSynchronizeData) (applyJob, processResult) {
		if in.ProjectId == firstIn.ProjectId {
			return firstOut, success
		}
		if in.ProjectId == secondIn.ProjectId {
			// Signal the test body that the second input reached process.
			close(secondSeen)
			return want, success
		}
		// Any other input means the test itself is broken.
		panic(in)
	}
	pipeline := retryPipeline[rpc.ObjectsToSynchronizeData, applyJob]{
		inputCh:      inputs,
		outputCh:     outputs,
		retryBackoff: backoffMgr(),
		process:      handle,
	}
	go pipeline.run()

	inputs <- firstIn
	inputs <- secondIn
	<-secondSeen // do not read any output until process has been called for secondIn
	got := <-outputs
	close(inputs) // closing the input channel stops the pipeline goroutine

	assert.Equal(t, want, got)
}

// backoffMgr returns the backoff manager used by the retry pipeline tests in
// this file. It is an exponential backoff manager driven by the real clock,
// with every duration argument set to one minute.
func backoffMgr() wait.BackoffManager {
	const (
		// interval is passed for all three duration arguments of the constructor.
		interval = time.Minute
		factor   = 2
		jitter   = 1
	)
	return wait.NewExponentialBackoffManager(interval, interval, interval, factor, jitter, clock.RealClock{})
}