File: middleware.go

package info (click to toggle)
golang-github-aws-aws-sdk-go-v2 1.24.1-2~bpo12%2B1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm-backports
  • size: 554,032 kB
  • sloc: java: 15,941; makefile: 419; sh: 175
file content (71 lines) | stat: -rw-r--r-- 2,039 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package eventstreamapi

import (
	"context"
	"fmt"
	"github.com/aws/smithy-go/middleware"
	smithyhttp "github.com/aws/smithy-go/transport/http"
	"io"
)

// eventStreamWriterKey is the key type under which the input event stream
// writer is stored as a middleware stack value on the context.
type eventStreamWriterKey struct{}

// GetInputStreamWriter returns the io.WriteCloser stored in the middleware
// stack values of ctx for the operation's input event stream. It returns nil
// when no value is stored under the key or the stored value is not an
// io.WriteCloser.
func GetInputStreamWriter(ctx context.Context) io.WriteCloser {
	stored := middleware.GetStackValue(ctx, eventStreamWriterKey{})
	if w, ok := stored.(io.WriteCloser); ok {
		return w
	}
	return nil
}

func setInputStreamWriter(ctx context.Context, writeCloser io.WriteCloser) context.Context {
	return middleware.WithStackValue(ctx, eventStreamWriterKey{}, writeCloser)
}

// InitializeStreamWriter is a Finalize middleware that initializes an
// in-memory pipe for sending event stream messages via the HTTP request body.
type InitializeStreamWriter struct{}

// AddInitializeStreamWriter registers a new InitializeStreamWriter middleware
// on the Finalize step of the provided stack using the middleware.After
// position. It returns whatever error the stack reports from the insertion.
func AddInitializeStreamWriter(stack *middleware.Stack) error {
	initializer := &InitializeStreamWriter{}
	return stack.Finalize.Add(initializer, middleware.After)
}

// ID returns the identifier under which this middleware is registered in the
// stack.
func (i *InitializeStreamWriter) ID() string {
	const id = "InitializeStreamWriter"
	return id
}

// HandleFinalize is the middleware implementation.
//
// It creates an in-memory pipe, installs the read end as the stream of the
// HTTP request, and stores the write end on the context so that it can be
// retrieved with GetInputStreamWriter. The next handler is then invoked with
// the updated context and request.
//
// An error is returned if in.Request is not a *smithyhttp.Request, if the
// request stream cannot be set, or if the next handler fails. In the latter
// two cases both ends of the pipe are closed before returning.
func (i *InitializeStreamWriter) HandleFinalize(
	ctx context.Context, in middleware.FinalizeInput, next middleware.FinalizeHandler,
) (
	out middleware.FinalizeOutput, metadata middleware.Metadata, err error,
) {
	req, isHTTP := in.Request.(*smithyhttp.Request)
	if !isHTTP {
		return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request)
	}

	pipeReader, pipeWriter := io.Pipe()
	// The cleanup inspects the named result err, so it observes the final
	// error value regardless of which return statement below produced it.
	defer func() {
		if err != nil {
			_ = pipeReader.Close()
			_ = pipeWriter.Close()
		}
	}()

	if req, err = req.SetStream(pipeReader); err != nil {
		return out, metadata, err
	}
	in.Request = req

	// Expose the write end of the pipe to downstream handlers via the context.
	return next.HandleFinalize(setInputStreamWriter(ctx, pipeWriter), in)
}