1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
|
package volumequeue
import (
"sync"
"time"
)
// Retry backoff bounds for volume operations. Attempt 0 is processed
// immediately; attempt n (n >= 1) waits baseRetryInterval * 2^n, capped at
// maxRetryInterval.
const (
	// baseRetryInterval is the base interval from which retry delays are
	// computed. Each subsequent attempt doubles the delay.
	baseRetryInterval = 100 * time.Millisecond

	// maxRetryInterval is the maximum amount of time we will wait between
	// retrying volume operations.
	maxRetryInterval = 10 * time.Minute
)

// vqTimerSource is an interface for creating timers for the VolumeQueue. It
// exists so that tests can substitute timers they control.
type vqTimerSource interface {
	// NewTimer takes an attempt number and returns a vqTimer which will
	// fire after a delay derived from that attempt number.
	NewTimer(attempt uint) vqTimer
}

// vqTimer is an interface representing a timer. Instead of exposing the
// trigger channel C directly, it wraps it in a Done method, so that in
// testing the timer can be substituted for a different object.
type vqTimer interface {
	// Done returns the channel that receives when the timer fires.
	Done() <-chan time.Time
	// Stop prevents the timer from firing and reports whether it did so.
	// Enqueue's drain logic assumes time.Timer.Stop semantics.
	Stop() bool
}

// timerSource is an empty struct type which is used to represent the default
// vqTimerSource, which uses time.Timer.
type timerSource struct{}

// Compile-time checks that the default implementations satisfy the
// interfaces the VolumeQueue depends on.
var (
	_ vqTimerSource = timerSource{}
	_ vqTimer       = timer{}
)

// NewTimer creates a time.Timer that fires after the backoff delay for the
// given attempt (see retryInterval).
func (timerSource) NewTimer(attempt uint) vqTimer {
	return timer{Timer: time.NewTimer(retryInterval(attempt))}
}

// retryInterval returns how long to wait before processing the given attempt.
// Attempt 0 is immediate; attempt n >= 1 waits baseRetryInterval * 2^n,
// capped at maxRetryInterval.
//
// The delay is doubled step by step and the doubling stops as soon as the cap
// is reached, so arbitrarily large attempt numbers cannot overflow
// time.Duration. A plain `baseRetryInterval << attempt` wraps around from
// attempt 37 and becomes 0 from attempt 64, either of which slips past the
// cap check and makes the timer fire immediately.
func retryInterval(attempt uint) time.Duration {
	if attempt == 0 {
		return 0
	}
	wait := baseRetryInterval
	for i := uint(0); i < attempt && wait < maxRetryInterval; i++ {
		wait <<= 1
	}
	if wait > maxRetryInterval {
		wait = maxRetryInterval
	}
	return wait
}

// timer wraps a time.Timer to provide a Done method.
type timer struct {
	*time.Timer
}

// Done returns the timer's C channel, which receives when the timer expires.
func (t timer) Done() <-chan time.Time {
	return t.C
}
// VolumeQueue manages the exponential backoff of retrying volumes. It behaves
// somewhat like a priority queue. However, the key difference is that volumes
// which are ready to process or reprocess are read off of an unbuffered
// channel, meaning the order in which simultaneously-ready volumes are
// returned is at the mercy of the Go scheduler. In practice, this does not
// matter.
//
// A VolumeQueue must be created with NewVolumeQueue: the zero value has a nil
// outstanding map, so Enqueue on it would panic.
type VolumeQueue struct {
	// Mutex guards outstanding. Because it is embedded, Lock and Unlock are
	// also part of VolumeQueue's exported method set.
	sync.Mutex
	// next carries each entry to Wait once its timer has fired. It is
	// unbuffered, so the sending goroutine blocks until Wait receives the
	// entry or the entry is canceled.
	next chan *volumeQueueEntry
	// outstanding is the set of all pending volumeQueueEntries, mapped by
	// volume ID.
	outstanding map[string]*volumeQueueEntry
	// stopChan is closed by Stop to signal shutdown; a closed stopChan makes
	// Wait return "", 0 instead of blocking.
	stopChan chan struct{}
	// timerSource is an object which is used to create the timer for a
	// volumeQueueEntry. It exists so that in testing, the timer can be
	// substituted for an object that we control.
	timerSource vqTimerSource
}
// volumeQueueEntry represents one scheduled (re)processing of a volume in the
// VolumeQueue.
type volumeQueueEntry struct {
	// id is the id of the volume this entry represents. We only need the ID,
	// because the CSI manager will look up the latest revision of the volume
	// before doing any work on it.
	id string
	// attempt is the current retry attempt of the entry; it determines the
	// backoff delay and is returned from Wait.
	attempt uint
	// cancel aborts the retry attempt by closing the entry's cancel channel,
	// so it must be called at most once per entry (a second close panics).
	cancel func()
}
// NewVolumeQueue builds an empty, running VolumeQueue whose retry timers come
// from the default timerSource (real time.Timers).
func NewVolumeQueue() *VolumeQueue {
	vq := new(VolumeQueue)
	vq.next = make(chan *volumeQueueEntry)
	vq.outstanding = make(map[string]*volumeQueueEntry)
	vq.stopChan = make(chan struct{})
	vq.timerSource = timerSource{}
	return vq
}
// Enqueue schedules the volume with the given id to be returned by Wait once
// the timer created for attempt fires. If an entry for id is already
// outstanding, it is canceled and replaced by the new one. Once the queue has
// been stopped, Enqueue does nothing.
func (vq *VolumeQueue) Enqueue(id string, attempt uint) {
	// we must lock the volumeQueue when we add entries, because we will be
	// accessing the outstanding map
	vq.Lock()
	defer vq.Unlock()

	// Stop closes stopChan while holding the lock, so this check cannot race
	// with it. An entry enqueued after Stop would start a goroutine that
	// nothing ever cancels; it would block forever on the unbuffered next
	// channel once nobody calls Wait. So refuse new work instead.
	select {
	case <-vq.stopChan:
		return
	default:
	}

	if entry, ok := vq.outstanding[id]; ok {
		entry.cancel()
		delete(vq.outstanding, id)
	}

	cancelChan := make(chan struct{})
	v := &volumeQueueEntry{
		id:      id,
		attempt: attempt,
		cancel: func() {
			close(cancelChan)
		},
	}

	t := vq.timerSource.NewTimer(attempt)

	// this goroutine is the meat of the volumeQueue. when the timer triggers,
	// the volume queue entry is written out to the next channel.
	//
	// the nature of the select statement, and of goroutines and of
	// asynchronous operations means that this is not actually strictly
	// ordered. if several entries are ready, then the one that actually gets
	// dequeued is at the mercy of the golang scheduler.
	//
	// however, the flip side of this is that canceling an entry truly cancels
	// it. because we're blocking on a write attempt, if we cancel, we don't
	// do that write attempt, and there's no need to try to remove from the
	// queue a ready-but-now-canceled entry before it is processed.
	go func() {
		select {
		case <-t.Done():
			// once the timer fires, we will try to write this entry to the
			// next channel. however, because next is unbuffered, if we ended
			// up in a situation where no read occurred, we would be
			// deadlocked. to avoid this, we select on both a vq.next write and
			// on a read from cancelChan, which allows us to abort our write
			// attempt.
			select {
			case vq.next <- v:
			case <-cancelChan:
			}
		case <-cancelChan:
			// stop the timer; if it already fired, drain its channel as the
			// time.Timer documentation recommends.
			if !t.Stop() {
				<-t.Done()
			}
		}
	}()

	vq.outstanding[id] = v
}
// Wait blocks until a volume is ready to be processed and returns its ID and
// attempt number. If it is woken because the VolumeQueue was stopped, it
// returns "", 0 instead.
func (vq *VolumeQueue) Wait() (string, uint) {
	var ready *volumeQueueEntry
	select {
	case <-vq.stopChan:
		// no more entries may ever be written once stopped, so report an
		// empty result rather than blocking.
		return "", 0
	case ready = <-vq.next:
	}

	vq.Lock()
	defer vq.Unlock()

	// Only forget the entry if it is still the one we received. Between the
	// receive above and acquiring the lock, Enqueue may have replaced the
	// entry for this id with a newer attempt; deleting unconditionally would
	// drop that newer entry from the bookkeeping.
	if current, found := vq.outstanding[ready.id]; found && current == ready {
		delete(vq.outstanding, ready.id)
	}
	return ready.id, ready.attempt
}
// Outstanding returns the number of entries currently held by this queue:
// entries that were enqueued and have not yet been returned by Wait, replaced,
// or removed. It is safe to call concurrently with the other methods.
func (vq *VolumeQueue) Outstanding() int {
	// outstanding is mutated by Enqueue, Wait and Stop while holding the
	// lock; reading its length without the lock is a data race.
	vq.Lock()
	defer vq.Unlock()
	return len(vq.outstanding)
}
// Stop stops the VolumeQueue and cancels all outstanding entries. After Stop,
// Wait no longer blocks: it returns "", 0 when it observes the stop. Calling
// Stop more than once is safe; later calls do nothing.
func (vq *VolumeQueue) Stop() {
	vq.Lock()
	defer vq.Unlock()

	select {
	case <-vq.stopChan:
		// already stopped; closing stopChan a second time would panic.
		return
	default:
	}
	close(vq.stopChan)

	for id, entry := range vq.outstanding {
		entry.cancel()
		// Remove the canceled entry so that a later Enqueue for the same id
		// does not call cancel again (which would close an already-closed
		// channel and panic), and so Outstanding no longer counts it.
		delete(vq.outstanding, id)
	}
}
|