1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
package sturdyc
import (
"time"
)
// buffer represents a buffer for a batch refresh.
type buffer struct {
channel chan []string
ids []string
}
// createBuffer should be called WITH a lock when a refresh buffer is created.
func (c *Client[T]) createBuffer(permutation string, ids []string) {
bufferIDs := make([]string, 0, c.bufferSize)
bufferIDs = append(bufferIDs, ids...)
buf := &buffer{
channel: make(chan []string),
ids: bufferIDs,
}
c.permutationBufferMap[permutation] = buf
}
// deleteBuffer should be called WITH a lock when a buffer has been processed.
func (c *Client[T]) deleteBuffer(permutation string) {
delete(c.permutationBufferMap, permutation)
}
// bufferBatchRefresh will buffer the batch of IDs until the batch size is reached or the buffer duration is exceeded.
func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) {
if len(ids) == 0 {
return
}
// If we got a perfect batch size, we can refresh the records immediately.
if len(ids) == c.bufferSize {
c.refreshBatch(ids, keyFn, fetchFn)
return
}
c.batchMutex.Lock()
// If the ids are greater than our batch size we'll have to chunk them.
if len(ids) > c.bufferSize {
idsToRefresh := ids[:c.bufferSize]
overflowingIDs := ids[c.bufferSize:]
c.batchMutex.Unlock()
// These IDs are the size we want, so we'll refresh them immediately.
c.safeGo(func() {
c.refreshBatch(idsToRefresh, keyFn, fetchFn)
})
// We'll continue to process the remaining IDs recursively.
c.safeGo(func() {
bufferBatchRefresh(c, overflowingIDs, keyFn, fetchFn)
})
return
}
// Extract the permutation string from the ids.
permutationString := extractPermutation(keyFn(ids[0]))
// Check if we already have a batch for this set of options.
if buf, ok := c.permutationBufferMap[permutationString]; ok {
// There is a small chance that another goroutine manages to write to the channel
// and fill the buffer as we unlock this mutex. Therefore, we'll add a timer so
// that we can process these ids again if that were to happen.
c.batchMutex.Unlock()
timer, stop := c.clock.NewTimer(time.Millisecond * 10)
select {
case buf.channel <- ids:
stop()
case <-timer:
c.safeGo(func() {
bufferBatchRefresh(c, ids, keyFn, fetchFn)
})
}
return
}
// There is no existing batch buffering for this permutation
// of options. Hence, we'll create a new one.
c.createBuffer(permutationString, ids)
c.batchMutex.Unlock()
c.safeGo(func() {
timer, stop := c.clock.NewTimer(c.bufferTimeout)
c.batchMutex.Lock()
idStream := c.permutationBufferMap[permutationString].channel
c.batchMutex.Unlock()
for {
select {
// If the buffer times out, we'll refresh the records regardless of the buffer size.
case _, ok := <-timer:
if !ok {
return
}
// We reached the deadline for this batch.
c.batchMutex.Lock()
buffer := c.permutationBufferMap[permutationString]
c.deleteBuffer(permutationString)
c.batchMutex.Unlock()
c.safeGo(func() {
c.refreshBatch(buffer.ids, keyFn, fetchFn)
})
return
case additionalIDs, ok := <-idStream:
if !ok {
return
}
// Lock the mutex, and add the additional IDs to the buffer.
c.batchMutex.Lock()
buffer := c.permutationBufferMap[permutationString]
buffer.ids = append(buffer.ids, additionalIDs...)
// If we haven't reached the batch size yet, we'll wait for more ids.
if len(buffer.ids) < c.bufferSize {
c.batchMutex.Unlock()
continue
}
// At this point, we have either reached or exceeded the batch size. We'll stop the timer and drain the channel.
if !stop() {
<-timer
}
// Grab a reference to the IDs, and then delete the buffer.
permIDs := buffer.ids
c.deleteBuffer(permutationString)
c.batchMutex.Unlock()
idsToRefresh := permIDs[:c.bufferSize]
overflowingIDs := permIDs[c.bufferSize:]
// Refresh the first batch of IDs immediately.
c.safeGo(func() {
c.refreshBatch(idsToRefresh, keyFn, fetchFn)
})
// If we exceeded the batch size, we'll continue to process the remaining IDs recursively.
if len(overflowingIDs) > 0 {
c.safeGo(func() {
bufferBatchRefresh(c, overflowingIDs, keyFn, fetchFn)
})
}
return
}
}
})
}
|