File: reader_writer.go

package info (click to toggle)
rclone 1.69.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 45,716 kB
  • sloc: sh: 1,115; xml: 857; python: 754; javascript: 612; makefile: 299; ansic: 101; php: 74
file content (343 lines) | stat: -rw-r--r-- 8,660 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package pool

import (
	"context"
	"errors"
	"io"
	"sync"
	"time"
)

// RWAccount is the type of the accounting callback used by RW.
//
// RW calls it with n, the number of bytes just consumed, after each
// step of a Read or WriteTo (subject to any delay configured with
// DelayAccounting).
//
// It may return an error which will be passed back to the user.
type RWAccount func(n int) error

// RW contains the state for the read/writer
//
// It can be used as a FIFO to read data from a source and write it out again.
//
// Writes always append to the end of the paged backing store. Reads
// consume from the separate offset out, which Seek can reposition.
type RW struct {
	// Written once variables in initialization
	pool      *Pool     // pool to get pages from (pages are returned to it on Close)
	account   RWAccount // account for a read - may be nil for no accounting
	accountOn int       // only account on or after this read (compared against reads)

	// Shared variables between Read and Write
	// Write updates these but Read reads from them
	// They must all stay in sync together
	mu         sync.Mutex    // protect the shared variables
	pages      [][]byte      // backing store - each element is a buffer from pool
	size       int           // size written - total bytes across all pages
	lastOffset int           // size in last page - number of bytes used in it
	written    chan struct{} // signalled when a write happens (non-blocking send)

	// Read side Variables
	out   int // offset we are reading from
	reads int // count how many times the data has been read (bumped when offset 0 is read)
}

// Errors returned by (*RW).Seek
var (
	// errSeekPastEnd is returned when the computed position lies
	// beyond the data written so far.
	errSeekPastEnd = errors.New("pool.RW Seek: attempt to seek past end of data")

	// errNegativeSeek is returned when the computed position lies
	// before the start of the data.
	errNegativeSeek = errors.New("pool.RW Seek: negative position")

	// errInvalidWhence is returned when whence is not one of
	// io.SeekStart, io.SeekCurrent or io.SeekEnd.
	errInvalidWhence = errors.New("pool.RW Seek: invalid whence")
)

// NewRW creates a reader / writer whose storage is made of pages
// fetched from pool.
//
// Use Write (or ReadFrom) to store data in it and Read (or WriteTo)
// to get the data back out.
//
// Writing only ever appends data. Seek only affects reading.
func NewRW(pool *Pool) *RW {
	return &RW{
		// capacity 1 so that a single pending write signal can be
		// stored without blocking the writer
		written: make(chan struct{}, 1),
		// start with room for 16 page references
		pages: make([][]byte, 0, 16),
		pool:  pool,
	}
}

// SetAccounting should be provided with a function which will be
// called after every read from the RW.
//
// It may return an error which will be passed back to the user.
//
// The RW itself is returned so the call can be chained.
//
// Not thread safe - call in initialization only.
func (rw *RW) SetAccounting(account RWAccount) *RW {
	// stored without locking - see the thread safety note above
	rw.account = account
	return rw
}

// DelayAccountinger is implemented by types which can delay the
// start of their accounting.
type DelayAccountinger interface {
	// DelayAccounting makes sure the accounting function only
	// gets called on the i-th or later read of the data from this
	// point (counting from 1).
	//
	// This is useful so that we don't account initial reads of
	// the data e.g. when calculating hashes.
	//
	// Set this to 0 to account everything.
	DelayAccounting(i int)
}

// DelayAccounting arranges for the accounting function to be called
// only on the i-th or later read through the data, counting from 1
// and starting from now.
//
// Use this to avoid accounting for preliminary passes over the data,
// e.g. when calculating hashes.
//
// Passing 0 accounts for everything.
//
// Not thread safe - call in initialization only.
func (rw *RW) DelayAccounting(i int) {
	// Set the threshold and restart the count of reads so that
	// "i-th read" is measured from this call.
	rw.accountOn, rw.reads = i, 0
}

// readPage returns the readable bytes of the page containing
// absolute offset i, starting at i and running to the end of the
// data in that page.
//
// Reading offset 0 counts as starting another read through the data.
//
// Ensure there are pages before calling this.
func (rw *RW) readPage(i int) (page []byte) {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	// A read of the very first byte starts a new pass over the data
	if i == 0 {
		rw.reads++
	}

	pageSize := rw.pool.bufferSize
	index := i / pageSize
	page = rw.pages[index]
	// Only the final page can be partially filled, so trim it to
	// what has actually been written
	if last := len(rw.pages) - 1; index == last {
		page = page[:rw.lastOffset]
	}
	return page[i%pageSize:]
}

// accountRead reports n bytes read to the accounting function, if
// there is one and accounting has started.
//
// Accounting starts once the number of reads through the data has
// reached rw.accountOn: rw.reads is 1 the first time this is called,
// so rw.accountOn of 2 means start on the second read through.
//
// It returns whatever the accounting function returns, or nil if it
// was not called.
func (rw *RW) accountRead(n int) error {
	if rw.account != nil && rw.reads >= rw.accountOn {
		return rw.account(n)
	}
	return nil
}

// eof reports whether the read offset has reached (or passed) the
// amount of data written so far.
func (rw *RW) eof() bool {
	rw.mu.Lock()
	atEnd := rw.out >= rw.size
	rw.mu.Unlock()
	return atEnd
}

// Read copies up to len(p) bytes of the data written so far into p,
// starting at the current read offset. It returns the number of bytes
// read (0 <= n <= len(p)) and any error encountered.
//
// It never waits for more data to be written: when the read offset
// reaches the end of the written data it returns the bytes copied so
// far along with io.EOF.
//
// The accounting function, if active, is called after each page's
// worth of data is copied, and an error from it stops the read and is
// returned together with the bytes copied so far.
func (rw *RW) Read(p []byte) (n int, err error) {
	for dst := p; len(dst) > 0; {
		if rw.eof() {
			return n, io.EOF
		}
		// Copy as much as fits from the page at the read offset
		copied := copy(dst, rw.readPage(rw.out))
		dst = dst[copied:]
		n += copied
		rw.out += copied
		if err = rw.accountRead(copied); err != nil {
			return n, err
		}
	}
	return n, nil
}

// WriteTo writes the data from the current read offset to w, a page
// at a time, until the end of the written data is reached or an error
// occurs. It returns the number of bytes written to w and the first
// error encountered, if any.
//
// The accounting function, if active, is called after each successful
// write to w; it is not called for a write which returned an error.
//
// The Copy function uses WriteTo if available. This avoids an
// allocation and a copy.
func (rw *RW) WriteTo(w io.Writer) (n int64, err error) {
	for {
		if rw.eof() {
			return n, nil
		}
		var wrote int
		wrote, err = w.Write(rw.readPage(rw.out))
		n += int64(wrote)
		// Advance by what w accepted, even on a failed write
		rw.out += wrote
		if err == nil {
			err = rw.accountRead(wrote)
		}
		if err != nil {
			return n, err
		}
	}
}

// writePage returns the free space to write into next.
//
// This is the unused tail of the last page if it has room, otherwise
// a new page is fetched from the pool, appended to the backing store
// and returned whole.
func (rw *RW) writePage() (page []byte) {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	// No pages yet, or the last one is full - start a new page
	if len(rw.pages) == 0 || rw.lastOffset >= rw.pool.bufferSize {
		page = rw.pool.Get()
		rw.pages = append(rw.pages, page)
		rw.lastOffset = 0
		return page
	}
	return rw.pages[len(rw.pages)-1][rw.lastOffset:]
}

// Write appends the len(p) bytes in p to the end of the stored data,
// taking new pages as needed. It returns the number of bytes written,
// which is always len(p). It cannot return an error.
//
// A write signal is sent after each page is filled in so that a
// caller blocked in WaitWrite can wake up.
func (rw *RW) Write(p []byte) (n int, err error) {
	for src := p; len(src) > 0; {
		copied := copy(rw.writePage(), src)
		src = src[copied:]
		n += copied

		// Publish the new data to the read side
		rw.mu.Lock()
		rw.size += copied
		rw.lastOffset += copied
		rw.mu.Unlock()

		rw.signalWrite() // signal more data available
	}
	return n, nil
}

// ReadFrom reads from r directly into the free space of the backing
// pages, appending to the stored data, until r returns an error. The
// return value n is the number of bytes read. io.EOF from r is
// treated as a normal end and reported as a nil error; any other
// error is returned.
//
// A write signal is sent after each read from r so that a caller
// blocked in WaitWrite can wake up.
//
// The Copy function uses ReadFrom if available. This avoids an
// allocation and a copy.
func (rw *RW) ReadFrom(r io.Reader) (n int64, err error) {
	for {
		var got int
		got, err = r.Read(rw.writePage())
		n += int64(got)

		// Publish whatever was read, even if r also returned an error
		rw.mu.Lock()
		rw.size += got
		rw.lastOffset += got
		rw.mu.Unlock()

		rw.signalWrite() // signal more data available
		if err != nil {
			break
		}
	}
	if err == io.EOF {
		return n, nil
	}
	return n, err
}

// signalWrite signals that a write has happened.
//
// The send on the written channel is non-blocking: if the channel
// can't accept a value right now the signal is dropped rather than
// waiting for a receiver.
func (rw *RW) signalWrite() {
	select {
	case rw.written <- struct{}{}:
	default:
		// Channel not ready. For an RW made by NewRW the channel has
		// capacity 1, so this means a signal is already pending.
	}
}

// WaitWrite blocks until one of the following happens, then returns:
//
//   - data is written to the RW
//   - Close is called
//   - the context is cancelled
//   - 1 second has passed
//
// This can be used when calling Read while the buffer is filling up.
func (rw *RW) WaitWrite(ctx context.Context) {
	timeout := time.NewTimer(time.Second)
	defer timeout.Stop()
	select {
	case <-rw.written:
	case <-ctx.Done():
	case <-timeout.C:
	}
}

// Seek moves the position of the next Read. It has no effect on
// Write, which always appends.
//
// The new position is offset interpreted according to whence:
// io.SeekStart is relative to the start of the data, io.SeekCurrent
// is relative to the current read position and io.SeekEnd is relative
// to the end of the data written so far (for example, offset = -2
// specifies the penultimate byte).
//
// On success it returns the new position relative to the start of the
// data. The read position is left unchanged if an error is returned.
//
// Errors:
//   - errInvalidWhence if whence is not one of the values above
//   - errNegativeSeek if the new position would be before the start
//   - errSeekPastEnd if the new position would be beyond the end of
//     the written data
func (rw *RW) Seek(offset int64, whence int) (int64, error) {
	rw.mu.Lock()
	size := int64(rw.size)
	rw.mu.Unlock()

	// Work out where offset is measured from
	var base int64
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekCurrent:
		base = int64(rw.out)
	case io.SeekEnd:
		base = size
	default:
		return 0, errInvalidWhence
	}

	abs := base + offset
	switch {
	case abs < 0:
		return 0, errNegativeSeek
	case abs > size:
		// The value returned alongside the error is offset reduced
		// by the overshoot, i.e. the distance from base to the end
		// of the data.
		return offset - (abs - size), errSeekPastEnd
	}
	rw.out = int(abs)
	return abs, nil
}

// Close hands every page of the buffer back to the pool and drops the
// references to them.
//
// It also sends a write signal so that a caller blocked in WaitWrite
// wakes up.
//
// It always returns nil.
func (rw *RW) Close() error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	rw.signalWrite() // wake up anything waiting for a write
	for i := range rw.pages {
		rw.pool.Put(rw.pages[i])
	}
	rw.pages = nil
	return nil
}

// Size returns the total number of bytes written to the buffer.
func (rw *RW) Size() int64 {
	rw.mu.Lock()
	size := rw.size
	rw.mu.Unlock()
	return int64(size)
}

// Check interfaces
//
// These assignments to the blank identifier make the compiler verify
// that *RW satisfies each of the listed interfaces.
var (
	_ io.Reader         = (*RW)(nil)
	_ io.ReaderFrom     = (*RW)(nil)
	_ io.Writer         = (*RW)(nil)
	_ io.WriterTo       = (*RW)(nil)
	_ io.Seeker         = (*RW)(nil)
	_ io.Closer         = (*RW)(nil)
	_ DelayAccountinger = (*RW)(nil)
)