File: chunkedreader.go

package info (click to toggle)
rclone 1.65.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 38,352 kB
  • sloc: sh: 1,056; xml: 857; python: 693; javascript: 612; makefile: 289; ansic: 101; php: 74
file content (250 lines) | stat: -rw-r--r-- 7,329 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Package chunkedreader provides functionality for reading in chunks.
package chunkedreader

import (
	"context"
	"errors"
	"io"
	"sync"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/hash"
)

// Sentinel errors returned by ChunkedReader.
//
// ErrorFileClosed is returned when a ChunkedReader is used after Close has
// been called. ErrorInvalidSeek is returned by RangeSeek (and therefore by
// Seek) when the resulting position is negative or not below the size of
// the object.
var (
	ErrorFileClosed  = errors.New("file already closed")
	ErrorInvalidSeek = errors.New("invalid seek position")
)

// ChunkedReader is a reader for an Object with the possibility
// of reading the source in chunks of given size.
//
// An initialChunkSize of <= 0 will disable chunked reading.
//
// The struct contains a sync.Mutex which Read, Close, RangeSeek and Open
// hold while they run, so a ChunkedReader must be used through a pointer
// and not copied.
type ChunkedReader struct {
	// ctx is the context handed to o.Open and to RangeSeek on the current
	// reader whenever a chunk is (re)opened.
	ctx              context.Context
	mu               sync.Mutex    // protects following fields
	o                fs.Object     // source to read from
	rc               io.ReadCloser // reader for the current open chunk
	offset           int64         // offset the next Read will start. -1 forces a reopen of o
	chunkOffset      int64         // beginning of the current or next chunk
	chunkSize        int64         // length of the current or next chunk. -1 will open o from chunkOffset to the end
	initialChunkSize int64         // default chunkSize after the chunk specified by RangeSeek is complete
	maxChunkSize     int64         // consecutive read chunks will double in size until reached. -1 means no limit
	customChunkSize  bool          // is the current chunkSize set by RangeSeek?
	closed           bool          // has Close been called?
}

// New creates a ChunkedReader that reads from the Object o.
//
// Chunked reading is disabled when initialChunkSize is <= 0.
// When maxChunkSize is greater than initialChunkSize, the chunk size
// doubles after every completely read chunk until maxChunkSize is
// reached; a maxChunkSize of -1 means there is no upper limit.
// A Seek or RangeSeek makes the chunk size start again from its initial
// value (after any explicit RangeSeek length has been used up).
//
// New does not open the source; that happens on the first Read or Open.
func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64) *ChunkedReader {
	// Collapse every "disabled" value onto the single sentinel -1.
	if initialChunkSize < 1 {
		initialChunkSize = -1
	}
	// A limited maximum must never be smaller than the starting size.
	if unlimited := maxChunkSize == -1; !unlimited && initialChunkSize > maxChunkSize {
		maxChunkSize = initialChunkSize
	}

	cr := new(ChunkedReader)
	cr.ctx = ctx
	cr.o = o
	cr.offset = -1 // nothing opened yet: the first Read has to open o
	cr.chunkSize = initialChunkSize
	cr.initialChunkSize = initialChunkSize
	cr.maxChunkSize = maxChunkSize
	return cr
}

// Read fills p from the source - for details see io.Reader
//
// Read keeps going until p is full or an error occurs, opening the next
// chunk whenever the current one has been read completely. A single
// underlying read never crosses a chunk boundary while chunking is enabled.
// A short read at the end of the data is reported as io.EOF together with
// the number of bytes that were read. After Close it returns ErrorFileClosed.
func (cr *ChunkedReader) Read(p []byte) (n int, err error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	if cr.closed {
		return 0, ErrorFileClosed
	}

	for len(p) > 0 {
		want := int64(len(p))
		// end of the current chunk; only meaningful while chunkSize > 0
		chunkEnd := cr.chunkOffset + cr.chunkSize

		fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, want, cr.chunkOffset, cr.chunkSize)

		// offset == -1 marks the first Read or the first Read after RangeSeek
		mustOpen := cr.offset == -1
		if cr.chunkSize > 0 && cr.offset == chunkEnd {
			// the previous chunk was consumed entirely: the next one starts here
			cr.chunkOffset = cr.offset
			if cr.customChunkSize {
				// the finished chunk had a size requested through RangeSeek,
				// go back to the regular size
				cr.customChunkSize = false
				cr.chunkSize = cr.initialChunkSize
			} else {
				cr.chunkSize *= 2
				if cr.maxChunkSize != -1 && cr.chunkSize > cr.maxChunkSize {
					cr.chunkSize = cr.maxChunkSize
				}
			}
			// the boundary moved together with the chunk
			chunkEnd = cr.chunkOffset + cr.chunkSize
			mustOpen = true
		}
		if mustOpen {
			if err = cr.openRange(); err != nil {
				return n, err
			}
		}

		// cut the request at the chunk boundary when chunking is enabled,
		// the remainder is served by the next loop iteration
		dst, rest := p, []byte(nil)
		if left := chunkEnd - cr.offset; cr.chunkSize > 0 && want > left {
			dst, rest = p[:left], p[left:]
		}
		p = rest

		got, readErr := io.ReadFull(cr.rc, dst)
		n += got
		cr.offset += int64(got)
		if readErr == io.ErrUnexpectedEOF {
			// a partially filled buffer at the end of the data is a plain EOF
			readErr = io.EOF
		}
		if readErr != nil {
			return n, readErr
		}
	}
	return n, nil
}

// Close closes the reader of the current chunk - for details see io.Closer
//
// The first call marks the ChunkedReader as closed and closes the open
// chunk reader, if there is one. Read, Seek, RangeSeek and Close return
// ErrorFileClosed afterwards.
func (cr *ChunkedReader) Close() error {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	if cr.closed {
		return ErrorFileClosed
	}

	// The closed flag is set before the reader is released, so it stays set
	// even when closing the reader reports an error.
	cr.closed = true
	closeErr := cr.resetReader(nil, 0)
	return closeErr
}

// Seek sets the position of the next Read - for details see io.Seeker
//
// It is RangeSeek called without an explicit length, so the next chunk
// opened uses the initial chunk size.
func (cr *ChunkedReader) Seek(offset int64, whence int) (int64, error) {
	// a length of -1 tells RangeSeek that no chunk length was requested
	const noLength int64 = -1
	pos, err := cr.RangeSeek(context.TODO(), offset, whence, noLength)
	return pos, err
}

// RangeSeek the file - for details see RangeSeeker
//
// offset and whence are interpreted as in io.Seeker. io.SeekCurrent (and
// any whence other than io.SeekStart or io.SeekEnd) is relative to the
// position the next Read would start at, also when no chunk is open yet,
// i.e. before the first Read or directly after a previous seek.
//
// The specified length will only apply to the next chunk opened; a length
// <= 0 selects the initial chunk size.
// RangeSeek will not reopen the source until Read is called.
//
// ctx is not used by RangeSeek itself; the source is reopened later with
// the context stored in the ChunkedReader.
//
// It returns the new absolute position. After Close it returns
// ErrorFileClosed. If the new position is negative or not below the size of
// the object it returns ErrorInvalidSeek and the position is reset to 0.
func (cr *ChunkedReader) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length)

	if cr.closed {
		return 0, ErrorFileClosed
	}

	size := cr.o.Size()

	// work out the position the seek is relative to
	var base int64
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekEnd:
		base = size
	default: // io.SeekCurrent
		base = cr.offset
		if base == -1 {
			// offset is only the "reopen needed" marker here, not a
			// position. The place the next Read starts at is the start of
			// the pending chunk.
			base = cr.chunkOffset
		}
	}

	// set the new chunk start
	cr.chunkOffset = base + offset
	// force reopen on next Read
	cr.offset = -1
	// The flag has to follow the chunk size: a stale true left over from an
	// earlier RangeSeek would make Read skip the first doubling.
	cr.customChunkSize = length > 0
	if cr.customChunkSize {
		cr.chunkSize = length
	} else {
		cr.chunkSize = cr.initialChunkSize
	}
	if cr.chunkOffset < 0 || cr.chunkOffset >= size {
		cr.chunkOffset = 0
		return 0, ErrorInvalidSeek
	}
	return cr.chunkOffset, nil
}

// Open forces the connection to be opened
//
// If a chunk reader is already open and no reopen is pending, Open does
// nothing. Otherwise the current chunk range is opened via openRange.
//
// The receiver is always returned so that the call can be chained, together
// with any error from opening. After Close it returns ErrorFileClosed.
func (cr *ChunkedReader) Open() (*ChunkedReader, error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	// Check closed first, like Read and RangeSeek do. If Close failed to
	// close the chunk reader, rc is still set and the shortcut below would
	// otherwise report success on a closed ChunkedReader.
	if cr.closed {
		return cr, ErrorFileClosed
	}
	if cr.rc != nil && cr.offset != -1 {
		return cr, nil
	}
	return cr, cr.openRange()
}

// openRange will open the source Object with the current chunk range
//
// If the current open reader implements RangeSeeker, it is tried first.
// When RangeSeek fails, o.Open with a RangeOption is used.
//
// A length <= 0 will request till the end of the file
//
// On success cr.offset is set to the start of the chunk. It returns
// ErrorFileClosed if the ChunkedReader has been closed, or the error from
// opening the source or from closing the previous reader.
// Read and Open call it with cr.mu held.
func (cr *ChunkedReader) openRange() error {
	offset, length := cr.chunkOffset, cr.chunkSize
	fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length)

	if cr.closed {
		return ErrorFileClosed
	}

	// Reuse the open reader if it can seek to the new range by itself.
	if rs, ok := cr.rc.(fs.RangeSeeker); ok {
		n, err := rs.RangeSeek(cr.ctx, offset, io.SeekStart, length)
		if err == nil && n == offset {
			cr.offset = offset
			return nil
		}
		if err != nil {
			fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err)
		} else {
			fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n)
		}
	}

	// Every Open passes a HashesOption with hash.None.
	// NOTE(review): presumably this stops the backend from calculating
	// hashes for reads that are partial anyway - confirm.
	var rc io.ReadCloser
	var err error
	if length <= 0 {
		if offset == 0 {
			// the whole object is wanted, no range is needed
			rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)})
		} else {
			rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: -1})
		}
	} else {
		// RangeOption.End is inclusive, hence the -1
		rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: offset + length - 1})
	}
	if err != nil {
		return err
	}
	if err = cr.resetReader(rc, offset); err != nil {
		// resetReader returns before taking over rc when closing the
		// previous reader fails, so nothing references rc any more. Close it
		// here so it does not leak. The error of this Close is dropped on
		// purpose; the caller gets the error that made the switch fail.
		if rc != nil {
			_ = rc.Close()
		}
		return err
	}
	return nil
}

// resetReader makes rc the current reader and records offset as the
// position of the next Read.
//
// A reader that is currently set is closed first. If that Close fails its
// error is returned and neither the reader nor the offset is changed.
// rc may be nil, which leaves the ChunkedReader without a reader.
func (cr *ChunkedReader) resetReader(rc io.ReadCloser, offset int64) error {
	if old := cr.rc; old != nil {
		closeErr := old.Close()
		if closeErr != nil {
			return closeErr
		}
	}
	cr.rc, cr.offset = rc, offset
	return nil
}

// Compile-time checks that *ChunkedReader implements io.ReadCloser,
// io.Seeker and fs.RangeSeeker.
var (
	_ io.ReadCloser  = (*ChunkedReader)(nil)
	_ io.Seeker      = (*ChunkedReader)(nil)
	_ fs.RangeSeeker = (*ChunkedReader)(nil)
)