1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
package ratelimit
import (
"fmt"
"time"
"github.com/vulcand/oxy/v2/internal/holsterv4/clock"
)
// UndefinedDelay is the delay value returned by tokenBucket.consume, together
// with an error, when the requested number of tokens exceeds the burst size of
// the bucket. In that case no wait time can be computed for the request.
const UndefinedDelay = -1
// rate describes the parameters of a single token bucket.
type rate struct {
	// period is the time window the rate is expressed over.
	period time.Duration
	// average is the number of tokens granted per period.
	average int64
	// burst is the maximum number of tokens the bucket may hold.
	burst int64
}

// String renders the rate as "rate(<average>/<period>, burst=<burst>)".
func (r *rate) String() string {
	avg, per, burst := r.average, r.period, r.burst
	return fmt.Sprintf("rate(%v/%v, burst=%v)", avg, per, burst)
}
// tokenBucket implements the token bucket algorithm
// (http://en.wikipedia.org/wiki/Token_bucket).
type tokenBucket struct {
// The time period controlled by the bucket in nanoseconds.
period time.Duration
// The number of nanoseconds that it takes to add one more token to the total
// number of available tokens. It caches the result of dividing period by the
// average of the rate the bucket was created or last updated with.
timePerToken time.Duration
// The maximum number of tokens that can accumulate in the bucket.
burst int64
// The number of tokens available for consumption at the moment. It is
// capped at burst every time the bucket is refreshed.
availableTokens int64
// Tells when availableTokens was refreshed from the clock the last time.
lastRefresh clock.Time
// The number of tokens taken by the most recent consume call, or zero if
// that call did not take any. rollback uses it to return the tokens.
lastConsumed int64
}
// newTokenBucket creates a `tokenBucket` instance for the specified `rate`.
// The bucket starts full, i.e. holding `burst` tokens, and a zero period is
// replaced with one nanosecond. `rate.average` must be non-zero, because it is
// used as a divisor when computing the time needed to produce one token.
func newTokenBucket(rate *rate) *tokenBucket {
	window := rate.period
	if window == 0 {
		window = clock.Nanosecond
	}

	tb := new(tokenBucket)
	tb.period = window
	tb.timePerToken = time.Duration(int64(window) / rate.average)
	tb.burst = rate.burst
	tb.lastRefresh = clock.Now().UTC()
	tb.availableTokens = rate.burst
	return tb
}
// consume tries to take the specified number of tokens out of the bucket,
// after refreshing the number of available tokens. The possible outcomes are:
//   - enough tokens are available: they are deducted, remembered for a
//     possible `rollback`, and `0, nil` is returned;
//   - more tokens are requested than the burst size: `UndefinedDelay` and an
//     error are returned and the bucket is left untouched;
//   - otherwise: the bucket is left untouched and the returned delay tells how
//     long the caller needs to wait until the desired number of tokens
//     becomes available for consumption.
func (tb *tokenBucket) consume(tokens int64) (time.Duration, error) {
	tb.updateAvailableTokens()
	// Forget the previous consumption so that rollback is a no-op unless this
	// call succeeds.
	tb.lastConsumed = 0

	switch {
	case tokens > tb.burst:
		return UndefinedDelay, fmt.Errorf("requested tokens larger than max tokens")
	case tokens > tb.availableTokens:
		return tb.timeTillAvailable(tokens), nil
	}

	tb.lastConsumed = tokens
	tb.availableTokens -= tokens
	return 0, nil
}
// rollback gives back the tokens taken by the most recent `consume` call. If
// that call did not take any tokens (it returned an error or a delay), then
// `rollback` does not change the number of available tokens either. Calling it
// repeatedly is safe: only the first call after a successful `consume` has an
// effect.
func (tb *tokenBucket) rollback() {
	returned := tb.lastConsumed
	tb.lastConsumed = 0
	tb.availableTokens += returned
}
// update applies the `average` and `burst` of the provided `rate` to the
// token bucket. The number of available tokens is trimmed down to the new
// burst size if it exceeds it.
//
// An error is returned, and the bucket is left unchanged, when the period of
// the rate differs from the period of the bucket, or when the average of the
// rate is not positive (a zero average would otherwise cause a division by
// zero, and a negative one a negative time per token).
func (tb *tokenBucket) update(rate *rate) error {
	// NOTE(review): newTokenBucket replaces a zero period with one nanosecond,
	// while this comparison uses rate.period as is, so a bucket created from a
	// zero-period rate rejects that very rate here. Confirm whether zero
	// periods can reach this point.
	if rate.period != tb.period {
		return fmt.Errorf("period mismatch: %v != %v", tb.period, rate.period)
	}
	// Validate before touching any field so that a rejected rate has no effect.
	if rate.average <= 0 {
		return fmt.Errorf("invalid average: %v", rate.average)
	}

	tb.timePerToken = time.Duration(int64(tb.period) / rate.average)
	tb.burst = rate.burst
	if tb.availableTokens > rate.burst {
		tb.availableTokens = rate.burst
	}
	return nil
}
// timeTillAvailable returns how long it takes to produce the tokens that are
// currently missing for a request of the specified size: the shortfall
// relative to the available tokens multiplied by the time per token.
func (tb *tokenBucket) timeTillAvailable(tokens int64) time.Duration {
	return tb.timePerToken * time.Duration(tokens-tb.availableTokens)
}
// updateAvailableTokens refreshes the number of tokens available for
// consumption. The number of tokens to add is derived from the time passed
// since the last refresh and the time per token, and the total is capped at
// the burst size. Nothing happens when the time per token is zero.
func (tb *tokenBucket) updateAvailableTokens() {
	now := clock.Now().UTC()
	if tb.timePerToken == 0 {
		return
	}

	elapsed := now.Sub(tb.lastRefresh)
	// Move the refresh checkpoint only when the token count actually changes.
	// Otherwise frequent requests that exceed the rate would keep pushing the
	// checkpoint forward and not enough time would ever accumulate to add a
	// token.
	if refilled := int64(elapsed / tb.timePerToken); refilled != 0 {
		tb.lastRefresh = now
		tb.availableTokens += refilled
	}

	if tb.availableTokens > tb.burst {
		tb.availableTokens = tb.burst
	}
}
|