File: event_stream_decoder.rb

package info (click to toggle)
ruby-aws-sdk-core 3.104.3-3%2Bdeb11u2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,444 kB
  • sloc: ruby: 11,201; makefile: 4
file content (64 lines) | stat: -rw-r--r-- 2,072 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# frozen_string_literal: true

require 'aws-eventstream'

module Aws
  module Binary
    # Incrementally decodes an event stream body. Raw chunks are fed in through
    # {#write}; every decoded message is parsed into an event object, appended
    # to {#events} and, when an event stream handler was supplied, signalled to
    # that handler's emitter.
    #
    # @api private
    class EventStreamDecoder

      # @param [String] protocol one of 'rest-xml', 'rest-json' or 'json'
      # @param [ShapeRef] rules ShapeRef of the eventstream member
      # @param [ShapeRef] output_ref ShapeRef of output shape
      # @param [Array] error_refs array of ShapeRefs for errors
      # @param [Object] io not used by this class; kept so that existing
      #   callers passing it positionally keep working
      # @param [EventStream|nil] event_stream_handler A Service EventStream object
      #   that registered with callbacks for processing events when they arrive.
      #   When nil, events are only collected in {#events}.
      # @raise [RuntimeError] when +protocol+ is not supported
      def initialize(protocol, rules, output_ref, error_refs, io, event_stream_handler = nil)
        @decoder = Aws::EventStream::Decoder.new
        @event_parser = EventParser.new(parser_class(protocol), rules, error_refs, output_ref)
        # Resolved eagerly; not read anywhere else in this class.
        @stream_class = extract_stream_class(rules.shape.struct_class)
        # The handler is optional (defaults to nil), so only ask it for an
        # emitter when one was actually given; #emit_event copes with nil.
        @emitter = event_stream_handler ? event_stream_handler.event_emitter : nil
        @events = []
      end

      # @return [Array] events Array of arrived event objects
      attr_reader :events

      # Feeds a chunk of raw bytes to the decoder and emits every event that
      # can be decoded from the data buffered so far.
      #
      # @param [String] chunk
      # @return [nil]
      def write(chunk)
        raw_event, eof = @decoder.decode_chunk(chunk)
        emit_event(raw_event) if raw_event
        until eof
          # exhaust message_buffer data: keep decoding without new input
          # until the decoder reports there is nothing left to read
          raw_event, eof = @decoder.decode_chunk
          emit_event(raw_event) if raw_event
        end
      end

      private

      # Parses a raw message into an event, records it and notifies the
      # emitter (if any) using the event's type.
      def emit_event(raw_event)
        event = @event_parser.apply(raw_event)
        @events << event
        @emitter.signal(event.event_type, event) if @emitter
      end

      # Maps a protocol name to the parser class used for event payloads.
      #
      # @raise [RuntimeError] for any protocol other than the three listed
      def parser_class(protocol)
        case protocol
        when 'rest-xml' then Aws::Xml::Parser
        when 'rest-json', 'json' then Aws::Json::Parser
        else raise "unsupported protocol #{protocol} for event stream"
        end
      end

      # Resolves the constant that mirrors +type_class+ with the 'Types'
      # namespace segment replaced by 'EventStreams'
      # (e.g. A::Types::Foo -> A::EventStreams::Foo).
      def extract_stream_class(type_class)
        type_class.to_s.split('::').inject(Kernel) do |const, part_name|
          const.const_get(part_name == 'Types' ? 'EventStreams' : part_name)
        end
      end
    end

  end
end