File: bucket.go

package info (click to toggle)
golang-github-tsenart-tb 0.0~git20151208.0.19f4c3d-2
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 92 kB
  • ctags: 51
  • sloc: makefile: 2
file content (130 lines) | stat: -rw-r--r-- 3,290 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package tb

import (
	"math"
	"sync/atomic"
	"time"
)

// Bucket defines a generic lock-free implementation of a Token Bucket.
type Bucket struct {
	inc      int64
	tokens   int64
	capacity int64
	freq     time.Duration
	closing  chan struct{}
}

// NewBucket returns a full Bucket with c capacity and starts a filling
// go-routine which ticks every freq. The number of tokens added on each tick
// is computed dynamically to be even across the duration of a second.
//
// If freq == -1 then the filling go-routine won't be started. Otherwise,
// If freq < 1/c seconds, then it will be adjusted to 1/c seconds.
func NewBucket(c int64, freq time.Duration) *Bucket {
	b := &Bucket{tokens: c, capacity: c, closing: make(chan struct{})}

	if freq == -1 {
		return b
	} else if evenFreq := time.Duration(1e9 / c); freq < evenFreq {
		freq = evenFreq
	}

	b.freq = freq
	b.inc = int64(math.Floor(.5 + (float64(c) * freq.Seconds())))

	go b.fill()

	return b
}

// Take attempts to take n tokens out of the bucket.
// If tokens == 0, nothing will be taken.
// If n <= tokens, n tokens will be taken.
// If n > tokens, all tokens will be taken.
//
// This method is thread-safe.
func (b *Bucket) Take(n int64) (taken int64) {
	for {
		if tokens := atomic.LoadInt64(&b.tokens); tokens == 0 {
			return 0
		} else if n <= tokens {
			if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens-n) {
				continue
			}
			return n
		} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, 0) { // Spill
			return tokens
		}
	}
}

// Put attempts to add n tokens to the bucket.
// If tokens == capacity, nothing will be added.
// If n <= capacity - tokens, n tokens will be added.
// If n > capacity - tokens, capacity - tokens will be added.
//
// This method is thread-safe.
func (b *Bucket) Put(n int64) (added int64) {
	for {
		if tokens := atomic.LoadInt64(&b.tokens); tokens == b.capacity {
			return 0
		} else if left := b.capacity - tokens; n <= left {
			if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens+n) {
				continue
			}
			return n
		} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, b.capacity) {
			return left
		}
	}
}

// Wait waits for n amount of tokens to be available.
// If n tokens are immediatelly available it doesn't sleep.
// Otherwise, it sleeps the minimum amount of time required for the remaining
// tokens to be available. It returns the wait duration.
//
// This method is thread-safe.
func (b *Bucket) Wait(n int64) time.Duration {
	var rem int64
	if rem = n - b.Take(n); rem == 0 {
		return 0
	}

	var wait time.Duration
	for rem > 0 {
		sleep := b.wait(rem)
		wait += sleep
		time.Sleep(sleep)
		rem -= b.Take(rem)
	}
	return wait
}

// Close stops the filling go-routine given it was started.
func (b *Bucket) Close() error {
	close(b.closing)
	return nil
}

// wait returns the minimum amount of time required for n tokens to be available.
// if n > capacity, n will be adjusted to capacity
func (b *Bucket) wait(n int64) time.Duration {
	return time.Duration(int64(math.Ceil(math.Min(float64(n), float64(b.capacity))/float64(b.inc))) *
		b.freq.Nanoseconds())
}

func (b *Bucket) fill() {
	ticker := time.NewTicker(b.freq)
	defer ticker.Stop()

	for _ = range ticker.C {
		select {
		case <-b.closing:
			return
		default:
			b.Put(b.inc)
		}
	}
}