1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
|
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package file
import (
"fmt"
"math"
"math/bits"
"unsafe"
shared_utils "github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/bmi"
"github.com/apache/arrow-go/v18/parquet/internal/utils"
"github.com/apache/arrow-go/v18/parquet/schema"
"golang.org/x/xerrors"
)
type LevelInfo struct {
// How many slots an undefined but present (i.e. null) element in
// parquet consumes when decoding to Arrow.
// "Slot" is used in the same context as the Arrow specification
// (i.e. a value holder).
// This is only ever >1 for descendents of FixedSizeList.
NullSlotUsage int32
// The definition level at which the value for the field
// is considered not null (definition levels greater than
// or equal to this value indicate a not-null
// value for the field). For list fields definition levels
// greater than or equal to this field indicate a present,
// possibly null, child value.
DefLevel int16
// The repetition level corresponding to this element
// or the closest repeated ancestor. Any repetition
// level less than this indicates either a new list OR
// an empty list (which is determined in conjunction
// with definition levels).
RepLevel int16
// The definition level indicating the level at which the closest
// repeated ancestor is not empty. This is used to discriminate
// between a value less than |def_level| being null or excluded entirely.
// For instance if we have an arrow schema like:
// list(struct(f0: int)). Then then there are the following
// definition levels:
// 0 = null list
// 1 = present but empty list.
// 2 = a null value in the list
// 3 = a non null struct but null integer.
// 4 = a present integer.
// When reconstructing, the struct and integer arrays'
// repeated_ancestor_def_level would be 2. Any
// def_level < 2 indicates that there isn't a corresponding
// child value in the list.
// i.e. [null, [], [null], [{f0: null}], [{f0: 1}]]
// has the def levels [0, 1, 2, 3, 4]. The actual
// struct array is only of length 3: [not-set, set, set] and
// the int array is also of length 3: [N/A, null, 1].
RepeatedAncestorDefLevel int16
}
func (l *LevelInfo) Equal(rhs *LevelInfo) bool {
return l.NullSlotUsage == rhs.NullSlotUsage &&
l.DefLevel == rhs.DefLevel &&
l.RepLevel == rhs.RepLevel &&
l.RepeatedAncestorDefLevel == rhs.RepeatedAncestorDefLevel
}
func (l *LevelInfo) HasNullableValues() bool {
return l.RepeatedAncestorDefLevel < l.DefLevel
}
func (l *LevelInfo) IncrementOptional() {
l.DefLevel++
}
func (l *LevelInfo) IncrementRepeated() int16 {
lastRepAncestor := l.RepeatedAncestorDefLevel
// Repeated fields add both a repetition and definition level. This is used
// to distinguish between an empty list and a list with an item in it.
l.RepLevel++
l.DefLevel++
// For levels >= repeated_ancestor_def_level it indicates the list was
// non-null and had at least one element. This is important
// for later decoding because we need to add a slot for these
// values. for levels < current_def_level no slots are added
// to arrays.
l.RepeatedAncestorDefLevel = l.DefLevel
return lastRepAncestor
}
func (l *LevelInfo) Increment(n schema.Node) {
switch n.RepetitionType() {
case parquet.Repetitions.Repeated:
l.IncrementRepeated()
case parquet.Repetitions.Optional:
l.IncrementOptional()
}
}
// Input/Output structure for reconstructed validity bitmaps.
type ValidityBitmapInputOutput struct {
// Input only.
// The maximum number of values_read expected (actual
// values read must be less than or equal to this value).
// If this number is exceeded methods will throw a
// ParquetException. Exceeding this limit indicates
// either a corrupt or incorrectly written file.
ReadUpperBound int64
// Output only. The number of values added to the encountered
// (this is logically the count of the number of elements
// for an Arrow array).
Read int64
// Input/Output. The number of nulls encountered.
NullCount int64
// Output only. The validity bitmap to populate. May be be null only
// for DefRepLevelsToListInfo (if all that is needed is list offsets).
ValidBits []byte
// Input only, offset into valid_bits to start at.
ValidBitsOffset int64
}
// create a bitmap out of the definition Levels and return the number of non-null values
func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) (count uint64) {
const maxbatch = 8 * int(unsafe.Sizeof(uint64(0)))
if !hasRepeatedParent && int64(len(defLevels)) > remainingUpperBound {
panic("values read exceed upper bound")
}
var batch []int16
for len(defLevels) > 0 {
batchSize := shared_utils.Min(maxbatch, len(defLevels))
batch, defLevels = defLevels[:batchSize], defLevels[batchSize:]
definedBitmap := bmi.GreaterThanBitmap(batch, info.DefLevel-1)
if hasRepeatedParent {
// Greater than level_info.repeated_ancestor_def_level - 1 implies >= the
// repeated_ancestor_def_level
presentBitmap := bmi.GreaterThanBitmap(batch, info.RepeatedAncestorDefLevel-1)
selectedBits := bmi.ExtractBits(definedBitmap, presentBitmap)
selectedCount := int64(bits.OnesCount64(presentBitmap))
if selectedCount > remainingUpperBound {
panic("values read exceeded upper bound")
}
wr.AppendWord(selectedBits, selectedCount)
count += uint64(bits.OnesCount64(selectedBits))
continue
}
wr.AppendWord(definedBitmap, int64(len(batch)))
count += uint64(bits.OnesCount64(definedBitmap))
}
return
}
// create a bitmap out of the definition Levels
func defLevelsToBitmapInternal(defLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput, hasRepeatedParent bool) {
wr := utils.NewFirstTimeBitmapWriter(out.ValidBits, out.ValidBitsOffset, int64(out.ReadUpperBound))
defer wr.Finish()
setCount := defLevelsBatchToBitmap(defLevels, out.ReadUpperBound, info, wr, hasRepeatedParent)
out.Read = int64(wr.Pos())
out.NullCount += out.Read - int64(setCount)
}
// DefLevelsToBitmap creates a validitybitmap out of the passed in definition levels and info object.
func DefLevelsToBitmap(defLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput) {
hasRepeatedParent := false
if info.RepLevel > 0 {
hasRepeatedParent = true
}
defLevelsToBitmapInternal(defLevels, info, out, hasRepeatedParent)
}
// DefRepLevelsToListInfo takes in the definition and repetition levels in order to populate the validity bitmap
// and properly handle nested lists and update the offsets for them.
func DefRepLevelsToListInfo(defLevels, repLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput, offsets []int32) error {
var wr utils.BitmapWriter
if out.ValidBits != nil {
wr = utils.NewFirstTimeBitmapWriter(out.ValidBits, out.ValidBitsOffset, out.ReadUpperBound)
defer wr.Finish()
}
offsetPos := 0
for idx := range defLevels {
// skip items that belong to empty or null ancestor lists and further nested lists
if defLevels[idx] < info.RepeatedAncestorDefLevel || repLevels[idx] > info.RepLevel {
continue
}
if repLevels[idx] == info.RepLevel {
// continuation of an existing list.
// offsets can be null for structs with repeated children
if offsetPos < len(offsets) {
if offsets[offsetPos] == math.MaxInt32 {
return xerrors.New("list index overflow")
}
offsets[offsetPos]++
}
} else {
if (wr != nil && int64(wr.Pos()) >= out.ReadUpperBound) || (offsetPos >= int(out.ReadUpperBound)) {
return fmt.Errorf("definition levels exceeded upper bound: %d", out.ReadUpperBound)
}
// current_rep < list rep_level i.e. start of a list (ancestor empty lists
// are filtered out above)
// offsets can be null for structs with repeated children
if offsetPos+1 < len(offsets) {
offsetPos++
// use cumulative offsets because variable size lists are more common
// than fixed size lists so it should be cheaper to make these
// cumulative and subtract when validating fixed size lists
offsets[offsetPos] = offsets[offsetPos-1]
if defLevels[idx] >= info.DefLevel {
if offsets[offsetPos] == math.MaxInt32 {
return xerrors.New("list index overflow")
}
offsets[offsetPos]++
}
}
if wr != nil {
// the level info def level for lists reflects element present level
// the prior level distinguishes between empty lists
if defLevels[idx] >= info.DefLevel-1 {
wr.Set()
} else {
out.NullCount++
wr.Clear()
}
wr.Next()
}
}
}
if len(offsets) > 0 {
out.Read = int64(offsetPos)
} else if wr != nil {
out.Read = int64(wr.Pos())
}
if out.NullCount > 0 && info.NullSlotUsage > 1 {
return xerrors.New("null values with null_slot_usage > 1 not supported.")
}
return nil
}
// DefRepLevelsToBitmap constructs a full validitybitmap out of the definition and repetition levels
// properly handling nested lists and parents.
func DefRepLevelsToBitmap(defLevels, repLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput) error {
info.RepLevel++
info.DefLevel++
return DefRepLevelsToListInfo(defLevels, repLevels, info, out, nil)
}
|