File: zt_retry_reader_test.go

package info (click to toggle)
golang-github-azure-azure-storage-blob-go 0.15.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 2,084 kB
  • sloc: makefile: 3
file content (399 lines) | stat: -rw-r--r-- 13,537 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
package azblob

import (
	"context"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"

	chk "gopkg.in/check.v1"
)

// Tests for RetryReader.

// perByteReader is a fake response body for the RetryReader tests. Every Read
// call returns at most one byte, and the reader can be told to fail at a chosen
// position a chosen number of times.
type perByteReader struct {
	// Payload state.
	RandomBytes      []byte // source bytes; randomly generated or supplied by the caller
	byteCount        int    // number of bytes that can be returned before io.EOF
	currentByteIndex int    // index of the next byte to return (= bytes already returned)

	// Error injection: while doInjectError is set and doInjectTimes is positive,
	// a Read that starts at doInjectErrorByteIndex fails with injectedError.
	doInjectError          bool
	doInjectErrorByteIndex int
	doInjectTimes          int
	injectedError          error

	// sleepDuration and closeChannel are only used in the "forced cancellation" tests.
	sleepDuration time.Duration // delay Read waits out after copying a byte
	closeChannel  chan struct{} // receives a signal from Close; nil when closing is not simulated
}

// newPerByteReader returns a reader holding byteCount randomly generated bytes.
// Its closeChannel is left nil, so Close sends no signal and Read can only be
// interrupted through error injection.
func newPerByteReader(byteCount int) *perByteReader {
	payload := make([]byte, byteCount)
	// The error is deliberately ignored: the tests only need some bytes to
	// hand out, and a zeroed buffer serves that purpose as well.
	_, _ = rand.Read(payload)

	return &perByteReader{
		RandomBytes: payload,
		byteCount:   byteCount,
	}
}

// newSingleUsePerByteReader returns a reader over the caller-supplied contents
// (the slice is used directly, not copied). Unlike newPerByteReader it gets a
// buffered close channel, so a Close call is observed by Read.
func newSingleUsePerByteReader(contents []byte) *perByteReader {
	return &perByteReader{
		RandomBytes:  contents,
		byteCount:    len(contents),
		closeChannel: make(chan struct{}, 10),
	}
}

// Read copies at most one byte into b.
//
// It returns, in order of precedence:
//   - (0, injectedError) when error injection is armed for the current position;
//     each such failure uses up one of the remaining doInjectTimes,
//   - (0, io.EOF) once byteCount bytes have been handed out,
//   - (n, error) carrying ReadOnClosedBodyMessage when a close signal arrives
//     before sleepDuration has elapsed,
//   - (n, nil) otherwise, after waiting for sleepDuration.
func (r *perByteReader) Read(b []byte) (n int, err error) {
	armed := r.doInjectError && r.doInjectTimes > 0
	if armed && r.currentByteIndex == r.doInjectErrorByteIndex {
		r.doInjectTimes--
		return 0, r.injectedError
	}

	if r.currentByteIndex >= r.byteCount {
		return 0, io.EOF
	}

	next := r.currentByteIndex
	n = copy(b, r.RandomBytes[next:next+1])
	r.currentByteIndex = next + n

	// Simulate a slow transfer. The wait either runs its course, or is cut
	// short because Close was called (possibly from another goroutine), in
	// which case the byte is still reported together with the error. A nil
	// closeChannel never becomes ready, so then only the timer can fire.
	select {
	case <-r.closeChannel:
		return n, errors.New(ReadOnClosedBodyMessage)
	case <-time.After(r.sleepDuration):
		return n, nil
	}
}

// Close queues one close signal for Read to pick up. Readers without a close
// channel ignore the call. The returned error is always nil.
func (r *perByteReader) Close() error {
	if r.closeChannel == nil {
		return nil
	}

	// The send blocks if the channel's buffer is already full.
	r.closeChannel <- struct{}{}
	return nil
}

// TestRetryReaderReadWithRetry checks that one injected temporary DNS error on
// the first byte is recovered through a single retry. The initial response is
// obtained by calling the getter once before the reader is built.
//
// The scenario runs twice, first without and then with the optional
// NotifyFailedRead callback, which also shows that the callback may be omitted.
func (s *aztestsSuite) TestRetryReaderReadWithRetry(c *chk.C) {
	for _, withNotification := range []bool{false, true} {
		// Bookkeeping written by the failed-read notification callback.
		notifyCalls, notifyRetries, lastReportedCount := 0, 0, -1
		var lastReportedErr error
		onFailedRead := func(failureCount int, lastError error, offset int64, count int64, willRetry bool) {
			notifyCalls++
			lastReportedCount, lastReportedErr = failureCount, lastError
			if willRetry {
				notifyRetries++
			}
		}

		// A one-byte body whose first read fails exactly once.
		payloadSize := 1
		body := newPerByteReader(payloadSize)
		body.injectedError = &net.DNSError{IsTemporary: true}
		body.doInjectTimes = 1
		body.doInjectErrorByteIndex = 0
		body.doInjectError = true

		// Every request returns the same body, repositioned to the requested offset.
		getter := func(ctx context.Context, info HTTPGetterInfo) (*http.Response, error) {
			body.currentByteIndex = int(info.Offset)
			return &http.Response{Body: body}, nil
		}

		info := HTTPGetterInfo{Offset: 0, Count: int64(payloadSize)}
		initResponse, err := getter(context.Background(), info)
		c.Assert(err, chk.IsNil)

		options := RetryReaderOptions{MaxRetryRequests: 1}
		if withNotification {
			options.NotifyFailedRead = onFailedRead
		}
		reader := NewRetryReader(context.Background(), initResponse, info, options, getter)

		// The first attempt fails; the retry delivers the byte.
		buf := make([]byte, 1)
		n, err := reader.Read(buf)
		c.Assert(n, chk.Equals, 1)
		c.Assert(err, chk.IsNil)

		if withNotification {
			// Exactly one failed try is expected, and successes are not reported.
			// Number of callback invocations counted here:
			c.Assert(notifyCalls, chk.Equals, 1)
			// The sole failure was announced as "will retry":
			c.Assert(notifyRetries, chk.Equals, 1)
			// Failure count as reported by the callback itself:
			c.Assert(lastReportedCount, chk.Equals, 1)
			c.Assert(lastReportedErr, chk.NotNil)
		}

		// Nothing is left, so the next read reports EOF.
		n, err = reader.Read(buf)
		c.Assert(n, chk.Equals, 0)
		c.Assert(err, chk.Equals, io.EOF)
	}
}

// TestRetryReaderWithRetryIoUnexpectedEOF checks that an io.ErrUnexpectedEOF
// injected on the first byte is recovered through a single retry. The initial
// response is obtained by calling the getter once before the reader is built.
//
// The scenario runs twice, first without and then with the optional
// NotifyFailedRead callback, which also shows that the callback may be omitted.
func (s *aztestsSuite) TestRetryReaderWithRetryIoUnexpectedEOF(c *chk.C) {
	for _, withNotification := range []bool{false, true} {
		// Bookkeeping written by the failed-read notification callback.
		notifyCalls, notifyRetries, lastReportedCount := 0, 0, -1
		var lastReportedErr error
		onFailedRead := func(failureCount int, lastError error, offset int64, count int64, willRetry bool) {
			notifyCalls++
			lastReportedCount, lastReportedErr = failureCount, lastError
			if willRetry {
				notifyRetries++
			}
		}

		// A one-byte body whose first read fails exactly once.
		payloadSize := 1
		body := newPerByteReader(payloadSize)
		body.injectedError = io.ErrUnexpectedEOF
		body.doInjectTimes = 1
		body.doInjectErrorByteIndex = 0
		body.doInjectError = true

		// Every request returns the same body, repositioned to the requested offset.
		getter := func(ctx context.Context, info HTTPGetterInfo) (*http.Response, error) {
			body.currentByteIndex = int(info.Offset)
			return &http.Response{Body: body}, nil
		}

		info := HTTPGetterInfo{Offset: 0, Count: int64(payloadSize)}
		initResponse, err := getter(context.Background(), info)
		c.Assert(err, chk.IsNil)

		options := RetryReaderOptions{MaxRetryRequests: 1}
		if withNotification {
			options.NotifyFailedRead = onFailedRead
		}
		reader := NewRetryReader(context.Background(), initResponse, info, options, getter)

		// The first attempt fails; the retry delivers the byte.
		buf := make([]byte, 1)
		n, err := reader.Read(buf)
		c.Assert(n, chk.Equals, 1)
		c.Assert(err, chk.IsNil)

		if withNotification {
			// Exactly one failed try is expected, and successes are not reported.
			// Number of callback invocations counted here:
			c.Assert(notifyCalls, chk.Equals, 1)
			// The sole failure was announced as "will retry":
			c.Assert(notifyRetries, chk.Equals, 1)
			// Failure count as reported by the callback itself:
			c.Assert(lastReportedCount, chk.Equals, 1)
			c.Assert(lastReportedErr, chk.NotNil)
		}

		// Nothing is left, so the next read reports EOF.
		n, err = reader.Read(buf)
		c.Assert(n, chk.Equals, 0)
		c.Assert(err, chk.Equals, io.EOF)
	}
}

// TestRetryReaderReadNegativeNormalFail checks that a read fails when the body
// errors more often than the retry budget allows: two temporary DNS errors are
// injected while MaxRetryRequests is 1. It also verifies the notifications
// delivered to NotifyFailedRead along the way.
func (s *aztestsSuite) TestRetryReaderReadNegativeNormalFail(c *chk.C) {
	// Bookkeeping written by the failed-read notification callback.
	notifyCalls, notifyRetries, lastReportedCount := 0, 0, -1
	var lastReportedErr error
	onFailedRead := func(failureCount int, lastError error, offset int64, count int64, willRetry bool) {
		notifyCalls++
		lastReportedCount, lastReportedErr = failureCount, lastError
		if willRetry {
			notifyRetries++
		}
	}

	// A one-byte body whose first read fails twice in a row.
	payloadSize := 1
	body := newPerByteReader(payloadSize)
	body.injectedError = &net.DNSError{IsTemporary: true}
	body.doInjectTimes = 2
	body.doInjectErrorByteIndex = 0
	body.doInjectError = true

	startResponse := http.Response{Body: body}

	// Every retry returns the same body, repositioned to the requested offset.
	getter := func(ctx context.Context, info HTTPGetterInfo) (*http.Response, error) {
		body.currentByteIndex = int(info.Offset)
		return &http.Response{Body: body}, nil
	}

	options := RetryReaderOptions{
		MaxRetryRequests: 1,
		NotifyFailedRead: onFailedRead,
	}
	info := HTTPGetterInfo{Offset: 0, Count: int64(payloadSize)}
	reader := NewRetryReader(context.Background(), &startResponse, info, options, getter)

	// Both the initial try and the single retry fail, so the error surfaces.
	buf := make([]byte, 1)
	n, err := reader.Read(buf)
	c.Assert(n, chk.Equals, 0)
	c.Assert(err, chk.Equals, body.injectedError)

	// Two failed tries are expected, of which only the first one was
	// announced with willRetry = true.
	// Number of callback invocations counted here:
	c.Assert(notifyCalls, chk.Equals, 2)
	// Only the first failure was retried:
	c.Assert(notifyRetries, chk.Equals, 1)
	// Failure count as reported by the callback itself:
	c.Assert(lastReportedCount, chk.Equals, 2)
	c.Assert(lastReportedErr, chk.NotNil)
}

// TestRetryReaderReadCount0 covers the boundary where the requested Count has
// been fully consumed: once the single byte has been read, a further Read must
// report io.EOF without going back to the body.
//
// To make a stray body read observable, an error is armed at byte index 1,
// i.e. directly behind the last byte.
func (s *aztestsSuite) TestRetryReaderReadCount0(c *chk.C) {
	byteCount := 1
	body := newPerByteReader(byteCount)
	body.doInjectError = true
	body.doInjectErrorByteIndex = 1
	body.doInjectTimes = 1
	body.injectedError = &net.DNSError{IsTemporary: true}

	startResponse := http.Response{}
	startResponse.Body = body

	// Every retry returns the same body, repositioned to the requested offset.
	getter := func(ctx context.Context, info HTTPGetterInfo) (*http.Response, error) {
		r := http.Response{}
		body.currentByteIndex = int(info.Offset)
		r.Body = body

		return &r, nil
	}

	retryReader := NewRetryReader(context.Background(), &startResponse, HTTPGetterInfo{Offset: 0, Count: int64(byteCount)}, RetryReaderOptions{MaxRetryRequests: 1}, getter)

	// should consume the only byte
	can := make([]byte, 1)
	n, err := retryReader.Read(can)
	c.Assert(n, chk.Equals, 1)
	c.Assert(err, chk.IsNil)

	// should not read when Count=0, and should return EOF
	n, err = retryReader.Read(can)
	c.Assert(n, chk.Equals, 0)
	c.Assert(err, chk.Equals, io.EOF)

	// The armed error must still be pending. The EOF assertion alone cannot
	// tell "body not read" from "body read, error injected, retried": a single
	// injected temporary error is absorbed by the one permitted retry, after
	// which the body itself answers io.EOF. Each injected error uses up one of
	// doInjectTimes, so an unchanged counter shows the body was left alone.
	c.Assert(body.doInjectTimes, chk.Equals, 1)
}

// TestRetryReaderReadNegativeNonRetriableError checks that a plain error
// injected on the first byte is returned to the caller as-is, even though the
// retry budget (MaxRetryRequests: 2) has not been used up.
func (s *aztestsSuite) TestRetryReaderReadNegativeNonRetriableError(c *chk.C) {
	// A one-byte body whose first read fails once with an ordinary error.
	payloadSize := 1
	body := newPerByteReader(payloadSize)
	body.injectedError = fmt.Errorf("not retriable error")
	body.doInjectTimes = 1
	body.doInjectErrorByteIndex = 0
	body.doInjectError = true

	startResponse := http.Response{Body: body}

	// Every retry returns the same body, repositioned to the requested offset.
	getter := func(ctx context.Context, info HTTPGetterInfo) (*http.Response, error) {
		body.currentByteIndex = int(info.Offset)
		return &http.Response{Body: body}, nil
	}

	info := HTTPGetterInfo{Offset: 0, Count: int64(payloadSize)}
	options := RetryReaderOptions{MaxRetryRequests: 2}
	reader := NewRetryReader(context.Background(), &startResponse, info, options, getter)

	buf := make([]byte, 1)
	_, err := reader.Read(buf)
	c.Assert(err, chk.Equals, body.injectedError)
}

// TestRetryReaderReadWithForcedRetry exercises the cancellation mechanism that
// is exposed to consumers of the API: closing the reader from another goroutine
// while a read is in progress in order to force a retry (for example because
// the consumer thinks the read is taking too long and hopes for better
// performance on the next try).
//
// This differs from the other retries in this file, which simulate network
// failures. The scenario runs twice: with TreatEarlyCloseAsError set the read
// must fail, and without it the read must complete after one reported failure.
func (s *aztestsSuite) TestRetryReaderReadWithForcedRetry(c *chk.C) {
	for _, retryOnEarlyClose := range []bool{false, true} {
		// Count notifications, so we know that the retry really did happen.
		notifyCalls := 0
		onFailedRead := func(failureCount int, lastError error, offset int64, count int64, willRetry bool) {
			notifyCalls++
		}

		// Ten bytes, so that several passes through the read loop are required.
		payloadSize := 10
		perByteDelay := 100 * time.Millisecond
		payload := make([]byte, payloadSize)
		_, _ = rand.Read(payload)

		// A fresh body is built for every request: this test closes bodies on
		// purpose, and a closed body is unusable afterwards.
		getter := func(ctx context.Context, info HTTPGetterInfo) (*http.Response, error) {
			fresh := newSingleUsePerByteReader(payload)
			fresh.currentByteIndex = int(info.Offset)
			fresh.sleepDuration = perByteDelay
			return &http.Response{Body: fresh}, nil
		}

		info := HTTPGetterInfo{Offset: 0, Count: int64(payloadSize)}
		initResponse, err := getter(context.Background(), info)
		c.Assert(err, chk.IsNil)

		options := RetryReaderOptions{
			MaxRetryRequests:       2,
			TreatEarlyCloseAsError: !retryOnEarlyClose,
			NotifyFailedRead:       onFailedRead,
		}
		reader := NewRetryReader(context.Background(), initResponse, info, options, getter)

		// Close the reader from a separate goroutine part-way through the read.
		go func() {
			time.Sleep(perByteDelay * 5)
			reader.Close()
		}()

		// Do the read. It is interrupted by the forced close; whether it then
		// recovers through a retry depends on the option under test.
		received := make([]byte, payloadSize)
		n, err := io.ReadFull(reader, received)
		if !retryOnEarlyClose {
			c.Assert(err, chk.NotNil)
			continue
		}

		c.Assert(n, chk.Equals, payloadSize)
		c.Assert(err, chk.IsNil)
		c.Assert(received, chk.DeepEquals, payload)
		// Exactly one reported failure: the cancellation did indeed happen.
		c.Assert(notifyCalls, chk.Equals, 1)
	}
}

// End of tests for RetryReader