File: adaptive.go

package info (click to toggle)
golang-github-aws-aws-sdk-go-v2 1.30.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 662,428 kB
  • sloc: java: 16,875; makefile: 432; sh: 175
file content (156 lines) | stat: -rw-r--r-- 5,395 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package retry

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/internal/sdk"
)

const (
	// DefaultRequestCost is the cost of a single request from the adaptive
	// rate limited token bucket.
	DefaultRequestCost uint = 1
)

// DefaultThrottles provides the set of errors considered throttle errors that
// are checked by default.
var DefaultThrottles = []IsErrorThrottle{
	ThrottleErrorCode{
		Codes: DefaultThrottleErrorCodes,
	},
}

// AdaptiveModeOptions provides the functional options for configuring the
// adaptive retry mode, and delay behavior.
type AdaptiveModeOptions struct {
	// If the adaptive token bucket is empty, when an attempt will be made
	// AdaptiveMode will sleep until a token is available. This can occur when
	// attempts fail with throttle errors. Use this option to disable the sleep
	// until token is available, and return error immediately.
	FailOnNoAttemptTokens bool

	// The cost of an attempt from the AdaptiveMode's adaptive token bucket.
	RequestCost uint

	// Set of strategies to determine if the attempt failed due to a throttle
	// error.
	//
	// It is safe to append to this list in NewAdaptiveMode's functional options.
	Throttles []IsErrorThrottle

	// Set of options for standard retry mode that AdaptiveMode is built on top
	// of. AdaptiveMode may apply its own defaults to Standard retry mode that
	// are different than the defaults of NewStandard. Use these options to
	// override the default options.
	StandardOptions []func(*StandardOptions)
}

// AdaptiveMode provides an experimental retry strategy that expands on the
// Standard retry strategy, adding client attempt rate limits. The attempt rate
// limit is initially unrestricted, but becomes restricted when the attempt
// fails with for a throttle error. When restricted AdaptiveMode may need to
// sleep before an attempt is made, if too many throttles have been received.
// AdaptiveMode's sleep can be canceled with context cancel. Set
// AdaptiveModeOptions FailOnNoAttemptTokens to change the behavior from sleep,
// to fail fast.
//
// Eventually unrestricted attempt rate limit will be restored once attempts no
// longer are failing due to throttle errors.
type AdaptiveMode struct {
	options   AdaptiveModeOptions
	throttles IsErrorThrottles

	retryer   aws.RetryerV2
	rateLimit *adaptiveRateLimit
}

// NewAdaptiveMode returns an initialized AdaptiveMode retry strategy.
func NewAdaptiveMode(optFns ...func(*AdaptiveModeOptions)) *AdaptiveMode {
	o := AdaptiveModeOptions{
		RequestCost: DefaultRequestCost,
		Throttles:   append([]IsErrorThrottle{}, DefaultThrottles...),
	}
	for _, fn := range optFns {
		fn(&o)
	}

	return &AdaptiveMode{
		options:   o,
		throttles: IsErrorThrottles(o.Throttles),
		retryer:   NewStandard(o.StandardOptions...),
		rateLimit: newAdaptiveRateLimit(),
	}
}

// IsErrorRetryable returns if the failed attempt is retryable. This check
// should determine if the error can be retried, or if the error is
// terminal.
func (a *AdaptiveMode) IsErrorRetryable(err error) bool {
	return a.retryer.IsErrorRetryable(err)
}

// MaxAttempts returns the maximum number of attempts that can be made for
// an attempt before failing. A value of 0 implies that the attempt should
// be retried until it succeeds if the errors are retryable.
func (a *AdaptiveMode) MaxAttempts() int {
	return a.retryer.MaxAttempts()
}

// RetryDelay returns the delay that should be used before retrying the
// attempt. Will return error if the if the delay could not be determined.
func (a *AdaptiveMode) RetryDelay(attempt int, opErr error) (
	time.Duration, error,
) {
	return a.retryer.RetryDelay(attempt, opErr)
}

// GetRetryToken attempts to deduct the retry cost from the retry token pool.
// Returning the token release function, or error.
func (a *AdaptiveMode) GetRetryToken(ctx context.Context, opErr error) (
	releaseToken func(error) error, err error,
) {
	return a.retryer.GetRetryToken(ctx, opErr)
}

// GetInitialToken returns the initial attempt token that can increment the
// retry token pool if the attempt is successful.
//
// Deprecated: This method does not provide a way to block using Context,
// nor can it return an error. Use RetryerV2, and GetAttemptToken instead. Only
// present to implement Retryer interface.
func (a *AdaptiveMode) GetInitialToken() (releaseToken func(error) error) {
	return nopRelease
}

// GetAttemptToken returns the attempt token that can be used to rate limit
// attempt calls. Will be used by the SDK's retry package's Attempt
// middleware to get an attempt token prior to calling the temp and releasing
// the attempt token after the attempt has been made.
func (a *AdaptiveMode) GetAttemptToken(ctx context.Context) (func(error) error, error) {
	for {
		acquiredToken, waitTryAgain := a.rateLimit.AcquireToken(a.options.RequestCost)
		if acquiredToken {
			break
		}
		if a.options.FailOnNoAttemptTokens {
			return nil, fmt.Errorf(
				"unable to get attempt token, and FailOnNoAttemptTokens enables")
		}

		if err := sdk.SleepWithContext(ctx, waitTryAgain); err != nil {
			return nil, fmt.Errorf("failed to wait for token to be available, %w", err)
		}
	}

	return a.handleResponse, nil
}

func (a *AdaptiveMode) handleResponse(opErr error) error {
	throttled := a.throttles.IsErrorThrottle(opErr).Bool()

	a.rateLimit.Update(throttled)
	return nil
}