1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
package retry
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/clock"
)
// AttemptResult is returned by a polling function to tell PollWithBackoff
// what to do after the current attempt.
type AttemptResult int

const (
	// Continue means there was no error and polling can continue normally.
	// PollWithBackoff waits for PollConfig.Interval (or until poked) before
	// calling the polling function again.
	Continue AttemptResult = iota
	// ContinueImmediately means there was no error and the polling function
	// should be called again right away, without any wait.
	ContinueImmediately
	// Backoff means there was a retriable error, so the caller should try later.
	Backoff
	// Done means the polling should stop. There may or may not have been an error.
	Done
)
// ErrWaitTimeout is returned by PollWithBackoff when its context is done before
// the polling function reports Done. It is the very same value as
// wait.ErrWaitTimeout, so comparisons against either one succeed.
var ErrWaitTimeout = wait.ErrWaitTimeout
type (
	// BackoffManager is an alias for wait.BackoffManager. PollWithBackoff asks
	// it for the timer to wait on after an attempt returns Backoff.
	BackoffManager = wait.BackoffManager
	// BackoffManagerFactory produces a BackoffManager.
	BackoffManagerFactory func() BackoffManager
)
type (
	// PollWithBackoffFunc is a function that is called to perform polling.
	// The signature is unusual (error first, result last) because the
	// AttemptResult, not the error, decides what happens next and must always
	// be checked.
	PollWithBackoffFunc func() (error, AttemptResult)
	// PollWithBackoffCtxFunc is the context-aware form of PollWithBackoffFunc
	// and is the function type PollWithBackoff accepts. As with
	// PollWithBackoffFunc, the AttemptResult must be checked, not the error.
	PollWithBackoffCtxFunc func(ctx context.Context) (error, AttemptResult)
)
// PollConfig configures PollWithBackoff.
//
// The poke channel is unexported, so a PollConfig built outside this package
// (including the zero value) has a nil pokeC and Poke has no effect on it.
// Use a PollConfigFactory such as the one from NewPollConfigFactory to get a
// config whose Poke works.
type PollConfig struct {
	// Backoff supplies the timer waited on after an attempt returns Backoff.
	Backoff BackoffManager
	// Interval is the wait after an attempt returns Continue.
	Interval time.Duration
	// Sliding selects whether the backoff timer is armed after the attempts
	// (true) or before them, so that their runtime counts toward it (false).
	Sliding bool
	// pokeC delivers "stop waiting" signals from Poke to the poll loop.
	pokeC chan struct{}
}

// Poke wakes a PollWithBackoff loop running with this config so it calls the
// polling function again without finishing its current (or next) wait.
// Poke never blocks. If the channel cannot take another value (for example,
// a poke is already pending), the new poke is dropped.
func (c *PollConfig) Poke() {
	var wake struct{}
	select {
	case c.pokeC <- wake:
	default:
	}
}

// PollConfigFactory produces PollConfig values.
type PollConfigFactory func() PollConfig
// PollWithBackoff repeatedly calls f(ctx) until f returns Done or ctx is done.
//
// The AttemptResult returned by each call decides what happens next:
//   - Continue: wait for cfg.Interval (cut short by a poke), then call f again.
//   - ContinueImmediately: call f again without waiting.
//   - Backoff: wait on the timer from cfg.Backoff (cut short by a poke), then
//     call f again.
//   - Done: return the error f returned alongside Done (may be nil).
//   - any other value: panic.
//
// If cfg.Sliding is true, the backoff timer is obtained after f returns
// Backoff, so the wait starts once f has finished. If it is false, the timer is
// obtained before the first call of f in each backoff round. The period then
// includes the runtime of f, including any Continue/ContinueImmediately
// iterations in that round.
//
// It returns when:
//   - ctx signals done. ErrWaitTimeout is returned in this case, whether ctx
//     was cancelled or its deadline passed.
//   - f returns Done. The error returned by f is returned.
func PollWithBackoff(ctx context.Context, cfg PollConfig, f PollWithBackoffCtxFunc) error {
// t is the outstanding backoff timer, or nil when there is none to clean up.
var t clock.Timer
// On every return path, stop an outstanding backoff timer. If it already fired
// (Stop reports false), drain the value so no stale tick is left in its channel.
defer func() {
if t != nil && !t.Stop() {
<-t.C()
}
}()
done := ctx.Done()
for {
if !cfg.Sliding {
// Non-sliding: arm the timer before running f so its runtime counts.
t = cfg.Backoff.Backoff()
}
attempt:
for {
// Do not call f once ctx is done.
select {
case <-done:
return ErrWaitTimeout
default:
}
err, result := f(ctx)
switch result {
case Continue: // sleep and continue
// Wait for cfg.Interval; a poke ends the wait early.
timer := time.NewTimer(cfg.Interval)
select {
case <-done:
timer.Stop()
return ErrWaitTimeout
case <-cfg.pokeC:
timer.Stop()
case <-timer.C:
}
case ContinueImmediately: // immediately call f again
continue
case Backoff: // do an outer loop to backoff
break attempt
case Done: // f is done. A success or a terminal failure.
return err
default:
panic(fmt.Errorf("unexpected poll attempt result: %v", result))
}
}
if cfg.Sliding {
// Sliding: arm the timer only now that f has finished.
t = cfg.Backoff.Backoff()
}
// NOTE: Go's select has no priority between ready cases, so done and t.C()
// can both be ready and t.C() may be chosen. To mitigate this, done is
// re-checked at the top of every inner loop iteration, which prevents an
// extra execution of f().
select {
case <-done:
return ErrWaitTimeout
case <-cfg.pokeC:
// Poked: cancel the pending backoff. If the timer fired concurrently, drain
// it so no stale value remains.
if !t.Stop() {
<-t.C()
}
t = nil
case <-t.C():
// The tick was consumed, so the deferred cleanup has nothing to drain.
t = nil
}
}
}
// NewExponentialBackoffFactory returns a BackoffManagerFactory that creates a
// new exponential BackoffManager each time it is called. Each manager is built
// by wait.NewExponentialBackoffManager on the real clock. All arguments are
// forwarded unchanged; see the wait package for their exact meaning.
func NewExponentialBackoffFactory(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64) BackoffManagerFactory {
	newManager := func() BackoffManager {
		var realClock clock.RealClock
		return wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, realClock)
	}
	return newManager
}
// NewPollConfigFactory returns a PollConfigFactory. Each call of the factory
// builds a new PollConfig with the given interval, a BackoffManager freshly
// obtained from backoff, sliding backoff enabled, and a poke channel that
// buffers one pending poke.
func NewPollConfigFactory(interval time.Duration, backoff BackoffManagerFactory) PollConfigFactory {
	return func() PollConfig {
		var cfg PollConfig
		cfg.Backoff = backoff()
		cfg.Interval = interval
		cfg.Sliding = true
		// One slot is enough: the poll loop only needs to know that at least one
		// poke arrived since it last looked, not how many.
		cfg.pokeC = make(chan struct{}, 1)
		return cfg
	}
}
|