1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests for Arrow::StreamDecoder: feeding an IPC stream to the decoder in
# various ways (bytes, buffers, sized chunks, after a reset) and checking
# the callbacks it delivers to a Ruby-implemented Arrow::StreamListener.
class TestStreamDecoder < Test::Unit::TestCase
  include Helper::Buildable

  # Listener that records every callback it receives, in order, as an
  # array of [event_name, *arguments] entries in #events.
  class Listener < Arrow::StreamListener
    # Registers this Ruby subclass as a GObject type (Ruby-GNOME) so the
    # virtual_do_* overrides below are used as the listener's callbacks.
    type_register

    attr_reader :events

    def initialize
      super
      @events = []
    end

    private
    # Each handler returns true; presumably the C side treats a false
    # return as a failure. NOTE(review): confirm against the GLib API.
    def virtual_do_on_eos
      @events << [:eos]
      true
    end

    def virtual_do_on_record_batch_decoded(record_batch, metadata)
      @events << [:record_batch_decoded, record_batch, metadata]
      true
    end

    def virtual_do_on_schema_decoded(schema, filtered_schema)
      @events << [:schema_decoded, schema, filtered_schema]
      true
    end
  end

  # Serializes a one-column record batch into @buffer with a
  # RecordBatchStreamWriter, then builds a fresh decoder wired to a
  # recording listener.
  def setup
    columns = {
      "enabled": build_boolean_array([true, false, nil, true]),
    }
    @record_batch = build_record_batch(columns)
    @schema = @record_batch.schema

    @buffer = Arrow::ResizableBuffer.new(0)
    output = Arrow::BufferOutputStream.new(@buffer)
    stream_writer = Arrow::RecordBatchStreamWriter.new(output, @schema)
    stream_writer.write_record_batch(@record_batch)
    stream_writer.close
    output.close

    @listener = Listener.new
    @decoder = Arrow::StreamDecoder.new(@listener)
  end

  def test_listener
    assert_equal(@listener, @decoder.listener)
  end

  # Feeds the stream one byte at a time as GLib::Bytes.
  def test_consume_bytes
    @buffer.data.to_s.each_byte do |byte|
      @decoder.consume_bytes(GLib::Bytes.new(byte.chr))
    end
    assert_equal(expected_events, @listener.events)
  end

  # Feeds the stream one byte at a time as Arrow::Buffer.
  def test_consume_buffer
    # We need to keep data that aren't processed yet.
    data = []
    @buffer.data.to_s.each_byte do |byte|
      data << byte.chr
      can_clear = (@decoder.next_required_size == 1)
      @decoder.consume_buffer(Arrow::Buffer.new(data.last))
      # We can release a reference for kept data after they are
      # processed.
      data.clear if can_clear
    end
    assert_equal(expected_events, @listener.events)
  end

  # Feeds a partial prefix (first 10 bytes), resets, then feeds the whole
  # stream; only the events from the complete stream are expected.
  def test_reset
    @decoder.consume_bytes(@buffer.data.to_s[0, 10])
    @decoder.reset
    @decoder.consume_bytes(@buffer.data)
    assert_equal(expected_events, @listener.events)
  end

  # The schema is unknown before any input and known after the full stream.
  def test_schema
    assert_nil(@decoder.schema)
    @decoder.consume_bytes(@buffer.data)
    assert_equal(@schema, @decoder.schema)
  end

  # Feeds exactly as many bytes as the decoder asks for, until it reports
  # that it needs nothing more.
  def test_next_required_size
    # dup: slice! below drains this string in place.
    data = @buffer.data.to_s.dup
    loop do
      next_required_size = @decoder.next_required_size
      break if next_required_size.zero?
      # If the stream runs out before the decoder is satisfied, stop here
      # so the assertion below reports the missing events, instead of
      # slicing past the end (which yields nil and raises NoMethodError).
      break if data.empty?
      @decoder.consume_bytes(data.slice!(0, next_required_size))
    end
    assert_equal(expected_events, @listener.events)
  end

  private
  # Events a complete decode of @buffer should produce, in order: the
  # schema (the filtered schema is expected to equal the full one here),
  # the single record batch with nil metadata, then end-of-stream.
  def expected_events
    [
      [:schema_decoded, @schema, @schema],
      [:record_batch_decoded, @record_batch, nil],
      [:eos],
    ]
  end
end
|