1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
package buffer
import (
"errors"
"io"
"io/ioutil"
"sync"
"time"
)
// AlreadyClosed is the error ReadFrom returns when an error has already been
// set on the buffer and incoming data is therefore discarded.
var AlreadyClosed = errors.New("Buffer already closed")
// A specialized concurrent circular buffer intended to buffer a stream's inbound data with the following properties:
// - Minimizes copies by skipping the buffer if a write occurs while a reader is waiting
// - Provides a mechanism to time out reads after a deadline
// - Provides a mechanism to set an 'error' that will fail reads when the buffer is empty
//
// waitingReader is the hand-off slot used for direct writes: a blocked Read
// publishes its destination slice here so that ReadFrom can copy incoming
// data straight into it instead of going through the circular buffer.
type waitingReader struct {
// buf is the reader's destination slice; non-nil means a reader is
// registered. ReadFrom resets it to nil after writing into it.
buf []byte
// n is the number of bytes ReadFrom placed into buf; Read treats a
// non-zero value as "a direct write happened".
n int
}
// Inbound buffers a stream's inbound data. It combines a circular buffer with
// a condition variable so that Read can block until data arrives, an error is
// set, or the read deadline passes. All mutable fields are accessed while
// holding the embedded Cond's lock (b.L).
type Inbound struct {
// Circular is the backing store used when no reader is waiting.
*Circular
// Cond supplies the lock (L) plus Wait/Broadcast used to park and wake readers.
*sync.Cond
// err is set via SetError; Read returns it once no buffered data is
// available, and ReadFrom discards input while it is non-nil.
err error
// waitingReader is the direct-write registration of a blocked Read, if any.
waitingReader
// deadline is the read deadline; the zero value disables the timeout check.
deadline time.Time
// timer wakes waiters when the deadline is reached; replaced by SetDeadline.
timer *time.Timer
}
// NewInbound builds an Inbound whose circular buffer is created with
// NewCircular(size) and whose condition variable is guarded by a fresh mutex.
// No deadline and no error are set initially.
func NewInbound(size int) *Inbound {
	in := new(Inbound)
	in.Circular = NewCircular(size)
	in.Cond = sync.NewCond(&sync.Mutex{})
	return in
}
// SetDeadline sets the instant after which Read fails with a timeout instead
// of waiting for data. Any timer armed by a previous call is stopped. Passing
// the zero time clears the deadline; in that case no timer is armed, because
// Read skips the timeout check entirely when the deadline is zero.
func (b *Inbound) SetDeadline(t time.Time) {
	b.L.Lock()
	defer b.L.Unlock()

	b.deadline = t

	// Disarm the timer belonging to the previous deadline, if any.
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}

	// A cleared deadline needs no wake-up.
	if t.IsZero() {
		return
	}

	// Wake waiters once the deadline is reached so they can observe the
	// timeout. A deadline already in the past yields a non-positive delay,
	// which makes the timer fire immediately.
	delay := t.Sub(time.Now())
	b.timer = time.AfterFunc(delay, func() {
		// Broadcast under the lock. Without it, a reader that has just
		// checked the deadline but has not yet entered Wait would miss
		// this wake-up and could block indefinitely.
		b.L.Lock()
		b.Broadcast()
		b.L.Unlock()
	})
}
// SetError stores err on the buffer and wakes every goroutine blocked on the
// condition variable so that waiting readers can observe it.
func (b *Inbound) SetError(err error) {
	b.L.Lock()
	defer b.L.Unlock()

	b.err = err
	b.Broadcast()
}
// GetError returns the error most recently stored with SetError, or nil if
// none has been set. The value is read while holding the buffer's lock.
func (b *Inbound) GetError() (err error) {
	b.L.Lock()
	defer b.L.Unlock()

	return b.err
}
// ReadFrom pulls data out of rd and makes it available to readers.
//
// If an error has been set on the buffer, rd is drained and discarded and
// AlreadyClosed is returned (or the drain error, if draining fails).
// Otherwise, when a reader is registered for a direct write, data is first
// read straight into that reader's slice; whatever remains goes into the
// circular buffer. Waiters are woken before returning. n is the total number
// of bytes taken from rd by the direct write and the circular buffer.
func (b *Inbound) ReadFrom(rd io.Reader) (n int, err error) {
	b.L.Lock()

	if b.err != nil {
		// Release the lock before draining so the buffer is not held
		// locked while rd is being read to its end.
		b.L.Unlock()
		if _, drainErr := ioutil.ReadAll(rd); drainErr != nil {
			return 0, drainErr
		}
		return 0, AlreadyClosed
	}

	// Hand data directly to a registered reader, if there is one.
	if dst := b.waitingReader.buf; dst != nil {
		direct, directErr := readInto(rd, dst)
		b.waitingReader.n = direct
		b.waitingReader.buf = nil // registration is consumed
		n = direct

		if directErr != nil {
			// Running out of input is not a failure.
			if directErr == io.EOF {
				directErr = nil
			}
			b.Broadcast()
			b.L.Unlock()
			return n, directErr
		}
	}

	// Anything still left in rd goes into the circular buffer.
	buffered, bufErr := b.Circular.ReadFrom(rd)
	n += buffered

	b.Broadcast()
	b.L.Unlock()
	return n, bufErr
}
// Read copies available data into p, blocking until one of the following
// happens:
//   - ReadFrom writes directly into p (the direct-write fast path),
//   - data can be read from the circular buffer,
//   - the deadline set with SetDeadline has passed (a "Read timeout" error),
//   - an error set with SetError is present and no buffered data is left.
//
// While blocked, Read registers p so that ReadFrom can write into it
// directly. That registration is always withdrawn before Read returns, so
// that a later ReadFrom can never write into a slice the caller has already
// taken back (which would silently lose the data).
func (b *Inbound) Read(p []byte) (n int, err error) {
	b.L.Lock()
	defer b.L.Unlock()

	// wait is non-nil once this call has registered p for a direct write.
	var wait *waitingReader
	for {
		// We got a direct write to our buffer.
		if wait != nil && wait.n != 0 {
			n = wait.n
			break
		}

		// Check for timeout.
		if !b.deadline.IsZero() && time.Now().After(b.deadline) {
			err = errors.New("Read timeout")
			break
		}

		// Try to read from the buffer. The error is deliberately ignored:
		// only the byte count decides whether to keep waiting.
		n, _ = b.Circular.Read(p)
		if n != 0 {
			break
		}

		// No data left; surface a previously set error, if any.
		if b.err != nil {
			err = b.err
			break
		}

		// Register for a direct write unless a registration is already active.
		if b.waitingReader.buf == nil {
			wait = &b.waitingReader
			wait.buf = p
			wait.n = 0
		}

		// No data, wait for a Broadcast from ReadFrom, SetError or the
		// deadline timer.
		b.Wait()
	}

	// Withdraw our registration. After a direct write it is already nil;
	// after a timeout or error it would otherwise stay behind and make the
	// next ReadFrom write into p after the caller has stopped reading.
	if wait != nil {
		wait.buf = nil
	}
	return n, err
}
|