1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
|
package chunkedreader
import (
"context"
"fmt"
"io"
"sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pool"
)
// parallel reads Object in chunks of a given size in parallel.
type parallel struct {
ctx context.Context
o fs.Object // source to read from
mu sync.Mutex // protects following fields
endStream int64 // offset we have started streams for
offset int64 // offset the read file pointer is at
chunkSize int64 // length of the chunks to read
nstreams int // number of streams to use
streams []*stream // the opened streams in offset order - the current one is first
closed bool // has Close been called?
}
// stream holds the info about a single download
type stream struct {
cr *parallel // parent reader
ctx context.Context // ctx to cancel if needed
cancel func() // cancel the stream
rc io.ReadCloser // reader that it is reading from, may be nil
offset int64 // where the stream is reading from
size int64 // and the size it is reading
readBytes int64 // bytes read from the stream
rw *pool.RW // buffer for read
err chan error // error returned from the read
name string // name of this stream for debugging
}
// Start a stream reading (offset, offset+size)
func (cr *parallel) newStream(ctx context.Context, offset, size int64) (s *stream, err error) {
ctx, cancel := context.WithCancel(ctx)
// Create the stream
rw := multipart.NewRW()
s = &stream{
cr: cr,
ctx: ctx,
cancel: cancel,
offset: offset,
size: size,
rw: rw,
err: make(chan error, 1),
}
s.name = fmt.Sprintf("stream(%d,%d,%p)", s.offset, s.size, s)
// Start the background read into the buffer
go s.readFrom(ctx)
// Return the stream to the caller
return s, nil
}
// read the file into the buffer
func (s *stream) readFrom(ctx context.Context) {
// Open the object at the correct range
fs.Debugf(s.cr.o, "%s: open", s.name)
rc, err := operations.Open(ctx, s.cr.o,
&fs.HashesOption{Hashes: hash.Set(hash.None)},
&fs.RangeOption{Start: s.offset, End: s.offset + s.size - 1})
if err != nil {
s.err <- fmt.Errorf("parallel chunked reader: failed to open stream at %d size %d: %w", s.offset, s.size, err)
return
}
s.rc = rc
fs.Debugf(s.cr.o, "%s: readfrom started", s.name)
_, err = s.rw.ReadFrom(s.rc)
fs.Debugf(s.cr.o, "%s: readfrom finished (%d bytes): %v", s.name, s.rw.Size(), err)
s.err <- err
}
// eof is true when we've read all the data we are expecting
func (s *stream) eof() bool {
return s.readBytes >= s.size
}
// read reads up to len(p) bytes into p. It returns the number of
// bytes read (0 <= n <= len(p)) and any error encountered. If some
// data is available but not len(p) bytes, read returns what is
// available instead of waiting for more.
func (s *stream) read(p []byte) (n int, err error) {
defer log.Trace(s.cr.o, "%s: Read len(p)=%d", s.name, len(p))("n=%d, err=%v", &n, &err)
if len(p) == 0 {
return n, nil
}
for {
var nn int
nn, err = s.rw.Read(p[n:])
fs.Debugf(s.cr.o, "%s: rw.Read nn=%d, err=%v", s.name, nn, err)
s.readBytes += int64(nn)
n += nn
if err != nil && err != io.EOF {
return n, err
}
if s.eof() {
return n, io.EOF
}
// Received a faux io.EOF because we haven't read all the data yet
if n >= len(p) {
break
}
// Wait for a write to happen to read more
s.rw.WaitWrite(s.ctx)
}
return n, nil
}
// Sets *perr to newErr if err is nil
func orErr(perr *error, newErr error) {
if *perr == nil {
*perr = newErr
}
}
// Close a stream
func (s *stream) close() (err error) {
defer log.Trace(s.cr.o, "%s: close", s.name)("err=%v", &err)
s.cancel()
err = <-s.err // wait for readFrom to stop and return error
orErr(&err, s.rw.Close())
if s.rc != nil {
orErr(&err, s.rc.Close())
}
if err != nil && err != io.EOF {
return fmt.Errorf("parallel chunked reader: failed to read stream at %d size %d: %w", s.offset, s.size, err)
}
return nil
}
// Make a new parallel chunked reader
//
// Mustn't be called for an unknown size object
func newParallel(ctx context.Context, o fs.Object, chunkSize int64, streams int) ChunkedReader {
// Make sure chunkSize is a multiple of multipart.BufferSize
if chunkSize < 0 {
chunkSize = multipart.BufferSize
}
newChunkSize := multipart.BufferSize * (chunkSize / multipart.BufferSize)
if newChunkSize < chunkSize {
newChunkSize += multipart.BufferSize
}
fs.Debugf(o, "newParallel chunkSize=%d, streams=%d", chunkSize, streams)
return ¶llel{
ctx: ctx,
o: o,
offset: 0,
chunkSize: newChunkSize,
nstreams: streams,
}
}
// _open starts the file transferring at offset
//
// Call with the lock held
func (cr *parallel) _open() (err error) {
size := cr.o.Size()
if size < 0 {
return fmt.Errorf("parallel chunked reader: can't use multiple threads for unknown sized object %q", cr.o)
}
// Launched enough streams already
if cr.endStream >= size {
return nil
}
// Make sure cr.nstreams are running
for i := len(cr.streams); i < cr.nstreams; i++ {
// clip to length of file
chunkSize := cr.chunkSize
newEndStream := cr.endStream + chunkSize
if newEndStream > size {
chunkSize = size - cr.endStream
newEndStream = cr.endStream + chunkSize
}
s, err := cr.newStream(cr.ctx, cr.endStream, chunkSize)
if err != nil {
return err
}
cr.streams = append(cr.streams, s)
cr.endStream = newEndStream
if cr.endStream >= size {
break
}
}
return nil
}
// Finished reading the current stream so pop it off and destroy it
//
// Call with lock held
func (cr *parallel) _popStream() (err error) {
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
if len(cr.streams) == 0 {
return nil
}
stream := cr.streams[0]
err = stream.close()
cr.streams[0] = nil
cr.streams = cr.streams[1:]
return err
}
// Get rid of all the streams
//
// Call with lock held
func (cr *parallel) _popStreams() (err error) {
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
for len(cr.streams) > 0 {
orErr(&err, cr._popStream())
}
cr.streams = nil
return err
}
// Read from the file - for details see io.Reader
func (cr *parallel) Read(p []byte) (n int, err error) {
defer log.Trace(cr.o, "Read len(p)=%d", len(p))("n=%d, err=%v", &n, &err)
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return 0, ErrorFileClosed
}
for n < len(p) {
// Make sure we have the correct number of streams open
err = cr._open()
if err != nil {
return n, err
}
// No streams left means EOF
if len(cr.streams) == 0 {
return n, io.EOF
}
// Read from the stream
stream := cr.streams[0]
nn, err := stream.read(p[n:])
n += nn
cr.offset += int64(nn)
if err == io.EOF {
err = cr._popStream()
if err != nil {
break
}
} else if err != nil {
break
}
}
return n, err
}
// Close the file - for details see io.Closer
//
// All methods on ChunkedReader will return ErrorFileClosed afterwards
func (cr *parallel) Close() error {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return ErrorFileClosed
}
cr.closed = true
// Close all the streams
return cr._popStreams()
}
// Seek the file - for details see io.Seeker
func (cr *parallel) Seek(offset int64, whence int) (int64, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
fs.Debugf(cr.o, "parallel chunked reader: seek from %d to %d whence %d", cr.offset, offset, whence)
if cr.closed {
return 0, ErrorFileClosed
}
size := cr.o.Size()
currentOffset := cr.offset
switch whence {
case io.SeekStart:
currentOffset = 0
case io.SeekEnd:
currentOffset = size
}
// set the new chunk start
newOffset := currentOffset + offset
if newOffset < 0 || newOffset >= size {
return 0, ErrorInvalidSeek
}
// If seek pointer didn't move, return now
if newOffset == cr.offset {
fs.Debugf(cr.o, "parallel chunked reader: seek pointer didn't move")
return cr.offset, nil
}
cr.offset = newOffset
// Ditch out of range streams
for len(cr.streams) > 0 {
stream := cr.streams[0]
if newOffset >= stream.offset+stream.size {
_ = cr._popStream()
} else {
break
}
}
// If no streams remain we can just restart
if len(cr.streams) == 0 {
fs.Debugf(cr.o, "parallel chunked reader: no streams remain")
cr.endStream = cr.offset
return cr.offset, nil
}
// Current stream
stream := cr.streams[0]
// If new offset is before current stream then ditch all the streams
if newOffset < stream.offset {
_ = cr._popStreams()
fs.Debugf(cr.o, "parallel chunked reader: new offset is before current stream - ditch all")
cr.endStream = cr.offset
return cr.offset, nil
}
// Seek the current stream
streamOffset := newOffset - stream.offset
stream.readBytes = streamOffset // correct read value
fs.Debugf(cr.o, "parallel chunked reader: seek the current stream to %d", streamOffset)
// Wait for the read to the correct part of the data
for stream.rw.Size() < streamOffset {
stream.rw.WaitWrite(cr.ctx)
}
_, err := stream.rw.Seek(streamOffset, io.SeekStart)
if err != nil {
return cr.offset, fmt.Errorf("parallel chunked reader: failed to seek stream: %w", err)
}
return cr.offset, nil
}
// RangeSeek the file - for details see RangeSeeker
//
// In the parallel chunked reader this just acts like Seek
func (cr *parallel) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
return cr.Seek(offset, whence)
}
// Open forces the connection to be opened
func (cr *parallel) Open() (ChunkedReader, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
return cr, cr._open()
}
var (
_ ChunkedReader = (*parallel)(nil)
)
|