//go:build codegen
// +build codegen

package api

import "text/template"

var eventStreamShapeReaderTmpl = template.Must(template.New("eventStreamShapeReaderTmpl").
Funcs(template.FuncMap{}).
Parse(`
{{- $es := $.EventStream }}
// {{ $es.StreamReaderAPIName }} provides the interface for reading from the stream. The
// default implementation for this interface will be {{ $.ShapeName }}.
//
// The reader's Close method must allow multiple concurrent calls.
//
// These events are:
// {{ range $_, $event := $es.Events }}
// * {{ $event.Shape.ShapeName }}
{{- end }}
// * {{ $es.StreamUnknownEventName }}
type {{ $es.StreamReaderAPIName }} interface {
// Returns a channel of events as they are read from the event stream.
Events() <-chan {{ $es.EventGroupName }}
// Close will stop the reader reading events from the stream.
Close() error
// Returns any error that has occurred while reading from the event stream.
Err() error
}
type {{ $es.StreamReaderImplName }} struct {
eventReader *eventstreamapi.EventReader
stream chan {{ $es.EventGroupName }}
err *eventstreamapi.OnceError
done chan struct{}
closeOnce sync.Once
}
func {{ $es.StreamReaderImplConstructorName }}(eventReader *eventstreamapi.EventReader) *{{ $es.StreamReaderImplName }} {
r := &{{ $es.StreamReaderImplName }}{
eventReader: eventReader,
stream: make(chan {{ $es.EventGroupName }}),
done: make(chan struct{}),
err: eventstreamapi.NewOnceError(),
}
go r.readEventStream()
return r
}
// Close will close the underlying event stream reader.
func (r *{{ $es.StreamReaderImplName }}) Close() error {
r.closeOnce.Do(r.safeClose)
return r.Err()
}
func (r *{{ $es.StreamReaderImplName }}) ErrorSet() <-chan struct{} {
return r.err.ErrorSet()
}
func (r *{{ $es.StreamReaderImplName }}) Closed() <-chan struct{} {
return r.done
}
func (r *{{ $es.StreamReaderImplName }}) safeClose() {
close(r.done)
}
func (r *{{ $es.StreamReaderImplName }}) Err() error {
return r.err.Err()
}
func (r *{{ $es.StreamReaderImplName }}) Events() <-chan {{ $es.EventGroupName }} {
return r.stream
}
func (r *{{ $es.StreamReaderImplName }}) readEventStream() {
defer r.Close()
defer close(r.stream)
for {
event, err := r.eventReader.ReadEvent()
if err != nil {
if err == io.EOF {
return
}
select {
case <-r.done:
// If the reader is already closed, ignore the error.
return
default:
}
if _, ok := err.(*eventstreamapi.UnknownMessageTypeError); ok {
continue
}
r.err.SetError(err)
return
}
select {
case r.stream <- event.({{ $es.EventGroupName }}):
case <-r.done:
return
}
}
}
type {{ $es.StreamUnmarshalerForEventName }} struct {
metadata protocol.ResponseMetadata
}
func (u {{ $es.StreamUnmarshalerForEventName }}) UnmarshalerForEventName(eventType string) (eventstreamapi.Unmarshaler, error) {
switch eventType {
{{- range $_, $event := $es.Events }}
case {{ printf "%q" $event.Name }}:
return &{{ $event.Shape.ShapeName }}{}, nil
{{- end }}
{{- range $_, $event := $es.Exceptions }}
case {{ printf "%q" $event.Name }}:
return newError{{ $event.Shape.ShapeName }}(u.metadata).(eventstreamapi.Unmarshaler), nil
{{- end }}
default:
return &{{ $es.StreamUnknownEventName }}{Type: eventType}, nil
}
}
// {{ $es.StreamUnknownEventName }} provides a failsafe event for the
// {{ $es.Name }} group of events when an unknown event is received.
type {{ $es.StreamUnknownEventName }} struct {
Type string
Message eventstream.Message
}
// The {{ $es.StreamUnknownEventName }} is an event in the {{ $es.Name }}
// group of events.
func (s *{{ $es.StreamUnknownEventName }}) event{{ $es.Name }}() {}
// MarshalEvent marshals the type into a stream event value. This method
// should only be used internally within the SDK's EventStream handling.
func (e *{{ $es.StreamUnknownEventName }}) MarshalEvent(pm protocol.PayloadMarshaler) (
msg eventstream.Message, err error,
) {
return e.Message.Clone(), nil
}
// UnmarshalEvent unmarshals the EventStream Message into the {{ $es.StreamUnknownEventName }} value.
// This method is only used internally within the SDK's EventStream handling.
func (e *{{ $es.StreamUnknownEventName }}) UnmarshalEvent(
payloadUnmarshaler protocol.PayloadUnmarshaler,
msg eventstream.Message,
) error {
e.Message = msg.Clone()
return nil
}
`))