1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
package tb
import (
"math"
"sync/atomic"
"time"
)
// Bucket defines a generic lock-free implementation of a Token Bucket.
type Bucket struct {
inc int64
tokens int64
capacity int64
freq time.Duration
closing chan struct{}
}
// NewBucket returns a full Bucket with c capacity and starts a filling
// go-routine which ticks every freq. The number of tokens added on each tick
// is computed dynamically to be even across the duration of a second.
//
// If freq == -1 then the filling go-routine won't be started. Otherwise,
// If freq < 1/c seconds, then it will be adjusted to 1/c seconds.
func NewBucket(c int64, freq time.Duration) *Bucket {
b := &Bucket{tokens: c, capacity: c, closing: make(chan struct{})}
if freq == -1 {
return b
} else if evenFreq := time.Duration(1e9 / c); freq < evenFreq {
freq = evenFreq
}
b.freq = freq
b.inc = int64(math.Floor(.5 + (float64(c) * freq.Seconds())))
go b.fill()
return b
}
// Take attempts to take n tokens out of the bucket.
// If tokens == 0, nothing will be taken.
// If n <= tokens, n tokens will be taken.
// If n > tokens, all tokens will be taken.
//
// This method is thread-safe.
func (b *Bucket) Take(n int64) (taken int64) {
for {
if tokens := atomic.LoadInt64(&b.tokens); tokens == 0 {
return 0
} else if n <= tokens {
if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens-n) {
continue
}
return n
} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, 0) { // Spill
return tokens
}
}
}
// Put attempts to add n tokens to the bucket.
// If tokens == capacity, nothing will be added.
// If n <= capacity - tokens, n tokens will be added.
// If n > capacity - tokens, capacity - tokens will be added.
//
// This method is thread-safe.
func (b *Bucket) Put(n int64) (added int64) {
for {
if tokens := atomic.LoadInt64(&b.tokens); tokens == b.capacity {
return 0
} else if left := b.capacity - tokens; n <= left {
if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens+n) {
continue
}
return n
} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, b.capacity) {
return left
}
}
}
// Wait waits for n amount of tokens to be available.
// If n tokens are immediatelly available it doesn't sleep.
// Otherwise, it sleeps the minimum amount of time required for the remaining
// tokens to be available. It returns the wait duration.
//
// This method is thread-safe.
func (b *Bucket) Wait(n int64) time.Duration {
var rem int64
if rem = n - b.Take(n); rem == 0 {
return 0
}
var wait time.Duration
for rem > 0 {
sleep := b.wait(rem)
wait += sleep
time.Sleep(sleep)
rem -= b.Take(rem)
}
return wait
}
// Close stops the filling go-routine given it was started.
func (b *Bucket) Close() error {
close(b.closing)
return nil
}
// wait returns the minimum amount of time required for n tokens to be available.
// if n > capacity, n will be adjusted to capacity
func (b *Bucket) wait(n int64) time.Duration {
return time.Duration(int64(math.Ceil(math.Min(float64(n), float64(b.capacity))/float64(b.inc))) *
b.freq.Nanoseconds())
}
func (b *Bucket) fill() {
ticker := time.NewTicker(b.freq)
defer ticker.Stop()
for _ = range ticker.C {
select {
case <-b.closing:
return
default:
b.Put(b.inc)
}
}
}
|