1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
package nebula
import (
"sync"
"time"
)
// timerCacheMax is the maximum number of spent TimeoutItem nodes that Purge keeps on a
// TimerWheel's free list so that Add can reuse them instead of allocating.
const timerCacheMax = 50000
// TimerWheel is a ring of tick slots used to expire items of type T after a timeout.
// Add files an item into a slot, Advance moves the items of every slot it passes over onto the
// expired list, and Purge hands expired items back to the caller one at a time.
type TimerWheel[T any] struct {
// Index into wheel of the current tick
current int
// Cached number of slots in wheel, so the length does not have to be looked up
wheelLen int
// Tick boundary that Advance last accounted for; nil until the first Advance call.
// Ticking is lazy: the wheel only moves when Advance is called.
lastTick *time.Time
// Duration of a single tick, and the longest timeout the wheel will honor
tickDuration time.Duration
wheelDuration time.Duration
// The actual wheel: one singly linked list (head/tail pointers) per tick
wheel []*TimeoutList[T]
// Singly linked list of items that have timed out of the wheel and are waiting for Purge
expired *TimeoutList[T]
// Free list of spent items, reused by Add to reduce garbage, and the number of items on it
itemCache *TimeoutItem[T]
itemsCached int
}
// LockingTimerWheel pairs a TimerWheel with a mutex. Its Add, Purge and Advance methods take the
// mutex before calling the matching method on the wrapped wheel.
type LockingTimerWheel[T any] struct {
// Held for the duration of each call into t
m sync.Mutex
// The wrapped wheel
t *TimerWheel[T]
}
// TimeoutList is a singly linked list of TimeoutItems. Every tick of the wheel is one such list,
// and so is the wheel's expired list.
type TimeoutList[T any] struct {
// First item in the list, nil when the list is empty
Head *TimeoutItem[T]
// Last item in the list, where new items are appended; nil when the list is empty
Tail *TimeoutItem[T]
}
// TimeoutItem is a single node within a TimeoutList.
type TimeoutItem[T any] struct {
// The value that was handed to Add
Item T
// The node that follows this one in whichever list currently holds it, nil at the end
Next *TimeoutItem[T]
}
// NewTimerWheel builds a timer wheel whose tick length is min and whose longest supported
// timeout is max. min must be positive: the slot count is derived by dividing max by min.
// Expired entries are only handed back through Purge, which must be called once per entry.
// The TimerWheel does no locking of its own; callers that share it between goroutines must
// serialize access themselves.
func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
	//TODO provide an error
	//if min >= max {
	//	return nil
	//}

	// max/min rounded down, plus 2: the fewest slots that still cover a full max duration even
	// when the current tick sits at the last position and the next item is added with the
	// maximum timeout.
	slots := int((max / min) + 2)

	// Every slot gets its own empty list up front so Add and Advance never see a nil slot.
	ring := make([]*TimeoutList[T], slots)
	for idx := 0; idx < len(ring); idx++ {
		ring[idx] = &TimeoutList[T]{}
	}

	return &TimerWheel[T]{
		wheelLen:      slots,
		wheel:         ring,
		tickDuration:  min,
		wheelDuration: max,
		expired:       &TimeoutList[T]{},
	}
}
// NewLockingTimerWheel builds a LockingTimerWheel around a new TimerWheel created with the same
// min and max. It is a version of TimerWheel that is safe for concurrent use, with a small
// performance penalty.
func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
return &LockingTimerWheel[T]{
t: NewTimerWheel[T](min, max),
}
}
// Add files v into the wheel slot that matches timeout and returns the list node now holding it.
// Callers should Advance the wheel beforehand so the slot is computed from an up to date position.
func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
	idx := tw.findWheel(timeout)

	// Take a node off the free list when one is available, otherwise allocate a new one
	node := tw.itemCache
	if node == nil {
		node = &TimeoutItem[T]{}
	} else {
		tw.itemCache = node.Next
		tw.itemsCached--
		node.Next = nil
	}
	node.Item = v

	// Append the node to the end of the slot's list
	slot := tw.wheel[idx]
	if slot.Tail != nil {
		slot.Tail.Next = node
	} else {
		slot.Head = node
	}
	slot.Tail = node

	return node
}
// Purge removes the item at the head of the expired list and returns its value with true.
// If the expired list is empty, the zero value of T and false are returned.
//
// The spent list node is parked on the wheel's free list for reuse by Add, unless the free list
// already holds timerCacheMax nodes. A parked node has its Item reset to the zero value so the
// free list does not keep the purged value reachable.
func (tw *TimerWheel[T]) Purge() (T, bool) {
	var zero T

	ti := tw.expired.Head
	if ti == nil {
		return zero, false
	}

	// Unlink the node from the expired list
	tw.expired.Head = ti.Next
	if tw.expired.Head == nil {
		tw.expired.Tail = nil
	}

	// Copy the value out before the node is recycled
	v := ti.Item

	// Clear out the node's link into the expired list
	ti.Next = nil

	// Maybe cache it for later
	if tw.itemsCached < timerCacheMax {
		// The free list is long lived; drop the value so it can be collected
		ti.Item = zero
		ti.Next = tw.itemCache
		tw.itemCache = ti
		tw.itemsCached++
	}

	return v, true
}
// findWheel returns the index of the wheel slot that an item with the provided timeout should be
// filed in, measured from the current tick.
func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
	// Clamp the timeout to what the wheel can represent: nothing below the tick resolution and
	// nothing beyond the duration of the whole wheel
	switch {
	case timeout < tw.tickDuration:
		timeout = tw.tickDuration
	case timeout > tw.wheelDuration:
		timeout = tw.wheelDuration
	}

	// Number of ticks the timeout spans, rounding up
	i = int(((timeout - 1) / tw.tickDuration) + 1)

	// One extra tick because the current tick may be almost over, then offset from the current
	// position
	i += tw.current + 1

	// Wrap around the end of the wheel
	if i >= tw.wheelLen {
		i -= tw.wheelLen
	}

	return i
}
// Advance moves the wheel forward by the number of whole ticks that fit between the last tick
// and now. The items of every slot passed over are appended to the expired list; calling Purge
// is necessary to remove them entirely. The first call only records now as the starting point.
//
// At most one full revolution of slots is visited no matter how far now is ahead, which is
// enough to expire every item in the wheel.
func (tw *TimerWheel[T]) Advance(now time.Time) {
	if tw.lastTick == nil {
		// Store a copy instead of &now. Taking the address of the parameter would force it
		// onto the heap on every call, not just this first one.
		first := now
		tw.lastTick = &first
	}

	// Whole ticks elapsed since the last tick, rounding down
	elapsed := int(now.Sub(*tw.lastTick) / tw.tickDuration)

	steps := elapsed
	if steps > tw.wheelLen {
		steps = tw.wheelLen
	}

	for ; steps > 0; steps-- {
		tw.current++
		if tw.current >= tw.wheelLen {
			tw.current = 0
		}

		slot := tw.wheel[tw.current]
		if slot.Head == nil {
			continue
		}

		// Append rather than prepend so the oldest expired items are not starved of eviction
		if tw.expired.Tail == nil {
			tw.expired.Head = slot.Head
		} else {
			tw.expired.Tail.Next = slot.Head
		}
		tw.expired.Tail = slot.Tail

		slot.Head, slot.Tail = nil, nil
	}

	// Move the last tick forward by whole ticks instead of jumping to now, so the fraction of
	// a tick that has already elapsed is not lost. Updated in place to avoid allocating a new
	// time.Time on every call.
	*tw.lastTick = tw.lastTick.Add(tw.tickDuration * time.Duration(elapsed))
}
// Add takes the mutex, calls Add on the wrapped TimerWheel with v and timeout, and returns that
// call's result. The mutex is released when the call returns.
func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
lw.m.Lock()
defer lw.m.Unlock()
return lw.t.Add(v, timeout)
}
// Purge takes the mutex, calls Purge on the wrapped TimerWheel, and returns that call's results.
// The mutex is released when the call returns.
func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
lw.m.Lock()
defer lw.m.Unlock()
return lw.t.Purge()
}
// Advance takes the mutex and calls Advance on the wrapped TimerWheel with now.
// The mutex is released when the call returns.
func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
lw.m.Lock()
defer lw.m.Unlock()
lw.t.Advance(now)
}
|