1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
package manifestops
import (
"fmt"
"time"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
"k8s.io/utils/clock"
)
type processResult byte
const (
// success means there was no error and output should be consumed.
success processResult = iota
// backoff means there was a retriable error, so the caller should try later.
backoff
// done means there is no output to be consumed. There may or may not have been an error.
done
)
type processFunc[IN any, OUT any] func(input IN) (OUT, processResult)
// retryPipeline takes a channel with input, a processor function, and a channel for output.
// It reads values from input, processes them, and, if/when successful, sends the result into the output channel.
// If processing fails, it is retried with backoff.
// Results are sent to the output eventually. The old result, if it hasn't been sent already, is discarded when a new
// one becomes available. I.e. level-based rather than edge-based behavior.
type retryPipeline[IN any, OUT any] struct {
inputCh <-chan IN
outputCh chan<- OUT
retryBackoff retry.BackoffManager
process processFunc[IN, OUT]
}
func (p *retryPipeline[IN, OUT]) run() {
var (
input IN
output OUT
outputCh chan<- OUT
attemptCh <-chan time.Time
attemptTimer clock.Timer
)
stopAttemptTimer := func() {
if attemptTimer != nil {
if !attemptTimer.Stop() {
<-attemptCh
}
}
}
defer stopAttemptTimer()
for {
var ok bool
select {
case input, ok = <-p.inputCh:
if !ok {
return // nolint: govet
}
stopAttemptTimer()
readyAttemptCh := make(chan time.Time, 1)
readyAttemptCh <- time.Time{}
attemptCh = readyAttemptCh // Enable and trigger the case below
case <-attemptCh:
newOutput, res := p.process(input)
switch res {
case success:
output = newOutput
outputCh = p.outputCh // Enable the 'output' select case
attemptTimer = nil
attemptCh = nil
case backoff:
attemptTimer = p.retryBackoff.Backoff()
attemptCh = attemptTimer.C()
case done:
// Nothing to do.
// If 'output' was already set, it remains set still.
attemptTimer = nil
attemptCh = nil
default:
panic(fmt.Errorf("unknown process result: %v", res))
}
case outputCh <- output:
// Success!
var empty OUT
output = empty // Erase contents to help GC
outputCh = nil // Disable this select case (send to nil channel blocks forever)
}
}
}
|