1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
|
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"encoding/binary"
"errors"
"io"
"math"
"reflect"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/utils"
)
// trailingMask[n] is a uint64 with exactly its n lowest bits set, so that
// v & trailingMask[n] keeps only the n least significant bits of v. The table
// is computed once during package variable initialization instead of being
// hard coded.
var trailingMask = func() (masks [64]uint64) {
	for n := range masks {
		// shifting all-ones right by 64-n leaves n set bits; for n == 0 the
		// shift count is 64, which yields 0 for a uint64 in Go.
		masks[n] = math.MaxUint64 >> uint(64-n)
	}
	return masks
}()

// trailingBits keeps only the lowest `bits` bits of v and clears everything
// above them. When bits is 64 or more, v is returned unchanged.
func trailingBits(v uint64, bits uint) uint64 {
	if bits < 64 {
		return v & trailingMask[bits]
	}
	return v
}
// reader is the set of capabilities BitReader needs from its data source:
// sequential reads and seeking (for bulk unpacking) plus positional reads (for
// refilling the bit buffer and aligned reads).
type reader interface {
	io.ReadSeeker
	io.ReaderAt
}
// buflen is the number of uint32 slots in BitReader's unpack scratch buffer.
const buflen = 1 << 10
// BitReader implements functionality for reading bits or bytes, buffering up to
// a uint64 at a time from the reader in order to improve efficiency. It also
// provides methods to read multiple bytes in one read, such as encoded
// ints/values.
//
// This BitReader is the basis for the other utility classes like RLE decoding
// and such, providing the necessary functions for interpreting the values.
type BitReader struct {
	// reader is the underlying data source.
	reader reader
	// buffer caches up to 8 bytes of the stream; fillbuffer loads it
	// little endian starting at byteoffset.
	buffer uint64
	// byteoffset is the stream offset of the first byte held in buffer.
	byteoffset int64
	// bitoffset is the number of bits of buffer already consumed.
	bitoffset uint
	// raw is scratch space for positional reads of up to 8 bytes.
	raw [8]byte
	// unpackBuf is scratch space for bulk unpacking; GetBatchBools also
	// reuses it, viewed as bytes, as its read buffer.
	unpackBuf [buflen]uint32
}
// NewBitReader returns a BitReader that pulls its data from r, which must
// implement io.Reader, io.ReaderAt and io.Seeker. The returned reader starts at
// offset zero with an empty bit buffer.
func NewBitReader(r reader) *BitReader {
	br := new(BitReader)
	br.reader = r
	return br
}
// CurOffset reports the byte offset into the data that the reader is currently
// at. A partially consumed byte counts as consumed, so the result is rounded up
// to the next whole byte.
func (b *BitReader) CurOffset() int64 {
	consumed := bitutil.BytesForBits(int64(b.bitoffset))
	return b.byteoffset + consumed
}
// Reset points the BitReader at a new source r and rewinds its position and
// bit buffer to zero, so the same BitReader can be reused.
func (b *BitReader) Reset(r reader) {
	b.reader, b.buffer = r, 0
	b.byteoffset, b.bitoffset = 0, 0
}
// GetVlqInt decodes an unsigned VLQ (varint, as understood by
// encoding/binary.ReadUvarint) from the stream. Bytes are pulled through
// ReadByte, so decoding starts on a byte boundary. The boolean is false, and the
// value 0, if decoding fails, e.g. because the stream ran out of bytes.
func (b *BitReader) GetVlqInt() (uint64, bool) {
	if val, err := binary.ReadUvarint(b); err == nil {
		return val, true
	}
	return 0, false
}
// GetZigZagVlqInt reads a VLQ-encoded value via GetVlqInt and undoes the zigzag
// mapping to recover a signed integer. It returns false if the underlying VLQ
// could not be read.
func (b *BitReader) GetZigZagVlqInt() (int64, bool) {
	raw, ok := b.GetVlqInt()
	if ok {
		// zigzag maps 0, 1, 2, 3, ... back to 0, -1, 1, -2, ...
		magnitude := int64(raw >> 1)
		sign := -int64(raw & 1)
		return magnitude ^ sign, true
	}
	return 0, false
}
// ReadByte reads a single byte-aligned byte from the underlying stream using
// GetAligned. It returns an error when GetAligned cannot supply the byte. This
// method is what lets GetVlqInt hand the BitReader to binary.ReadUvarint.
func (b *BitReader) ReadByte() (byte, error) {
	var out byte
	if !b.GetAligned(1, &out) {
		return 0, errors.New("failed to read byte")
	}
	return out, nil
}
// GetAligned reads nbytes from the underlying stream into the value pointed to
// by v. Any partially consumed byte is skipped first, so the read always starts
// on a byte boundary. Encoded values are assumed to be little endian and are
// zero-extended to the size of *v.
//
// v must be a non-nil pointer to a byte or sized uint type (*byte, *uint16,
// *uint32, *uint64), and nbytes must be between 0 and the size of *v.
// GetAligned returns false if v or nbytes is invalid or if there aren't enough
// bytes remaining in the stream. In that case the reader's position is left
// unchanged.
func (b *BitReader) GetAligned(nbytes int, v interface{}) bool {
	// figure out the number of bytes to represent *v, rejecting non-pointers and
	// element types wider than the scratch buffer before they can cause a panic.
	typ := reflect.TypeOf(v)
	if typ == nil || typ.Kind() != reflect.Ptr {
		return false
	}
	typBytes := int(typ.Elem().Size())
	if nbytes < 0 || nbytes > typBytes || typBytes > len(b.raw) {
		return false
	}

	// round up to the next byte boundary. The new offset is only committed once
	// the read succeeds, so a failed call doesn't corrupt the reader state.
	offset := b.byteoffset + bitutil.BytesForBits(int64(b.bitoffset))
	n, err := b.reader.ReadAt(b.raw[:nbytes], offset)
	if err != nil && err != io.EOF {
		return false
	}
	if n != nbytes {
		return false
	}
	// zero pad the bytes beyond what was read
	memory.Set(b.raw[n:typBytes], 0)

	switch v := v.(type) {
	case *byte:
		*v = b.raw[0]
	case *uint64:
		*v = binary.LittleEndian.Uint64(b.raw[:typBytes])
	case *uint32:
		*v = binary.LittleEndian.Uint32(b.raw[:typBytes])
	case *uint16:
		*v = binary.LittleEndian.Uint16(b.raw[:typBytes])
	default:
		return false
	}

	b.byteoffset = offset + int64(nbytes)
	b.bitoffset = 0
	// reload the word buffer from the new position. Its error is not reported:
	// the bytes requested for v were already read successfully.
	b.fillbuffer()
	return true
}
// fillbuffer loads up to 8 bytes starting at byteoffset into the 64-bit word
// buffer, interpreting them as little endian. Bytes past the end of the data
// read as zero. An error is returned (leaving the buffer untouched) only when
// the read produced no data and failed with something other than io.EOF.
func (b *BitReader) fillbuffer() error {
	got, err := b.reader.ReadAt(b.raw[:], b.byteoffset)
	if got == 0 && err != nil && err != io.EOF {
		return err
	}
	// clear whatever ReadAt didn't fill so stale bytes never leak into buffer
	tail := b.raw[got:]
	for i := range tail {
		tail[i] = 0
	}
	b.buffer = binary.LittleEndian.Uint64(b.raw[:])
	return nil
}
// next reads the next `bits` bits from the 64-bit word buffer, least
// significant bit first, and returns them as the low bits of v.
//
// b.bitoffset counts the bits of b.buffer already consumed, and b.buffer is
// expected to hold the 8 bytes starting at b.byteoffset. If the read reaches or
// crosses the end of that word, byteoffset advances by 8, the buffer is
// reloaded, and the remaining high-order bits of v come from the start of the
// new word.
//
// Callers in this file limit bits to at most 64. If reloading the buffer
// fails, the offsets have already been advanced when the error is returned.
func (b *BitReader) next(bits uint) (v uint64, err error) {
	// take the bits available in the current word; when bitoffset+bits >= 64
	// this is every remaining bit of the word.
	v = trailingBits(b.buffer, b.bitoffset+bits) >> b.bitoffset
	b.bitoffset += bits
	// if we need more bits to get what was requested then refill the buffer
	if b.bitoffset >= 64 {
		b.byteoffset += 8
		b.bitoffset -= 64
		if err = b.fillbuffer(); err != nil {
			return 0, err
		}
		// b.bitoffset is now the number of bits still owed from the new word;
		// place them above the bits-b.bitoffset bits already taken.
		v |= trailingBits(b.buffer, b.bitoffset) << (bits - b.bitoffset)
	}
	return
}
// GetBatchIndex is like GetBatch but decodes into IndexType values (used for
// dictionary decoding). Each value is bits wide, and bits must be at most 32.
// It returns the number of values decoded into out and the first error
// encountered.
func (b *BitReader) GetBatchIndex(bits uint, out []IndexType) (i int, err error) {
	// IndexType is a 32-bit value so bits must be 32 or less when unpacking
	// values using the bitreader.
	if bits > 32 {
		return 0, errors.New("must be 32 bits or less per read")
	}

	var val uint64
	length := len(out)
	// read values one at a time until the bit offset wraps back to the start of
	// a buffered word.
	for ; i < length && b.bitoffset != 0; i++ {
		val, err = b.next(bits)
		out[i] = IndexType(val)
		if err != nil {
			return
		}
	}

	// position the stream for bulk unpacking. A failed seek would make unpack32
	// read from the wrong place, so report it instead of ignoring it.
	if _, err = b.reader.Seek(b.byteoffset, io.SeekStart); err != nil {
		return i, err
	}
	// grab as many values as unpack32 will unpack in bulk
	if i < length {
		// IndexType should be a 32-bit value, so we can unpack straight into
		// the output by viewing it as a []uint32.
		numUnpacked := unpack32(b.reader, (*(*[]uint32)(unsafe.Pointer(&out)))[i:], int(bits))
		i += numUnpacked
		// NOTE(review): assumes unpack32 consumes whole bytes, i.e.
		// numUnpacked*bits is a multiple of 8 — confirm against unpack32.
		b.byteoffset += int64(numUnpacked * int(bits) / 8)
	}

	// re-fill our buffer from the new byte offset just in case.
	b.fillbuffer()
	// grab the remaining values that weren't bulk unpacked
	for ; i < length; i++ {
		val, err = b.next(bits)
		out[i] = IndexType(val)
		if err != nil {
			break
		}
	}
	return
}
// GetBatchBools is like GetBatch but optimized for reading 1-bit values as
// booleans. It fills out and returns the number of values read together with
// the first error encountered.
func (b *BitReader) GetBatchBools(out []bool) (int, error) {
	bits := uint(1)
	length := len(out)
	i := 0
	// read single bits until the bit offset wraps back to the start of a word
	for ; i < length && b.bitoffset != 0; i++ {
		val, err := b.next(bits)
		out[i] = val != 0
		if err != nil {
			return i, err
		}
	}

	// position the stream for bulk reads. A failed seek would make the reads
	// below start at the wrong place, so report it.
	if _, err := b.reader.Seek(b.byteoffset, io.SeekStart); err != nil {
		return i, err
	}
	buf := arrow.Uint32Traits.CastToBytes(b.unpackBuf[:])
	blen := buflen * 8
	// grab whole bytes (8 bools at a time) in a loop since it's more efficient
	// than going bit by bit. The loop must stop once fewer than 8 values
	// remain: unpackSize rounds down to a multiple of 8, so it would be 0 and
	// i would never advance. The remainder is handled bit by bit below.
	for length-i >= 8 {
		unpackSize := utils.Min(blen, length-i) / 8 * 8
		nbytes := bitutil.BytesForBits(int64(unpackSize))
		// ReadFull guards against short reads, which would otherwise advance i
		// past values that were never decoded.
		n, err := io.ReadFull(b.reader, buf[:nbytes])
		if err != nil {
			return i, err
		}
		BytesToBools(buf[:n], out[i:])
		i += unpackSize
		b.byteoffset += int64(n)
	}

	// reload the word buffer at the new byte offset
	b.fillbuffer()
	// grab the trailing bits
	for ; i < length; i++ {
		val, err := b.next(bits)
		out[i] = val != 0
		if err != nil {
			return i, err
		}
	}
	return i, nil
}
// Discard skips over n values that are each bits wide (bits must be at most
// 64) and returns how many were skipped along with the first error hit. Once
// the reader is at the start of a buffered word, whole groups of 32 values are
// skipped by advancing the byte offset instead of decoding them.
func (b *BitReader) Discard(bits uint, n int) (int, error) {
	if bits > 64 {
		return 0, errors.New("must be 64 bits or less per read")
	}

	skipped := 0
	// consume values one by one until the bit offset wraps to a word boundary
	for skipped < n && b.bitoffset != 0 {
		if _, err := b.next(bits); err != nil {
			return skipped, err
		}
		skipped++
	}

	// jump over full groups of 32 values: 32*bits bits is always a whole
	// number of bytes, so only the byte offset needs to move.
	if remaining := n - skipped; remaining > 32 {
		groups := remaining / 32 * 32
		b.byteoffset += bitutil.BytesForBits(int64(groups * int(bits)))
		skipped += groups
	}
	b.fillbuffer()

	for ; skipped < n; skipped++ {
		if _, err := b.next(bits); err != nil {
			return skipped, err
		}
	}
	return n, nil
}
// GetBatch fills out by decoding consecutive values from the stream, each bit
// packed into bits bits (at most 64). It returns the number of values decoded
// and the first error encountered.
func (b *BitReader) GetBatch(bits uint, out []uint64) (int, error) {
	// since we're unpacking into uint64 values, we can't support bits being
	// larger than 64 here as that's the largest size value we're reading
	if bits > 64 {
		return 0, errors.New("must be 64 bits or less per read")
	}

	length := len(out)
	i := 0
	// read values one at a time until the bit offset wraps to a word boundary
	for ; i < length && b.bitoffset != 0; i++ {
		val, err := b.next(bits)
		out[i] = val
		if err != nil {
			return i, err
		}
	}

	// position the stream for bulk unpacking. A failed seek would make unpack32
	// read from the wrong place, so report it instead of ignoring it.
	if _, err := b.reader.Seek(b.byteoffset, io.SeekStart); err != nil {
		return i, err
	}
	for i < length {
		// unpack up to buflen values at a time into the scratch buffer since
		// it's more efficient than decoding bit by bit.
		// NOTE(review): unpackBuf holds uint32 values; confirm how unpack32
		// handles bits > 32.
		unpackSize := utils.Min(buflen, length-i)
		numUnpacked := unpack32(b.reader, b.unpackBuf[:unpackSize], int(bits))
		if numUnpacked == 0 {
			break
		}
		for k, u := range b.unpackBuf[:numUnpacked] {
			out[i+k] = uint64(u)
		}
		i += numUnpacked
		b.byteoffset += int64(numUnpacked * int(bits) / 8)
	}

	// reload the word buffer at the new byte offset
	b.fillbuffer()
	// and then the remaining trailing values
	for ; i < length; i++ {
		val, err := b.next(bits)
		out[i] = val
		if err != nil {
			return i, err
		}
	}
	return i, nil
}
// GetValue decodes a single value that is bit packed using width bits. The
// boolean is false when GetBatch decodes fewer than one value, for example
// when width exceeds 64 or a read from the underlying reader fails.
func (b *BitReader) GetValue(width int) (uint64, bool) {
	var single [1]uint64
	// the error from GetBatch is intentionally dropped; failure is reported
	// through the decoded count instead.
	n, _ := b.GetBatch(uint(width), single[:])
	return single[0], n == 1
}
|