File: timeout_read_closer.go

package info (click to toggle)
golang-github-aws-aws-sdk-go-v2 1.30.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 662,428 kB
  • sloc: java: 16,875; makefile: 432; sh: 175
file content (104 lines) | stat: -rw-r--r-- 2,797 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package http

import (
	"context"
	"fmt"
	"io"
	"time"

	"github.com/aws/smithy-go"
	"github.com/aws/smithy-go/middleware"
	smithyhttp "github.com/aws/smithy-go/transport/http"
)

type readResult struct {
	n   int
	err error
}

// ResponseTimeoutError is an error when the reads from the response are
// delayed longer than the timeout the read was configured for.
type ResponseTimeoutError struct {
	TimeoutDur time.Duration
}

// Timeout returns that the error is was caused by a timeout, and can be
// retried.
func (*ResponseTimeoutError) Timeout() bool { return true }

func (e *ResponseTimeoutError) Error() string {
	return fmt.Sprintf("read on body reach timeout limit, %v", e.TimeoutDur)
}

// timeoutReadCloser will handle body reads that take too long.
// We will return a ErrReadTimeout error if a timeout occurs.
type timeoutReadCloser struct {
	reader   io.ReadCloser
	duration time.Duration
}

// Read will spin off a goroutine to call the reader's Read method. We will
// select on the timer's channel or the read's channel. Whoever completes first
// will be returned.
func (r *timeoutReadCloser) Read(b []byte) (int, error) {
	timer := time.NewTimer(r.duration)
	c := make(chan readResult, 1)

	go func() {
		n, err := r.reader.Read(b)
		timer.Stop()
		c <- readResult{n: n, err: err}
	}()

	select {
	case data := <-c:
		return data.n, data.err
	case <-timer.C:
		return 0, &ResponseTimeoutError{TimeoutDur: r.duration}
	}
}

func (r *timeoutReadCloser) Close() error {
	return r.reader.Close()
}

// AddResponseReadTimeoutMiddleware adds a middleware to the stack that wraps the
// response body so that a read that takes too long will return an error.
func AddResponseReadTimeoutMiddleware(stack *middleware.Stack, duration time.Duration) error {
	return stack.Deserialize.Add(&readTimeout{duration: duration}, middleware.After)
}

// readTimeout wraps the response body with a timeoutReadCloser
type readTimeout struct {
	duration time.Duration
}

// ID returns the id of the middleware
func (*readTimeout) ID() string {
	return "ReadResponseTimeout"
}

// HandleDeserialize implements the DeserializeMiddleware interface
func (m *readTimeout) HandleDeserialize(
	ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler,
) (
	out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
) {
	out, metadata, err = next.HandleDeserialize(ctx, in)
	if err != nil {
		return out, metadata, err
	}

	response, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok {
		return out, metadata, &smithy.DeserializationError{Err: fmt.Errorf("unknown transport type %T", out.RawResponse)}
	}

	response.Body = &timeoutReadCloser{
		reader:   response.Body,
		duration: m.duration,
	}
	out.RawResponse = response

	return out, metadata, err
}