File: parallel.go

package info (click to toggle)
rclone 1.69.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 45,716 kB
  • sloc: sh: 1,115; xml: 857; python: 754; javascript: 612; makefile: 299; ansic: 101; php: 74
file content (383 lines) | stat: -rw-r--r-- 9,965 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package chunkedreader

import (
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/fs/log"
	"github.com/rclone/rclone/fs/operations"
	"github.com/rclone/rclone/lib/multipart"
	"github.com/rclone/rclone/lib/pool"
)

// parallel reads Object in chunks of a given size in parallel.
//
// It keeps up to nstreams downloads in flight, each covering a
// chunkSize-sized range of the object, and serves Read from the
// earliest stream in streams. All mutable fields are guarded by mu.
type parallel struct {
	ctx       context.Context
	o         fs.Object  // source to read from
	mu        sync.Mutex // protects following fields
	endStream int64      // offset we have started streams for
	offset    int64      // offset the read file pointer is at
	chunkSize int64      // length of the chunks to read
	nstreams  int        // number of streams to use
	streams   []*stream  // the opened streams in offset order - the current one is first
	closed    bool       // has Close been called?
}

// stream holds the info about a single download covering the byte
// range [offset, offset+size) of the parent reader's object.
type stream struct {
	cr        *parallel       // parent reader
	ctx       context.Context // ctx to cancel if needed
	cancel    func()          // cancel the stream
	rc        io.ReadCloser   // reader that it is reading from, may be nil
	offset    int64           // where the stream is reading from
	size      int64           // and the size it is reading
	readBytes int64           // bytes read from the stream
	rw        *pool.RW        // buffer for read
	err       chan error      // error returned from the read (buffered; readFrom sends exactly one value)
	name      string          // name of this stream for debugging
}

// newStream starts a stream reading the range [offset, offset+size).
//
// The background download into the stream's buffer begins
// immediately. err is currently always nil but is kept in the
// signature for callers.
func (cr *parallel) newStream(ctx context.Context, offset, size int64) (s *stream, err error) {
	streamCtx, cancel := context.WithCancel(ctx)

	// Build the stream with a fresh multipart buffer
	s = &stream{
		cr:     cr,
		ctx:    streamCtx,
		cancel: cancel,
		offset: offset,
		size:   size,
		rw:     multipart.NewRW(),
		err:    make(chan error, 1),
	}
	s.name = fmt.Sprintf("stream(%d,%d,%p)", s.offset, s.size, s)

	// Kick off the download into the buffer
	go s.readFrom(streamCtx)

	return s, nil
}

// readFrom downloads the stream's byte range into its buffer.
//
// Whether opening or reading fails or succeeds, exactly one value is
// sent on s.err, which close() waits on.
func (s *stream) readFrom(ctx context.Context) {
	fs.Debugf(s.cr.o, "%s: open", s.name)

	// Open the source object over just this stream's range
	rangeOpt := &fs.RangeOption{Start: s.offset, End: s.offset + s.size - 1}
	hashOpt := &fs.HashesOption{Hashes: hash.Set(hash.None)}
	rc, err := operations.Open(ctx, s.cr.o, hashOpt, rangeOpt)
	if err != nil {
		s.err <- fmt.Errorf("parallel chunked reader: failed to open stream at %d size %d: %w", s.offset, s.size, err)
		return
	}
	s.rc = rc

	// Pump the body into the buffer until EOF or error
	fs.Debugf(s.cr.o, "%s: readfrom started", s.name)
	_, err = s.rw.ReadFrom(rc)
	fs.Debugf(s.cr.o, "%s: readfrom finished (%d bytes): %v", s.name, s.rw.Size(), err)
	s.err <- err
}

// eof reports whether every byte this stream was asked for has been
// consumed by read.
func (s *stream) eof() bool {
	return s.size <= s.readBytes
}

// read reads up to len(p) bytes into p. It returns the number of
// bytes read (0 <= n <= len(p)) and any error encountered. If some
// data is available but not len(p) bytes, read returns what is
// available instead of waiting for more.
//
// Returns io.EOF once readBytes reaches the stream's expected size.
// Errors from the background download surface via s.rw.Read.
func (s *stream) read(p []byte) (n int, err error) {
	defer log.Trace(s.cr.o, "%s: Read len(p)=%d", s.name, len(p))("n=%d, err=%v", &n, &err)
	if len(p) == 0 {
		return n, nil
	}
	for {
		var nn int
		nn, err = s.rw.Read(p[n:])
		fs.Debugf(s.cr.o, "%s: rw.Read nn=%d, err=%v", s.name, nn, err)
		s.readBytes += int64(nn)
		n += nn
		if err != nil && err != io.EOF {
			return n, err
		}
		// All the bytes this stream covers have been read - real EOF
		if s.eof() {
			return n, io.EOF
		}
		// Received a faux io.EOF because we haven't read all the data yet
		if n >= len(p) {
			break
		}
		// Wait for a write to happen to read more
		// NOTE(review): relies on WaitWrite returning when s.ctx is
		// cancelled so close() can unblock a pending read - confirm
		// pool.RW semantics
		s.rw.WaitWrite(s.ctx)
	}
	return n, nil
}

// Sets *perr to newErr if err is nil
func orErr(perr *error, newErr error) {
	if *perr == nil {
		*perr = newErr
	}
}

// close shuts a stream down and reclaims its resources.
//
// It cancels the download, waits for readFrom to deliver its result,
// then closes the buffer and (if it was opened) the underlying
// reader. io.EOF counts as success.
func (s *stream) close() (err error) {
	defer log.Trace(s.cr.o, "%s: close", s.name)("err=%v", &err)
	s.cancel()
	// readFrom sends exactly one value so this cannot block forever
	err = <-s.err
	orErr(&err, s.rw.Close())
	if rc := s.rc; rc != nil {
		orErr(&err, rc.Close())
	}
	if err == nil || err == io.EOF {
		return nil
	}
	return fmt.Errorf("parallel chunked reader: failed to read stream at %d size %d: %w", s.offset, s.size, err)
}

// newParallel makes a new parallel chunked reader.
//
// chunkSize is rounded up to a multiple of multipart.BufferSize; a
// chunkSize <= 0 selects multipart.BufferSize.
//
// Mustn't be called for an unknown size object
func newParallel(ctx context.Context, o fs.Object, chunkSize int64, streams int) ChunkedReader {
	// Guard against zero as well as negative chunk sizes: a zero
	// chunkSize survives the rounding below (BufferSize * 0 == 0) and
	// would make _open spawn zero-length streams forever, so Read
	// could never make progress.
	if chunkSize <= 0 {
		chunkSize = multipart.BufferSize
	}
	// Round chunkSize up to a multiple of multipart.BufferSize
	newChunkSize := multipart.BufferSize * (chunkSize / multipart.BufferSize)
	if newChunkSize < chunkSize {
		newChunkSize += multipart.BufferSize
	}

	fs.Debugf(o, "newParallel chunkSize=%d, streams=%d", chunkSize, streams)

	return &parallel{
		ctx:       ctx,
		o:         o,
		offset:    0,
		chunkSize: newChunkSize,
		nstreams:  streams,
	}
}

// _open tops the reader up to cr.nstreams in-flight streams starting
// at cr.endStream, without reading past the end of the object.
//
// Call with the lock held
func (cr *parallel) _open() (err error) {
	size := cr.o.Size()
	if size < 0 {
		return fmt.Errorf("parallel chunked reader: can't use multiple threads for unknown sized object %q", cr.o)
	}

	// Launch streams until we have enough or have covered the file
	for len(cr.streams) < cr.nstreams && cr.endStream < size {
		// Clip the chunk to the remaining length of the file
		length := cr.chunkSize
		if remaining := size - cr.endStream; remaining < length {
			length = remaining
		}

		s, err := cr.newStream(cr.ctx, cr.endStream, length)
		if err != nil {
			return err
		}
		cr.streams = append(cr.streams, s)
		cr.endStream += length
	}

	return nil
}

// _popStream closes and discards the stream at the front of the
// queue - called when the current stream has been fully read.
//
// Call with lock held
func (cr *parallel) _popStream() (err error) {
	defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
	if len(cr.streams) == 0 {
		return nil
	}
	head := cr.streams[0]
	err = head.close()
	// Zero the slot before reslicing so the stream can be collected
	cr.streams[0] = nil
	cr.streams = cr.streams[1:]
	return err
}

// _popStreams closes and discards every open stream, returning the
// first error encountered (if any).
//
// Call with lock held
func (cr *parallel) _popStreams() (err error) {
	defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
	for len(cr.streams) != 0 {
		orErr(&err, cr._popStream())
	}
	cr.streams = nil
	return err
}

// Read from the file - for details see io.Reader
//
// Reads from the front stream, popping it when exhausted and topping
// up the set of in-flight streams as it goes. Returns io.EOF when no
// streams remain to cover the rest of the object.
func (cr *parallel) Read(p []byte) (n int, err error) {
	defer log.Trace(cr.o, "Read len(p)=%d", len(p))("n=%d, err=%v", &n, &err)
	cr.mu.Lock()
	defer cr.mu.Unlock()

	if cr.closed {
		return 0, ErrorFileClosed
	}

	for n < len(p) {
		// Make sure we have the correct number of streams open
		err = cr._open()
		if err != nil {
			return n, err
		}

		// No streams left means EOF
		if len(cr.streams) == 0 {
			return n, io.EOF
		}

		// Read from the stream
		//
		// Note: nn is declared separately and err assigned with =
		// rather than := so errors land in the named return value.
		// A := here would shadow err inside the loop and the break
		// paths below would silently return nil instead.
		stream := cr.streams[0]
		var nn int
		nn, err = stream.read(p[n:])
		n += nn
		cr.offset += int64(nn)
		if err == io.EOF {
			// Current stream exhausted - close it and move on
			err = cr._popStream()
			if err != nil {
				break
			}
		} else if err != nil {
			break
		}
	}
	return n, err
}

// Close the file - for details see io.Closer
//
// All methods on ChunkedReader will return ErrorFileClosed afterwards
func (cr *parallel) Close() error {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	if cr.closed {
		return ErrorFileClosed
	}
	cr.closed = true

	// Close all the streams, cancelling their in-flight downloads
	return cr._popStreams()
}

// Seek the file - for details see io.Seeker
//
// A seek within the currently downloading stream reuses its buffer;
// a seek backwards before the current stream, or forwards past all
// in-flight streams, discards them so the next Read restarts
// downloads at the new offset.
func (cr *parallel) Seek(offset int64, whence int) (int64, error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	fs.Debugf(cr.o, "parallel chunked reader: seek from %d to %d whence %d", cr.offset, offset, whence)

	if cr.closed {
		return 0, ErrorFileClosed
	}

	size := cr.o.Size()
	// Work out the base offset the whence-relative offset applies to
	currentOffset := cr.offset
	switch whence {
	case io.SeekStart:
		currentOffset = 0
	case io.SeekEnd:
		currentOffset = size
	}
	// set the new chunk start
	//
	// NOTE(review): newOffset == size (seek exactly to EOF) is
	// rejected here, which is stricter than io.Seeker requires -
	// presumably deliberate for this reader; confirm with callers
	newOffset := currentOffset + offset
	if newOffset < 0 || newOffset >= size {
		return 0, ErrorInvalidSeek
	}

	// If seek pointer didn't move, return now
	if newOffset == cr.offset {
		fs.Debugf(cr.o, "parallel chunked reader: seek pointer didn't move")
		return cr.offset, nil
	}

	cr.offset = newOffset

	// Ditch streams that end at or before the new offset
	for len(cr.streams) > 0 {
		stream := cr.streams[0]
		if newOffset >= stream.offset+stream.size {
			_ = cr._popStream()
		} else {
			break
		}
	}

	// If no streams remain we can just restart
	if len(cr.streams) == 0 {
		fs.Debugf(cr.o, "parallel chunked reader: no streams remain")
		cr.endStream = cr.offset
		return cr.offset, nil
	}

	// Current stream
	stream := cr.streams[0]

	// If new offset is before current stream then ditch all the streams
	if newOffset < stream.offset {
		_ = cr._popStreams()
		fs.Debugf(cr.o, "parallel chunked reader: new offset is before current stream - ditch all")
		cr.endStream = cr.offset
		return cr.offset, nil
	}

	// Seek the current stream
	streamOffset := newOffset - stream.offset
	stream.readBytes = streamOffset // correct read value
	fs.Debugf(cr.o, "parallel chunked reader: seek the current stream to %d", streamOffset)
	// Wait for the read to the correct part of the data
	//
	// NOTE(review): if the background download fails before reaching
	// streamOffset, termination of this loop depends on WaitWrite
	// returning on ctx cancellation - confirm pool.RW semantics
	for stream.rw.Size() < streamOffset {
		stream.rw.WaitWrite(cr.ctx)
	}
	_, err := stream.rw.Seek(streamOffset, io.SeekStart)
	if err != nil {
		return cr.offset, fmt.Errorf("parallel chunked reader: failed to seek stream: %w", err)
	}

	return cr.offset, nil
}

// RangeSeek the file - for details see RangeSeeker
//
// In the parallel chunked reader this just acts like Seek; the
// length hint is ignored.
func (cr *parallel) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
	newOffset, err := cr.Seek(offset, whence)
	return newOffset, err
}

// Open forces the connection to be opened
func (cr *parallel) Open() (ChunkedReader, error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	// Start the streams downloading
	err := cr._open()
	return cr, err
}

// Check interfaces
var (
	_ ChunkedReader = (*parallel)(nil)
)