File: eventstream_tmpl_reader.go

package info (click to toggle)
golang-github-aws-aws-sdk-go 1.49.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 312,636 kB
  • sloc: makefile: 120
file content (159 lines) | stat: -rw-r--r-- 4,356 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
//go:build codegen
// +build codegen

package api

import "text/template"

var eventStreamShapeReaderTmpl = template.Must(template.New("eventStreamShapeReaderTmpl").
	Funcs(template.FuncMap{}).
	Parse(`
{{- $es := $.EventStream }}

// {{ $es.StreamReaderAPIName }} provides the interface for reading to the stream. The
// default implementation for this interface will be {{ $.ShapeName }}.
//
// The reader's Close method must allow multiple concurrent calls.
//
// These events are:
// {{ range $_, $event := $es.Events }}
//     * {{ $event.Shape.ShapeName }}
{{- end }}
//     * {{ $es.StreamUnknownEventName }}
type {{ $es.StreamReaderAPIName }} interface {
	// Returns a channel of events as they are read from the event stream.
	Events() <-chan {{ $es.EventGroupName }}

	// Close will stop the reader reading events from the stream.
	Close() error

	// Returns any error that has occurred while reading from the event stream.
	Err() error
}

type {{ $es.StreamReaderImplName }} struct {
	eventReader *eventstreamapi.EventReader
	stream      chan {{ $es.EventGroupName }}
	err         *eventstreamapi.OnceError

	done      chan struct{}
	closeOnce sync.Once
}

func {{ $es.StreamReaderImplConstructorName }}(eventReader *eventstreamapi.EventReader) *{{ $es.StreamReaderImplName }} {
	r := &{{ $es.StreamReaderImplName }}{
		eventReader: eventReader,
		stream: make(chan {{ $es.EventGroupName }}),
		done:   make(chan struct{}),
		err:    eventstreamapi.NewOnceError(),
	}
	go r.readEventStream()

	return r
}

// Close will close the underlying event stream reader.
func (r *{{ $es.StreamReaderImplName }}) Close() error {
	r.closeOnce.Do(r.safeClose)
	return r.Err()
}

func (r *{{ $es.StreamReaderImplName }}) ErrorSet() <-chan struct{} {
	return r.err.ErrorSet()
}

func (r *{{ $es.StreamReaderImplName }}) Closed() <-chan struct{} {
	return r.done
}

func (r *{{ $es.StreamReaderImplName }}) safeClose() {
	close(r.done)
}

func (r *{{ $es.StreamReaderImplName }}) Err() error {
	return r.err.Err()
}

func (r *{{ $es.StreamReaderImplName }}) Events() <-chan {{ $es.EventGroupName }} {
	return r.stream
}

func (r *{{ $es.StreamReaderImplName }}) readEventStream() {
	defer r.Close()
	defer close(r.stream)

	for {
		event, err := r.eventReader.ReadEvent()
		if err != nil {
			if err == io.EOF {
				return
			}
			select {
			case <-r.done:
				// If closed already ignore the error
				return
			default:
			}
			if _, ok := err.(*eventstreamapi.UnknownMessageTypeError); ok {
				continue
			}
			r.err.SetError(err)
			return
		}

		select {
		case r.stream <- event.({{ $es.EventGroupName }}):
		case <-r.done:
			return
		}
	}
}

type {{ $es.StreamUnmarshalerForEventName }} struct {
	metadata protocol.ResponseMetadata
}

func (u {{ $es.StreamUnmarshalerForEventName }}) UnmarshalerForEventName(eventType string) (eventstreamapi.Unmarshaler, error) {
	switch eventType {
		{{- range $_, $event := $es.Events }}
			case {{ printf "%q" $event.Name }}:
				return &{{ $event.Shape.ShapeName }}{}, nil
		{{- end }}
		{{- range $_, $event := $es.Exceptions }}
			case {{ printf "%q" $event.Name }}:
				return newError{{ $event.Shape.ShapeName }}(u.metadata).(eventstreamapi.Unmarshaler), nil
		{{- end }}
	default:
		return &{{ $es.StreamUnknownEventName }}{Type: eventType}, nil
	}
}

// {{ $es.StreamUnknownEventName }} provides a failsafe event for the 
// {{ $es.Name }} group of events when an unknown event is received.
type {{ $es.StreamUnknownEventName }} struct {
	Type string
	Message eventstream.Message
}

// The {{ $es.StreamUnknownEventName }} is and event in the {{ $es.Name }}
// group of events.
func (s *{{ $es.StreamUnknownEventName }}) event{{ $es.Name }}() {}

// MarshalEvent marshals the type into an stream event value. This method
// should only used internally within the SDK's EventStream handling.
func (e *{{ $es.StreamUnknownEventName }}) MarshalEvent(pm protocol.PayloadMarshaler) (
	msg eventstream.Message, err error,
) {
	return e.Message.Clone(), nil
}

// UnmarshalEvent unmarshals the EventStream Message into the {{ $.ShapeName }} value.
// This method is only used internally within the SDK's EventStream handling.
func (e *{{ $es.StreamUnknownEventName }}) UnmarshalEvent(
	payloadUnmarshaler protocol.PayloadUnmarshaler,
	msg eventstream.Message,
) error {
	e.Message = msg.Clone()
	return nil
}
`))