File: bucket.go

package info (click to toggle)
golang-github-vulcand-oxy 2.0.0-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 728 kB
  • sloc: makefile: 14
file content (132 lines) | stat: -rw-r--r-- 4,448 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package ratelimit

import (
	"fmt"
	"time"

	"github.com/vulcand/oxy/v2/internal/holsterv4/clock"
)

// UndefinedDelay is the delay value that consume returns, together with an
// error, when the requested number of tokens is larger than the burst size of
// the bucket and therefore no meaningful waiting time can be reported.
const UndefinedDelay = -1

// rate defines token bucket parameters: `average` tokens are granted per
// `period`, and a bucket holds at most `burst` tokens. A bucket created by
// newTokenBucket starts out filled with `burst` tokens.
type rate struct {
	period  time.Duration // interval over which `average` tokens are granted
	average int64         // tokens granted per period; used as a divisor, so it must not be zero
	burst   int64         // maximum number of tokens a bucket may accumulate
}

// String renders the rate as "rate(<average>/<period>, burst=<burst>)".
func (r *rate) String() string {
	const layout = "rate(%v/%v, burst=%v)"
	return fmt.Sprintf(layout, r.average, r.period, r.burst)
}

// tokenBucket implements the token bucket algorithm
// (http://en.wikipedia.org/wiki/Token_bucket).
type tokenBucket struct {
	// The time period controlled by the bucket in nanoseconds.
	period time.Duration
	// The number of nanoseconds that it takes to add one more token to the
	// total number of available tokens. It caches the result of dividing
	// period by the average number of tokens granted per period. When it is
	// zero, updateAvailableTokens does not refill the bucket.
	timePerToken time.Duration
	// The maximum number of tokens that can accumulate in the bucket.
	burst int64
	// The number of tokens available for consumption at the moment.
	// updateAvailableTokens caps it at burst.
	availableTokens int64
	// The reference time from which elapsed time is measured when the bucket
	// is refilled. It is initialized to the creation time of the bucket.
	lastRefresh clock.Time
	// The number of tokens taken by the most recent consume call; zero if that
	// call took nothing or has already been rolled back.
	lastConsumed int64
}

// newTokenBucket creates a `tokenBucket` instance for the specified `rate`.
// The bucket starts full, i.e. with `rate.burst` tokens available, and its
// refresh checkpoint is set to the current time. A zero `rate.period` is
// replaced with one nanosecond. `rate.average` must not be zero: it is used
// as a divisor to derive the time needed to earn a single token.
func newTokenBucket(rate *rate) *tokenBucket {
	tb := &tokenBucket{
		period:          rate.period,
		burst:           rate.burst,
		availableTokens: rate.burst,
	}
	if tb.period == 0 {
		tb.period = clock.Nanosecond
	}

	tb.timePerToken = time.Duration(int64(tb.period) / rate.average)
	tb.lastRefresh = clock.Now().UTC()
	return tb
}

// consume tries to take the specified number of tokens out of the bucket after
// refreshing the number of available tokens. There are three outcomes:
//
//   - the request exceeds the burst size: `UndefinedDelay` and an error are
//     returned, nothing is consumed;
//   - not enough tokens are available yet: nothing is consumed and the
//     returned delay tells how long the caller has to wait until the desired
//     number of tokens becomes available;
//   - enough tokens are available: they are consumed and `0, nil` is returned.
//
// Only the last outcome is recorded for a subsequent `rollback`.
func (tb *tokenBucket) consume(tokens int64) (time.Duration, error) {
	tb.updateAvailableTokens()
	tb.lastConsumed = 0

	switch {
	case tokens > tb.burst:
		return UndefinedDelay, fmt.Errorf("requested tokens larger than max tokens")
	case tokens > tb.availableTokens:
		return tb.timeTillAvailable(tokens), nil
	}

	tb.availableTokens -= tokens
	tb.lastConsumed = tokens
	return 0, nil
}

// rollback gives back the tokens taken by the most recent `consume`. If that
// call did not take any tokens (it failed with an error or had to report a
// delay), there is nothing to give back and the bucket is left as it is. The
// recorded amount is cleared, so calling this method repeatedly is safe: only
// the first call after a successful `consume` has an effect.
func (tb *tokenBucket) rollback() {
	refund := tb.lastConsumed
	tb.lastConsumed = 0
	tb.availableTokens += refund
}

// update changes the refill rate and the burst size of the token bucket
// according to the provided `rate`.
//
// The period of a bucket cannot be changed: an error is returned if
// `rate.period` differs from the period of the bucket. An error is also
// returned if `rate.average` is not positive. In both cases the bucket is left
// untouched.
//
// If the new burst size is smaller than the number of currently available
// tokens, the surplus tokens are discarded.
func (tb *tokenBucket) update(rate *rate) error {
	if rate.period != tb.period {
		return fmt.Errorf("period mismatch: %v != %v", tb.period, rate.period)
	}
	// Validate before dividing: a zero average would panic with an integer
	// division by zero, and a negative one would install a negative
	// per-token interval.
	if rate.average <= 0 {
		return fmt.Errorf("average must be positive, got %v", rate.average)
	}

	tb.timePerToken = time.Duration(int64(tb.period) / rate.average)
	tb.burst = rate.burst
	if tb.availableTokens > rate.burst {
		tb.availableTokens = rate.burst
	}
	return nil
}

// timeTillAvailable reports how long the caller has to wait until the
// specified number of tokens becomes available for consumption. The result is
// the number of tokens currently missing multiplied by the time it takes to
// earn a single token.
func (tb *tokenBucket) timeTillAvailable(tokens int64) time.Duration {
	deficit := time.Duration(tokens - tb.availableTokens)
	return deficit * tb.timePerToken
}

// updateAvailableTokens refreshes the number of tokens available for
// consumption. One token is added for every whole `timePerToken` interval that
// has elapsed since `lastRefresh`, and the total is then capped at `burst`.
// A bucket whose `timePerToken` is zero is never refilled.
func (tb *tokenBucket) updateAvailableTokens() {
	now := clock.Now().UTC()
	timePassed := now.Sub(tb.lastRefresh)

	if tb.timePerToken == 0 {
		return
	}

	// If the current time is earlier than the checkpoint (e.g. the clock was
	// stepped backwards), the elapsed time is negative and the division below
	// would subtract tokens from the bucket. Re-anchor the checkpoint at the
	// current time and treat the interval as empty instead.
	if timePassed < 0 {
		tb.lastRefresh = now
		timePassed = 0
	}

	// If no whole token has been earned yet, not enough time has passed. In
	// that case the checkpoint is left alone, otherwise it would keep moving
	// forward under frequent requests that exceed the rate and the bucket
	// would never be refilled.
	if refill := int64(timePassed / tb.timePerToken); refill != 0 {
		tb.lastRefresh = now
		tb.availableTokens += refill
	}
	if tb.availableTokens > tb.burst {
		tb.availableTokens = tb.burst
	}
}