1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
|
// Package asyncreader provides an asynchronous reader which reads
// independently of write
package asyncreader
import (
"context"
"errors"
"io"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/readers"
)
const (
// BufferSize is the default size of the async buffer
BufferSize = 1024 * 1024
// softStartInitial is the size of the first read from the input.
// The read size is doubled for each following read until it
// reaches BufferSize.
softStartInitial = 4 * 1024
bufferCacheSize = 64 // max number of buffers to keep in cache
bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
)
// ErrorStreamAbandoned is returned when the input is closed before the end of the stream.
//
// It is what a read reports when no more buffers will arrive and no
// other error has been recorded on the reader.
var ErrorStreamAbandoned = errors.New("stream abandoned")
// AsyncReader will do async read-ahead from the input reader
// and make the data available as an io.Reader.
// This should be fully transparent, except that once an error
// has been returned from the Reader, it will not recover.
type AsyncReader struct {
in io.ReadCloser // Input reader
ready chan *buffer // Buffers ready to be handed to the reader
token chan struct{} // Tokens which allow a buffer to be taken
exit chan struct{} // Closed by StopBuffering to tell the read-ahead goroutine to stop
buffers int // Number of buffers
err error // If an error has occurred it is here
cur *buffer // Current buffer being served
exited chan struct{} // Closed when the read-ahead goroutine has shut down
size int // size of the next read; starts at softStartInitial and doubles up to BufferSize
closed bool // whether we have closed the underlying stream
mu sync.Mutex // lock for Read/WriteTo/SkipBytes/Abandon (and so Close, which calls Abandon)
ci *fs.ConfigInfo // for reading config
}
// New wraps rd in an AsyncReader which reads ahead from it in the
// background using up to the given number of buffers, each holding
// at most BufferSize bytes.
//
// Reading from rd starts immediately, possibly before New returns.
// Read the data back from the returned AsyncReader and call Close on
// it when finished to release the buffers and close rd.
//
// An error is returned if buffers is not positive or rd is nil.
func New(ctx context.Context, rd io.ReadCloser, buffers int) (*AsyncReader, error) {
	switch {
	case buffers <= 0:
		return nil, errors.New("number of buffers too small")
	case rd == nil:
		return nil, errors.New("nil reader supplied")
	}
	reader := new(AsyncReader)
	reader.ci = fs.GetConfig(ctx)
	reader.init(rd, buffers)
	return reader, nil
}
// init sets a up to read from rd with the given number of buffers and
// starts the background goroutine which does the read-ahead.
//
// The goroutine takes a token for every buffer it fills, so at most
// buffers buffers are outstanding at once. It stops after the first
// read which returns an error or once a.exit is closed, and on the way
// out closes a.ready and then a.exited.
func (a *AsyncReader) init(rd io.ReadCloser, buffers int) {
	a.in = rd
	a.buffers = buffers
	a.cur = nil
	a.size = softStartInitial
	a.ready = make(chan *buffer, buffers)
	a.token = make(chan struct{}, buffers)
	a.exit = make(chan struct{})
	a.exited = make(chan struct{})

	// One token per buffer
	for remaining := buffers; remaining > 0; remaining-- {
		a.token <- struct{}{}
	}

	// Background read-ahead
	go func() {
		// Signal shutdown: no more buffers first, then "exited"
		defer func() {
			close(a.ready)
			close(a.exited)
		}()
		for {
			// Wait for permission to fill a buffer, or to be told to stop
			select {
			case <-a.exit:
				return
			case <-a.token:
			}
			b := a.getBuffer()
			// Soft start: use short reads at first, doubling each time
			if a.size < BufferSize {
				b.buf = b.buf[:a.size]
				a.size <<= 1
			}
			readErr := b.read(a.in)
			// The buffer carries its own error to the consumer
			a.ready <- b
			if readErr != nil {
				return
			}
		}
	}()
}
// bufferPool is a global pool of buffers.
// It is nil until getBuffer creates it on first use.
var bufferPool *pool.Pool
// bufferPoolOnce makes sure bufferPool is only initialised once
var bufferPoolOnce sync.Once
// putBuffer hands the storage of b back to the shared buffer pool and
// detaches it from b so that it can't be used through b again.
func (a *AsyncReader) putBuffer(b *buffer) {
	storage := b.buf
	bufferPool.Put(storage)
	b.buf = nil
}
// getBuffer returns a new buffer whose storage is taken from the
// shared buffer pool, creating the pool if this is the first use.
func (a *AsyncReader) getBuffer() *buffer {
	// The pool is created lazily, using the config of whichever
	// AsyncReader asks for a buffer first
	bufferPoolOnce.Do(func() {
		bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, a.ci.UseMmap)
	})
	b := new(buffer)
	b.buf = bufferPool.Get()
	return b
}
// fill makes sure a.cur is a buffer with unread data in it.
//
// If the current buffer is used up it is returned to the pool, its
// token is handed back so another buffer can be read, and the next
// buffer is awaited from a.ready.
//
// If a.ready has been closed there is no more data: the stored error
// is returned, or ErrorStreamAbandoned if there isn't one.
func (a *AsyncReader) fill() (err error) {
	if !a.cur.isEmpty() {
		// Still data left in the current buffer
		return nil
	}
	if a.cur != nil {
		// Recycle the exhausted buffer
		a.putBuffer(a.cur)
		a.token <- struct{}{}
		a.cur = nil
	}
	next, open := <-a.ready
	if open {
		a.cur = next
		return nil
	}
	// No more buffers are coming
	if a.err != nil {
		return a.err
	}
	return ErrorStreamAbandoned
}
// Read copies the next available data into p and returns the number
// of bytes copied.
//
// It blocks until some data is available. An error attached to a
// buffer is only returned together with the last bytes of that
// buffer, and is remembered for later calls.
func (a *AsyncReader) Read(p []byte) (n int, err error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	// Make sure there is a current buffer to serve from
	if err = a.fill(); err != nil {
		return 0, err
	}

	n = copy(p, a.cur.buffer())
	a.cur.increment(n)

	if !a.cur.isEmpty() {
		return n, nil
	}
	// Buffer is used up so report its error, if it has one
	a.err = a.cur.err
	return n, a.err
}
// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written.
// Any error encountered during the write is also returned.
//
// Reaching io.EOF on the input is the normal way to finish and is
// reported as a nil error. Any other error attached to a buffer is
// returned once that buffer has been passed to w.
func (a *AsyncReader) WriteTo(w io.Writer) (n int64, err error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	for {
		// Get the next buffer with data in it
		switch err = a.fill(); {
		case err == io.EOF:
			return n, nil
		case err != nil:
			return n, err
		}

		written, writeErr := w.Write(a.cur.buffer())
		a.cur.increment(written)
		n += int64(written)
		if writeErr != nil {
			return n, writeErr
		}

		// Remember any error which came with this buffer and stop
		if bufErr := a.cur.err; bufErr != nil {
			a.err = bufErr
			if bufErr == io.EOF {
				return n, nil
			}
			return n, bufErr
		}
	}
}
// SkipBytes will try to seek 'skip' bytes relative to the current position.
// On success it returns true. If 'skip' is outside the current buffer data or
// an error occurs, Abandon is called and false is returned.
//
// A negative skip can only rewind within the buffer currently being
// served. A positive skip only consumes data which has already been
// read ahead; it never waits for more data to arrive. A skip of zero
// always succeeds as long as no error has been recorded on the reader.
func (a *AsyncReader) SkipBytes(skip int) (ok bool) {
	a.mu.Lock()
	defer func() {
		a.mu.Unlock()
		// Abandon takes the lock itself, so call it after unlocking
		if !ok {
			a.Abandon()
		}
	}()

	if a.err != nil {
		return false
	}
	if skip == 0 {
		// Nothing to do. Without this a zero skip would fail, and
		// abandon the stream, whenever no buffer happened to be
		// ready yet.
		return true
	}
	if skip < 0 {
		// seek backwards if skip is inside current buffer
		if a.cur != nil && a.cur.offset+skip >= 0 {
			a.cur.offset += skip
			return true
		}
		return false
	}
	// early return if skip is past the maximum buffer capacity
	if skip >= (len(a.ready)+1)*BufferSize {
		return false
	}

	// Tokens for the buffers consumed are only handed back on success,
	// on failure the reader is abandoned anyway.
	refillTokens := 0
	for {
		if a.cur.isEmpty() {
			if a.cur != nil {
				a.putBuffer(a.cur)
				refillTokens++
				a.cur = nil
			}
			// Take the next buffer only if one is ready right now
			select {
			case b, ok := <-a.ready:
				if !ok {
					return false
				}
				a.cur = b
			default:
				return false
			}
		}

		n := len(a.cur.buffer())
		if n > skip {
			n = skip
		}
		a.cur.increment(n)
		skip -= n
		if skip == 0 {
			for ; refillTokens > 0; refillTokens-- {
				a.token <- struct{}{}
			}
			// If at end of buffer, store any error, if present
			if a.cur.isEmpty() && a.cur.err != nil {
				a.err = a.cur.err
			}
			return true
		}
		// More to skip but this buffer ended with an error
		if a.cur.err != nil {
			a.err = a.cur.err
			return false
		}
	}
}
// StopBuffering will ensure that the underlying async reader is shut
// down so no more is read from the input.
//
// This does not free the memory so Abandon() or Close() need to be
// called on the input.
//
// This does not wait for Read/WriteTo to complete so can be called
// concurrently to those.
func (a *AsyncReader) StopBuffering() {
	select {
	case <-a.exit:
		// Already asked to stop, nothing more to do
	default:
		// Ask the read-ahead goroutine to stop and wait until it has
		close(a.exit)
		<-a.exited
	}
}
// Abandon will ensure that the underlying async reader is shut down
// and memory is returned. It does everything but close the input.
//
// It will NOT close the input supplied on New.
func (a *AsyncReader) Abandon() {
	a.StopBuffering()

	// Taking the lock waits for any Read/WriteTo in progress to finish
	a.mu.Lock()
	defer a.mu.Unlock()

	// Give back the buffer being served, if any
	if pending := a.cur; pending != nil {
		a.putBuffer(pending)
		a.cur = nil
	}
	// Give back everything which was read ahead but never served
	for {
		b, more := <-a.ready
		if !more {
			return
		}
		a.putBuffer(b)
	}
}
// Close will ensure that the underlying async reader is shut down.
// It will also close the input supplied on New.
//
// The input is only closed by the first call; later calls still shut
// the reader down but return nil without touching the input again.
func (a *AsyncReader) Close() (err error) {
	a.Abandon()

	// Test and set the closed flag under the lock so that concurrent
	// calls can't both decide to close the input. The lock is dropped
	// again before closing the input.
	a.mu.Lock()
	alreadyClosed := a.closed
	a.closed = true
	a.mu.Unlock()

	if alreadyClosed {
		return nil
	}
	return a.in.Close()
}
// buffer is the internal unit of read-ahead: a chunk of data read from
// the input together with the error, if any, from the read which
// produced it.
// If an error is present, it must be returned
// once all buffer content has been served.
type buffer struct {
buf []byte // the data; after read it is sliced to the bytes actually read
err error // error returned by the read which filled buf, if any
offset int // index into buf of the next byte to be served
}
// isEmpty reports whether there is nothing left to serve from b: either
// b is nil, or its offset has reached (or passed) the end of its data.
func (b *buffer) isEmpty() bool {
	return b == nil || len(b.buf)-b.offset <= 0
}
// read fills b from rd, starting at the beginning of its storage.
//
// Afterwards b.buf is cut down to the bytes actually read, the offset
// is back at zero and the error from the read is stored in b.err as
// well as being returned.
func (b *buffer) read(rd io.Reader) error {
	filled, err := readers.ReadFill(rd, b.buf)
	b.err = err
	b.buf = b.buf[:filled]
	b.offset = 0
	return err
}
// buffer returns the part of the data which hasn't been served yet,
// that is everything from the current offset onwards.
func (b *buffer) buffer() []byte {
	unread := b.buf[b.offset:]
	return unread
}
// increment moves the offset on by n bytes to mark them as served.
// n may be negative to move backwards.
func (b *buffer) increment(n int) {
	b.offset = b.offset + n
}
|