// Package buffer provides a data buffer that is asynchronously flushed,
// either manually or automatically.
package buffer
import (
"errors"
"io"
"time"
)
// Sentinel errors returned by the buffer's operations.
var (
	// ErrTimeout is returned when an operation could not be completed within
	// its configured timeout.
	ErrTimeout = errors.New("operation timed-out")

	// ErrClosed is returned when an operation is attempted on a buffer that
	// has already been closed.
	ErrClosed = errors.New("buffer is closed")
)
// Buffer represents a data buffer that is asynchronously flushed, either
// manually or automatically.
//
// Its exported methods communicate over channels with a single consumer
// goroutine, which owns the buffered items and performs the flushes.
type Buffer struct {
	// Closer is embedded but is not assigned by New. Buffer declares its own
	// Close method, and that method is the one callers reach through a Buffer.
	io.Closer

	// dataCh carries pushed items to the consumer goroutine.
	dataCh chan interface{}
	// flushCh carries manual flush requests to the consumer goroutine.
	flushCh chan struct{}
	// closeCh carries the shutdown request to the consumer goroutine.
	closeCh chan struct{}
	// doneCh is closed by the consumer goroutine right before it returns.
	doneCh chan struct{}
	// options holds the resolved configuration (size, timeouts, flusher, ...).
	options *Options
}
// Push appends an item to the end of the buffer.
//
// The item is handed to the consumer goroutine over dataCh, so Push blocks
// until the consumer accepts it, the buffer is closed, or options.PushTimeout
// elapses, whichever happens first.
//
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Push(item interface{}) error {
	if buffer.closed() {
		return ErrClosed
	}

	// Unlike time.After, an explicit timer can be stopped, so it is released
	// as soon as Push returns instead of lingering until it fires.
	timeout := time.NewTimer(buffer.options.PushTimeout)
	defer timeout.Stop()

	select {
	case buffer.dataCh <- item:
		return nil
	case <-buffer.doneCh:
		// The consumer stopped while we were waiting, so nobody will ever
		// receive the item; report that right away rather than timing out.
		return ErrClosed
	case <-timeout.C:
		return ErrTimeout
	}
}
// Flush asks the buffer to output its contents to a permanent destination.
//
// Flush only delivers the request: it returns nil as soon as the consumer
// goroutine has accepted it, and the write itself is then performed by that
// goroutine. If the buffer holds no items, the request results in no write.
//
// It returns an ErrTimeout if it cannot be performed in a timely fashion
// (options.FlushTimeout), and an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Flush() error {
	if buffer.closed() {
		return ErrClosed
	}

	// Unlike time.After, an explicit timer can be stopped, so it is released
	// as soon as Flush returns instead of lingering until it fires.
	timeout := time.NewTimer(buffer.options.FlushTimeout)
	defer timeout.Stop()

	select {
	case buffer.flushCh <- struct{}{}:
		return nil
	case <-buffer.doneCh:
		// The consumer stopped while we were waiting, so the request can
		// never be accepted; report that right away rather than timing out.
		return ErrClosed
	case <-timeout.C:
		return ErrTimeout
	}
}
// Close flushes the buffer and prevents it from being further used.
//
// It works in two phases, each bounded by options.CloseTimeout: first the
// shutdown request is delivered to the consumer goroutine, then Close waits
// for that goroutine to finish its final flush and signal completion.
//
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has already been closed.
//
// An ErrTimeout can either mean that a flush could not be triggered, or it can
// mean that a flush was triggered but it has not finished yet. In any case it is
// safe to call Close again.
func (buffer *Buffer) Close() error {
	if buffer.closed() {
		return ErrClosed
	}

	// Phase 1: hand the shutdown request to the consumer goroutine.
	requestTimeout := time.NewTimer(buffer.options.CloseTimeout)
	defer requestTimeout.Stop()

	select {
	case buffer.closeCh <- struct{}{}:
		// Request accepted; fall through and wait for completion.
	case <-buffer.doneCh:
		// The consumer finished because of an earlier Close call, so the
		// request can never be accepted; the buffer is already closed.
		return ErrClosed
	case <-requestTimeout.C:
		return ErrTimeout
	}

	// Phase 2: wait until the consumer has flushed and signalled completion.
	doneTimeout := time.NewTimer(buffer.options.CloseTimeout)
	defer doneTimeout.Stop()

	select {
	case <-buffer.doneCh:
		// dataCh, flushCh and closeCh are deliberately NOT closed here.
		// Other goroutines may still be blocked sending on them from Push,
		// Flush or Close, and a send on a closed channel panics. Channels
		// do not need to be closed to be garbage collected, and doneCh
		// already tells every caller that the buffer is closed.
		return nil
	case <-doneTimeout.C:
		return ErrTimeout
	}
}
// closed reports whether the buffer has been shut down, that is, whether
// doneCh has been closed. The check never blocks.
//
// It uses a pointer receiver, like every other method of Buffer, so the
// struct is not copied on each call.
func (buffer *Buffer) closed() bool {
	select {
	case <-buffer.doneCh:
		return true
	default:
		return false
	}
}
// consume is the loop run by the buffer's consumer goroutine.
//
// It collects items received on dataCh and hands them to
// options.Flusher.Write when one of the following happens:
//   - the number of collected items reaches options.Size;
//   - the flush ticker fires and at least one item is pending;
//   - a manual flush is requested on flushCh and at least one item is pending;
//   - a shutdown is requested on closeCh and at least one item is pending.
//
// After a shutdown request the loop ends, the ticker is stopped and doneCh is
// closed to signal that the buffer is closed.
func (buffer *Buffer) consume() {
	var (
		pending = make([]interface{}, buffer.options.Size)
		n       int
	)
	tick, stopTick := newTicker(buffer.options.FlushInterval)

	running := true
	for running {
		flushNow := false

		select {
		case item := <-buffer.dataCh:
			pending[n] = item
			n++
			flushNow = n >= len(pending)
		case <-tick:
			flushNow = n > 0
		case <-buffer.flushCh:
			flushNow = n > 0
		case <-buffer.closeCh:
			running = false
			flushNow = n > 0
		}

		if !flushNow {
			continue
		}

		// The ticker is stopped before the write and recreated after it, so
		// the next automatic flush is a full interval after this write ends.
		stopTick()
		buffer.options.Flusher.Write(pending[:n])

		// Start over with a fresh slice rather than reusing the old one
		// (presumably so the Flusher may keep the slice it was given).
		pending, n = make([]interface{}, buffer.options.Size), 0
		tick, stopTick = newTicker(buffer.options.FlushInterval)
	}

	stopTick()
	close(buffer.doneCh)
}
// newTicker returns a channel that delivers a tick every interval, together
// with a function that stops the underlying ticker.
//
// A non-positive interval disables ticking: the returned channel is nil, so a
// receive on it blocks forever and the matching select case never fires, and
// the returned stop function is a no-op. Negative values are handled here as
// well because time.NewTicker panics for any interval that is not positive.
func newTicker(interval time.Duration) (<-chan time.Time, func()) {
	if interval <= 0 {
		return nil, func() {}
	}

	ticker := time.NewTicker(interval)
	return ticker.C, ticker.Stop
}
// New creates a new buffer instance with the provided options.
//
// All of the buffer's internal channels are created unbuffered. Before
// returning, New starts the goroutine that runs consume, which keeps running
// until a shutdown is requested through Close.
func New(opts ...Option) *Buffer {
	b := new(Buffer)

	b.dataCh = make(chan interface{})
	b.flushCh = make(chan struct{})
	b.closeCh = make(chan struct{})
	b.doneCh = make(chan struct{})
	b.options = resolveOptions(opts...)

	go b.consume()

	return b
}
|