File: retry_pipeline.go

package info (click to toggle)
gitlab-agent 16.1.3-2
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 6,324 kB
  • sloc: makefile: 175; sh: 52; ruby: 3
file content (89 lines) | stat: -rw-r--r-- 2,473 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package manifestops

import (
	"fmt"
	"time"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
	"k8s.io/utils/clock"
)

// processResult is the verdict a processFunc returns alongside its output.
// It tells retryPipeline what to do next.
type processResult byte

const (
	// success: processing finished without an error and the returned output should be consumed.
	success = processResult(iota)
	// backoff: processing hit a retriable error, so the caller should try again later.
	backoff
	// done: there is no output to consume. An error may or may not have occurred.
	done
)

// processFunc handles a single input value. It returns the produced output together with
// a processResult describing whether that output should be consumed, the call should be
// retried later, or there is nothing to consume.
type processFunc[IN, OUT any] func(input IN) (OUT, processResult)

// retryPipeline connects an input channel to an output channel through a processor function.
// Every value read from the input is processed and, once processing succeeds, the result is
// sent to the output channel. A failed attempt that asks for a retry is repeated after a backoff.
// Delivery is level-based rather than edge-based: results reach the output eventually, and a
// result that has not been sent yet is discarded as soon as a newer one becomes available.
type retryPipeline[IN, OUT any] struct {
	// inputCh supplies the values to process. Closing it makes run return.
	inputCh <-chan IN

	// outputCh receives the results of successful processing.
	outputCh chan<- OUT

	// retryBackoff provides the timer that delays the next attempt after a backoff result.
	retryBackoff retry.BackoffManager

	// process is called for every processing attempt.
	process processFunc[IN, OUT]
}

// run drives the pipeline and returns once inputCh is closed.
//
// Each loop iteration waits for whichever of these happens first. A new input arrives: any
// pending retry is cancelled and an attempt to process the new input is triggered immediately.
// An attempt is due: process is called and, depending on its result, the output is staged for
// delivery (success), another attempt is scheduled via retryBackoff (backoff), or nothing more
// happens for this input (done). The consumer accepts the staged output: the staged value is
// cleared and delivery is disabled until the next success.
//
// A staged output that has not been delivered yet is overwritten by a newer successful result.
// run panics if process returns an unknown processResult.
func (p *retryPipeline[IN, OUT]) run() {
	var (
		input        IN
		output       OUT
		outputCh     chan<- OUT
		attemptCh    <-chan time.Time
		attemptTimer clock.Timer
	)
	// Invariant: attemptTimer is non-nil only while attemptCh is that timer's channel and its
	// tick has not been received yet. stopAttemptTimer relies on this: when Stop reports false
	// the tick is taken to be already delivered (or about to be), so it is drained here.
	// The timer is forgotten afterwards so it is never stopped or drained a second time.
	stopAttemptTimer := func() {
		if attemptTimer == nil {
			return
		}
		if !attemptTimer.Stop() {
			<-attemptCh
		}
		attemptTimer = nil
		attemptCh = nil
	}
	defer stopAttemptTimer()
	for {
		var ok bool
		select {
		case input, ok = <-p.inputCh:
			if !ok {
				return // nolint: govet
			}
			// New input supersedes whatever retry was pending for the previous one.
			stopAttemptTimer()
			readyAttemptCh := make(chan time.Time, 1)
			readyAttemptCh <- time.Time{}
			attemptCh = readyAttemptCh // Enable and trigger the case below
		case <-attemptCh:
			// The tick (timer or synthetic) has been consumed. Forget the timer before calling
			// process so that, if process or the default branch below panics, the deferred
			// stopAttemptTimer does not block forever draining an already empty channel.
			attemptTimer = nil
			attemptCh = nil // Disable this select case (receive from nil channel blocks forever)
			newOutput, res := p.process(input)
			switch res {
			case success:
				output = newOutput
				outputCh = p.outputCh // Enable the 'output' select case
			case backoff:
				// Schedule the next attempt for the same input.
				attemptTimer = p.retryBackoff.Backoff()
				attemptCh = attemptTimer.C()
			case done:
				// Nothing to do.
				// If 'output' was already set, it remains set still.
			default:
				panic(fmt.Errorf("unknown process result: %v", res))
			}
		case outputCh <- output:
			// Success!
			var empty OUT
			output = empty // Erase contents to help GC
			outputCh = nil // Disable this select case (send to nil channel blocks forever)
		}
	}
}