1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
|
package manifestops
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/gitops/rpc"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/clock"
)
func TestRetryPipeline_LastInputOnly(t *testing.T) {
inputCh := make(chan rpc.ObjectsToSynchronizeData)
outputCh := make(chan applyJob)
in1 := rpc.ObjectsToSynchronizeData{
ProjectId: 1,
}
in2 := rpc.ObjectsToSynchronizeData{
ProjectId: 2,
}
out2 := applyJob{
commitId: "2",
}
p := retryPipeline[rpc.ObjectsToSynchronizeData, applyJob]{
inputCh: inputCh,
outputCh: outputCh,
retryBackoff: backoffMgr(),
process: func(input rpc.ObjectsToSynchronizeData) (applyJob, processResult) {
switch input.ProjectId { // we can receive either value because `select` is not deterministic.
case in1.ProjectId:
return applyJob{}, backoff // pretend there was an issue
case in2.ProjectId:
return out2, success
default:
panic(input)
}
},
}
go p.run()
inputCh <- in1
inputCh <- in2
out := <-outputCh
close(inputCh) // stops the goroutine
assert.Equal(t, out2, out)
}
func TestRetryPipeline_LastOutputOnly(t *testing.T) {
inputCh := make(chan rpc.ObjectsToSynchronizeData)
outputCh := make(chan applyJob)
in2wait := make(chan struct{})
in1 := rpc.ObjectsToSynchronizeData{
ProjectId: 1,
}
in2 := rpc.ObjectsToSynchronizeData{
ProjectId: 2,
}
out1 := applyJob{
commitId: "1",
}
out2 := applyJob{
commitId: "2",
}
p := retryPipeline[rpc.ObjectsToSynchronizeData, applyJob]{
inputCh: inputCh,
outputCh: outputCh,
retryBackoff: backoffMgr(),
process: func(input rpc.ObjectsToSynchronizeData) (applyJob, processResult) {
switch input.ProjectId { // we can receive either value because `select` is not deterministic.
case in1.ProjectId:
return out1, success
case in2.ProjectId:
close(in2wait)
return out2, success
default:
panic(input)
}
},
}
go p.run()
inputCh <- in1
inputCh <- in2
<-in2wait // wait for in2 to have been processed
out := <-outputCh
close(inputCh) // stops the goroutine
assert.Equal(t, out2, out)
}
// backoffMgr builds the exponential backoff manager shared by the
// retryPipeline tests in this file. Every duration argument is one minute.
// NOTE(review): presumably that is long enough that a backoff never elapses
// within a test run — confirm against retryPipeline.run.
func backoffMgr() wait.BackoffManager {
	const (
		interval = time.Minute // used for all three duration arguments
		factor   = 2
		jitter   = 1
	)
	return wait.NewExponentialBackoffManager(interval, interval, interval, factor, jitter, clock.RealClock{})
}
|