File: decoder.rb

package info (click to toggle)
ruby-aws-eventstream 1.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 108 kB
  • sloc: ruby: 283; makefile: 3
file content (220 lines) | stat: -rw-r--r-- 7,008 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# frozen_string_literal: true

require 'stringio'
require 'tempfile'
require 'zlib'

module Aws
  module EventStream

    # This class provides methods for decoding binary input into one or
    # more messages (Aws::EventStream::Message).
    #
    # * {#decode} - decodes every message from an IO-like object that
    #   responds to #read and returns binary data, passing each decoded
    #   Aws::EventStream::Message to the given block, or returning them
    #   wrapped in an enumerator when no block is given
    #
    # ## Examples
    #
    #   decoder = Aws::EventStream::Decoder.new
    #
    #   # decoding from IO
    #   decoder.decode(io) do |message|
    #     message.headers
    #     # => { ... }
    #     message.payload
    #     # => StringIO / Tempfile
    #   end
    #
    #   # alternatively
    #   message_pool = decoder.decode(io)
    #   message_pool.next
    #   # => Aws::EventStream::Message
    #
    # * {#decode_chunk} - decodes a single message from a chunk of data,
    #   returning the message object followed by a boolean (indicating the
    #   eof status of the data) in an array
    #
    # ## Examples
    #
    #   # chunk containing exactly one message
    #   message, chunk_eof = decoder.decode_chunk(chunk_str)
    #   message
    #   # => Aws::EventStream::Message
    #   chunk_eof
    #   # => true
    #
    #   # chunk containing a partial message
    #   message, chunk_eof = decoder.decode_chunk(chunk_str)
    #   message
    #   # => nil
    #   chunk_eof
    #   # => true
    #   # chunk data is saved in the decoder's message_buffer
    #
    #   # chunk containing more than one message
    #   message, chunk_eof = decoder.decode_chunk(chunk_str)
    #   message
    #   # => Aws::EventStream::Message
    #   chunk_eof
    #   # => false
    #   # extra chunk data is saved in the decoder's message_buffer
    #
    class Decoder

      # NOTE(review): no #each is defined for Decoder in this file, so the
      # Enumerable methods cannot actually be used on instances. The include
      # is kept so the class's ancestors stay unchanged for existing callers.
      include Enumerable

      # payloads up to this size are kept in memory (StringIO), larger ones
      # are written to a Tempfile
      ONE_MEGABYTE = 1024 * 1024
      private_constant :ONE_MEGABYTE

      # bytes of the prelude: 4-byte total message length, 4-byte headers
      # length and the 4-byte CRC checksum of the prelude itself
      PRELUDE_LENGTH = 12
      private_constant :PRELUDE_LENGTH

      # 4-byte CRC checksum trailing every message
      CRC32_LENGTH = 4
      private_constant :CRC32_LENGTH

      # @param [Hash] options The initialization options.
      # @option options [Boolean] :format (true) When `false` it
      #   disables user-friendly formatting for message header values
      #   including timestamp and uuid etc.
      def initialize(options = {})
        @format = options.fetch(:format, true)
        @message_buffer = ''
      end

      # Decodes every message from a binary stream.
      #
      # The whole stream is read and decoded before any message is yielded,
      # so a corrupt message raises before the block is called. Trailing
      # bytes that do not form a complete message are ignored. The chunk
      # buffer used by {#decode_chunk} is not modified.
      #
      # @param [IO#read] io An IO-like object
      #   that responds to `#read`
      #
      # @yieldparam [Message] message each decoded message, in stream order
      # @return [Enumerator<Message>, nil] Returns an Enumerator over the
      #   decoded messages if no block is given, otherwise nil
      # @raise [Errors::PreludeChecksumError] when a prelude CRC mismatches
      # @raise [Errors::MessageChecksumError] when a message CRC mismatches
      # @raise [ArgumentError] when a prelude declares inconsistent lengths
      def decode(io, &block)
        # treat a nil read (some IO-likes return nil at EOF) as an empty stream
        messages = decode_stream(io.read || '')
        return wrap_as_enumerator(messages) unless block_given?

        messages.each(&block)
        nil
      end

      # Decodes a single message from a chunk of string
      #
      # @param [String] chunk A chunk of string to be decoded,
      #   chunk can contain partial event message to multiple event messages
      #   When not provided, decode data from #message_buffer
      #
      # @return [Array<Message|nil, Boolean>] Returns single decoded message
      #   and boolean pair, the boolean flag indicates whether this chunk
      #   has been fully consumed, unused data is tracked at #message_buffer
      def decode_chunk(chunk = nil)
        @message_buffer = [@message_buffer, chunk].pack('a*a*') if chunk
        decode_message(@message_buffer)
      end

      private

      # exposed via object.send for testing
      attr_reader :message_buffer

      # Wraps already-decoded messages in an Enumerator yielding each Message.
      def wrap_as_enumerator(messages)
        Enumerator.new do |yielder|
          messages.each { |message| yielder << message }
        end
      end

      # Decodes all complete messages from the front of +raw+, stopping at
      # the first incomplete (or absent) message.
      def decode_stream(raw)
        messages = []
        rest = raw
        loop do
          message, rest = parse_message(rest)
          break unless message

          messages << message
        end
        messages
      end

      # Decodes the next message from the chunk buffer and, on success,
      # replaces the buffer with the bytes that follow it.
      def decode_message(raw_message)
        message, remaining = parse_message(raw_message)
        # incomplete message received, leave it in the buffer
        return [nil, true] unless message

        @message_buffer = remaining
        [message, remaining.empty?]
      end

      # Parses one message from the front of +raw_message+ without touching
      # any decoder state.
      #
      # @return [Array(Message, String), nil] the message and the bytes after
      #   it, or nil when +raw_message+ does not yet hold a complete message
      def parse_message(raw_message)
        # incomplete message prelude received
        return if raw_message.bytesize < PRELUDE_LENGTH

        prelude = raw_message.byteslice(0, PRELUDE_LENGTH)
        total_length, header_length = decode_prelude(prelude)

        # reject impossible lengths before waiting for more data, otherwise a
        # bad prelude could stall the buffer or build a negative unpack count
        content_length = total_length - PRELUDE_LENGTH - CRC32_LENGTH
        if content_length.negative? || header_length > content_length
          raise ArgumentError,
                "invalid message prelude: total length #{total_length}, " \
                "headers length #{header_length}"
        end

        # incomplete message received
        return if raw_message.bytesize < total_length

        # byteslice avoids copying the whole remaining buffer per message
        content = raw_message.byteslice(PRELUDE_LENGTH, content_length)
        checksum = raw_message.byteslice(PRELUDE_LENGTH + content_length, CRC32_LENGTH).unpack1('N')
        # the message CRC covers the prelude followed by the content
        unless Zlib.crc32(content, Zlib.crc32(prelude)) == checksum
          raise Errors::MessageChecksumError
        end

        # decode headers and payload
        headers, payload = decode_context(content, header_length)
        remaining = raw_message.byteslice(total_length, raw_message.bytesize - total_length)

        [Message.new(headers: headers, payload: payload), remaining]
      end

      # Verifies the prelude CRC and returns [total_length, header_length].
      def decode_prelude(prelude)
        # prelude contains length of message and headers,
        # followed with CRC checksum of itself
        content, checksum = prelude.unpack("a#{PRELUDE_LENGTH - CRC32_LENGTH}N")
        raise Errors::PreludeChecksumError unless Zlib.crc32(content) == checksum
        content.unpack('N*')
      end

      # Splits message content into decoded headers and a payload IO.
      def decode_context(content, header_length)
        encoded_header, encoded_payload = content.unpack("a#{header_length}a*")
        [
          extract_headers(encoded_header),
          extract_payload(encoded_payload)
        ]
      end

      # Decodes the header section into a Hash of key => HeaderValue.
      def extract_headers(buffer)
        scanner = buffer
        headers = {}
        until scanner.empty?
          # header key: 1-byte length followed by the key bytes
          key_length, scanner = scanner.unpack('Ca*')
          key, scanner = scanner.unpack("a#{key_length}a*")

          # header value: 1-byte type index, then a type-dependent value
          type_index, scanner = scanner.unpack('Ca*')
          value_type = Types.types[type_index]
          unpack_pattern, value_length = Types.pattern[value_type]
          if [true, false].include?(unpack_pattern)
            # boolean types carry no value bytes; the pattern is the value
            value = unpack_pattern
          else
            # types without a fixed length carry a 2-byte big-endian length
            value_length, scanner = scanner.unpack('S>a*') unless value_length
            value, scanner = scanner.unpack("#{unpack_pattern || "a#{value_length}"}a*")
          end

          headers[key] = HeaderValue.new(
            format: @format,
            value: value,
            type: value_type
          )
        end
        headers
      end

      # Wraps the payload in a StringIO, or a Tempfile when it is large.
      def extract_payload(encoded)
        if encoded.bytesize <= ONE_MEGABYTE
          payload_stringio(encoded)
        else
          payload_tempfile(encoded)
        end
      end

      def payload_stringio(encoded)
        StringIO.new(encoded)
      end

      # Writes the payload to a binary-mode Tempfile rewound for reading.
      def payload_tempfile(encoded)
        payload = Tempfile.new
        payload.binmode
        payload.write(encoded)
        payload.rewind
        payload
      end
    end
  end
end