1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
|
# frozen_string_literal: true
require 'stringio'
require 'tempfile'
require 'zlib'
module Aws
module EventStream
# This class provides methods for decoding binary input into
# single or multiple messages (Aws::EventStream::Message).
#
# * {#decode} - decodes messages from an IO-like object that responds
# to #read and contains binary data, returning decoded
# Aws::EventStream::Message along the way or wrapped in an enumerator
#
# ## Examples
#
# decoder = Aws::EventStream::Decoder.new
#
# # decoding from IO
# decoder.decode(io) do |message|
# message.headers
# # => { ... }
# message.payload
# # => StringIO / Tempfile
# end
#
# # alternatively
# message_pool = decoder.decode(io)
# message_pool.next
# # => Aws::EventStream::Message
#
# * {#decode_chunk} - decodes a single message from a chunk of data,
# returning the message object followed by a boolean (indicating the
# EOF status of the data) in an array
#
# ## Examples
#
# # chunk containing exactly one message data
# message, chunk_eof = decoder.decode_chunk(chunk_str)
# message
# # => Aws::EventStream::Message
# chunk_eof
# # => true
#
# # chunk containing a partial message
# message, chunk_eof = decoder.decode_chunk(chunk_str)
# message
# # => nil
# chunk_eof
# # => true
# # chunk data is saved at decoder's message_buffer
#
# # chunk containing more than one message
# message, chunk_eof = decoder.decode_chunk(chunk_str)
# message
# # => Aws::EventStream::Message
# chunk_eof
# # => false
# # extra chunk data is saved at message_buffer of the decoder
#
class Decoder
  # NOTE(review): Enumerable is mixed in, but this class defines no #each,
  # so Enumerable's methods are not usable on a Decoder as written.
  include Enumerable

  # Payloads larger than this many bytes are spooled to a Tempfile
  # instead of being kept in a StringIO.
  ONE_MEGABYTE = 1024 * 1024
  private_constant :ONE_MEGABYTE

  # bytes of prelude part, including 4 bytes of
  # total message length, headers length and crc checksum of prelude
  PRELUDE_LENGTH = 12
  private_constant :PRELUDE_LENGTH

  # 4 bytes message crc checksum
  CRC32_LENGTH = 4
  private_constant :CRC32_LENGTH

  # @param [Hash] options The initialization options.
  # @option options [Boolean] :format (true) When `false` it
  #   disables user-friendly formatting for message header values
  #   including timestamp and uuid etc.
  def initialize(options = {})
    @format = options.fetch(:format, true)
    @message_buffer = ''
  end

  # Decodes every complete message found in a binary stream.
  #
  # The whole stream is read with a single `io.read` call. Each complete
  # message is decoded in order; decoding stops at the first point where
  # the remaining bytes do not form a complete message (such trailing
  # bytes are ignored). This method does not touch the buffer used by
  # {#decode_chunk}.
  #
  # @param [IO#read] io An IO-like object
  #   that responds to `#read`
  #
  # @yieldparam [Message] message Called once per decoded message
  # @return [Enumerator<Message>, nil] Returns an Enumerator over the
  #   decoded messages if no block is given, `nil` otherwise
  # @raise [Errors::PreludeChecksumError] when a prelude checksum mismatches
  # @raise [Errors::MessageChecksumError] when a message checksum mismatches
  def decode(io, &block)
    raw_messages = io.read
    unless block_given?
      # Decode eagerly so checksum errors surface from this call rather
      # than from a later Enumerator#next.
      messages = []
      each_message(raw_messages) { |message| messages << message }
      return messages.each
    end

    each_message(raw_messages, &block)
    nil
  end

  # Decodes a single message from a chunk of string
  #
  # @param [String] chunk A chunk of string to be decoded,
  #   chunk can contain partial event message to multiple event messages
  #   When not provided, decode data from #message_buffer
  #
  # @return [Array<Message|nil, Boolean>] Returns single decoded message
  #   and boolean pair, the boolean flag indicates whether this chunk
  #   has been fully consumed, unused data is tracked at #message_buffer
  def decode_chunk(chunk = nil)
    # pack('a*a*') concatenates both parts into a fresh binary string
    @message_buffer = [@message_buffer, chunk].pack('a*a*') if chunk
    decode_message(@message_buffer)
  end

  private

  # exposed via object.send for testing
  attr_reader :message_buffer

  # Yields each complete message contained in +raw_messages+, in order.
  def each_message(raw_messages)
    remaining = raw_messages
    while (parsed = parse_message(remaining))
      message, remaining = parsed
      yield message
    end
  end

  # Decodes the first message of +raw_message+ and records the bytes
  # that follow it in @message_buffer.
  #
  # @return [Array<Message|nil, Boolean>] `[nil, true]` when the data
  #   does not yet hold a complete message (the buffer is left as is),
  #   otherwise the message and whether no bytes remain after it
  def decode_message(raw_message)
    message, remaining = parse_message(raw_message)
    # incomplete message received, leave it in the buffer
    return [nil, true] unless message

    @message_buffer = remaining
    [message, remaining.empty?]
  end

  # Stateless parser for the first message in +raw_message+.
  #
  # @return [Array<Message, String>, nil] the decoded message and the
  #   bytes following it, or `nil` when +raw_message+ does not contain
  #   a complete message
  def parse_message(raw_message)
    # incomplete message prelude received
    return nil if raw_message.bytesize < PRELUDE_LENGTH

    prelude, content = raw_message.unpack("a#{PRELUDE_LENGTH}a*")
    total_length, header_length = decode_prelude(prelude)

    # incomplete message received
    return nil if raw_message.bytesize < total_length

    # NOTE(review): a prelude announcing a total length smaller than
    # PRELUDE_LENGTH + CRC32_LENGTH produces a negative count in the
    # template below; consider rejecting it explicitly.
    content_length = total_length - PRELUDE_LENGTH - CRC32_LENGTH
    content, checksum, remaining = content.unpack("a#{content_length}Na*")
    # the message checksum covers the prelude plus headers and payload
    unless Zlib.crc32([prelude, content].pack('a*a*')) == checksum
      raise Errors::MessageChecksumError
    end

    headers, payload = decode_context(content, header_length)
    [Message.new(headers: headers, payload: payload), remaining]
  end

  # Validates the prelude checksum and returns the two big-endian 32-bit
  # integers it carries: total message length and headers length.
  def decode_prelude(prelude)
    # prelude contains length of message and headers,
    # followed with CRC checksum of itself
    content, checksum = prelude.unpack("a#{PRELUDE_LENGTH - CRC32_LENGTH}N")
    raise Errors::PreludeChecksumError unless Zlib.crc32(content) == checksum

    content.unpack('N*')
  end

  # Splits +content+ into its header section (the first +header_length+
  # bytes) and payload section, returning `[headers, payload]`.
  def decode_context(content, header_length)
    encoded_header, encoded_payload = content.unpack("a#{header_length}a*")
    [
      extract_headers(encoded_header),
      extract_payload(encoded_payload)
    ]
  end

  # Parses the binary header section into a Hash of header name to
  # HeaderValue. Each header is laid out as: 1 byte key length, the key,
  # 1 byte type index, then the value encoded as described by
  # Types.pattern for that type.
  def extract_headers(buffer)
    headers = {}
    remaining = buffer
    until remaining.empty?
      # header key
      key_length, remaining = remaining.unpack('Ca*')
      key, remaining = remaining.unpack("a#{key_length}a*")

      # header value
      type_index, remaining = remaining.unpack('Ca*')
      value_type = Types.types[type_index]
      unpack_pattern, value_length = Types.pattern[value_type]
      value =
        if [true, false].include?(unpack_pattern)
          # boolean types won't have value specified
          unpack_pattern
        else
          # types without a fixed length carry a 2-byte big-endian length
          value_length, remaining = remaining.unpack('S>a*') unless value_length
          unpacked_value, remaining =
            remaining.unpack("#{unpack_pattern || "a#{value_length}"}a*")
          unpacked_value
        end

      headers[key] = HeaderValue.new(
        format: @format,
        value: value,
        type: value_type
      )
    end
    headers
  end

  # Wraps the payload bytes in a StringIO, or in a Tempfile when they
  # exceed ONE_MEGABYTE.
  def extract_payload(encoded)
    if encoded.bytesize <= ONE_MEGABYTE
      payload_stringio(encoded)
    else
      payload_tempfile(encoded)
    end
  end

  def payload_stringio(encoded)
    StringIO.new(encoded)
  end

  # Writes the payload to a binary-mode Tempfile rewound to the start.
  def payload_tempfile(encoded)
    payload = Tempfile.new
    payload.binmode
    payload.write(encoded)
    payload.rewind
    payload
  end
end
end
end
|