File: delta_byte_array.go

package info (click to toggle)
golang-github-apache-arrow-go 18.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 32,200 kB
  • sloc: asm: 477,547; ansic: 5,369; cpp: 759; sh: 585; makefile: 319; python: 190; sed: 5
file content (277 lines) | stat: -rw-r--r-- 8,657 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoding

import (
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/internal/utils"
	"github.com/apache/arrow-go/v18/parquet"
	"golang.org/x/xerrors"
)

// DeltaByteArrayEncoder is an encoder for writing byte arrays which are delta encoded,
// also known as incremental encoding or front compression. For each element in a
// sequence of strings we store the length of the prefix it shares with the previous
// entry, plus the remaining suffix.
// See https://en.wikipedia.org/wiki/Incremental_encoding for a longer description.
//
// This is stored as a sequence of delta-encoded prefix lengths followed by the suffixes
// encoded as delta length byte arrays.
type DeltaByteArrayEncoder struct {
	encoder

	// prefixEncoder receives the shared-prefix length of every value. It stays nil
	// until the first Put or FlushValues call creates the sub-encoders.
	prefixEncoder *DeltaBitPackInt32Encoder
	// suffixEncoder receives the bytes of every value that follow its shared prefix.
	suffixEncoder *DeltaLengthByteArrayEncoder

	// lastVal is a private copy of the most recently written value; the next value's
	// shared prefix is computed against it.
	lastVal parquet.ByteArray
}

// EstimatedDataEncodedSize returns the combined size estimate of the prefix-length
// and suffix sub-encoders. Sub-encoders that have not been created yet contribute 0.
func (enc *DeltaByteArrayEncoder) EstimatedDataEncodedSize() int64 {
	var total int64
	if enc.prefixEncoder != nil {
		total += enc.prefixEncoder.EstimatedDataEncodedSize()
	}
	if enc.suffixEncoder != nil {
		total += enc.suffixEncoder.EstimatedDataEncodedSize()
	}
	return total
}

// initEncoders creates the sub-encoders on first use:
//   - a delta-bit-packed encoder for the prefix lengths;
//   - a delta-length byte-array encoder, with its own delta-bit-packed length
//     encoder, for the suffixes.
//
// All of them share this encoder's encoding and allocator.
func (enc *DeltaByteArrayEncoder) initEncoders() {
	enc.prefixEncoder = &DeltaBitPackInt32Encoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}

	suffixBase := newEncoderBase(enc.encoding, nil, enc.mem)
	suffixLengths := &DeltaBitPackInt32Encoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}
	enc.suffixEncoder = &DeltaLengthByteArrayEncoder{suffixBase, suffixLengths}
}

// Type reports the parquet physical type accepted by this encoder, which is
// always ByteArray.
func (DeltaByteArrayEncoder) Type() parquet.Type {
	return parquet.Types.ByteArray
}

// Put writes a slice of ByteArrays to the encoder.
//
// Each value is split in two. The length of the prefix it shares with the
// previously written value goes to the prefix-length encoder. The remaining
// suffix goes to the suffix encoder. When there is no previous value (the very
// first value written), the shared prefix is empty, so the value is stored in
// full with a prefix length of 0.
func (enc *DeltaByteArrayEncoder) Put(in []parquet.ByteArray) {
	if len(in) == 0 {
		return
	}

	if enc.prefixEncoder == nil { // initialize our encoders if we haven't yet
		enc.initEncoders()
		enc.lastVal = nil // nothing has been written yet, so there is no prefix to share
	}

	// Compute every prefix length and suffix first, then hand each batch to its
	// sub-encoder in a single call. This avoids allocating two one-element slices
	// per value.
	prefixLens := make([]int32, len(in))
	suffixes := make([]parquet.ByteArray, len(in))
	prev := enc.lastVal
	for i, val := range in {
		shared := 0
		for shared < len(prev) && shared < len(val) && prev[shared] == val[shared] {
			shared++
		}
		prefixLens[i] = int32(shared)
		suffixes[i] = val[shared:]
		prev = val
	}
	enc.prefixEncoder.Put(prefixLens)
	enc.suffixEncoder.Put(suffixes)

	// Keep a private copy of the whole last value. prev aliases the caller's
	// slice, which the caller may reuse or overwrite once Put returns.
	enc.lastVal = append([]byte{}, prev...)
}

// PutSpaced is like Put, but its input may contain null slots. When validBits is
// non-nil, the valid entries are first packed together using validBits starting
// at validBitsOffset, and only those are written. A nil validBits writes in as-is.
func (enc *DeltaByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
	if validBits == nil {
		enc.Put(in)
		return
	}

	packed := make([]parquet.ByteArray, len(in))
	count := spacedCompress(in, packed, validBits, validBitsOffset)
	enc.Put(packed[:count])
}

// FlushValues flushes any remaining data out and returns the finished encoded buffer,
// or returns nil and any error encountered during flushing.
//
// The returned buffer holds the delta-bit-packed prefix lengths immediately
// followed by the delta-length-byte-array encoded suffixes.
//
// After a successful flush the previous value is forgotten. The first value put
// afterwards is then stored in full (prefix length 0) rather than relative to a
// value that belongs to the earlier flushed buffer.
func (enc *DeltaByteArrayEncoder) FlushValues() (Buffer, error) {
	if enc.prefixEncoder == nil {
		enc.initEncoders()
	}
	prefixBuf, err := enc.prefixEncoder.FlushValues()
	if err != nil {
		return nil, err
	}
	defer prefixBuf.Release()

	suffixBuf, err := enc.suffixEncoder.FlushValues()
	if err != nil {
		return nil, err
	}
	defer suffixBuf.Release()

	// Each flushed buffer becomes its own page. Readers that start every page
	// with an empty previous value (see PARQUET-246) would otherwise
	// mis-decode the next page's first value. A prefix length of 0 is also
	// decoded correctly by readers that do carry the previous value over.
	enc.lastVal = nil

	ret := bufferPool.Get().(*memory.Buffer)
	ret.ResizeNoShrink(prefixBuf.Len() + suffixBuf.Len())
	copy(ret.Bytes(), prefixBuf.Bytes())
	copy(ret.Bytes()[prefixBuf.Len():], suffixBuf.Bytes())
	return poolBuffer{ret}, nil
}

// DeltaByteArrayDecoder is a decoder for a column of data encoded using incremental or prefix encoding.
// The embedded DeltaLengthByteArrayDecoder supplies the suffix of each value; the
// prefix is taken from the previously decoded value.
type DeltaByteArrayDecoder struct {
	*DeltaLengthByteArrayDecoder

	// prefixLengths holds the shared-prefix lengths not yet consumed; SetData
	// decodes all of them for a page up front.
	prefixLengths []int32
	// lastVal is the most recently decoded value. While it is nil, the next value
	// read is treated as having no predecessor and is taken whole from the suffixes.
	lastVal       parquet.ByteArray
}

// Type reports the parquet physical type produced by this decoder; it is always ByteArray.
func (DeltaByteArrayDecoder) Type() parquet.Type { return parquet.Types.ByteArray }

// Allocator returns the memory allocator this decoder was configured with.
func (d *DeltaByteArrayDecoder) Allocator() memory.Allocator {
	return d.mem
}

// SetData prepares the decoder to read nvalues values from data. data must hold
// the delta-bit-packed prefix lengths followed immediately by the
// delta-length-byte-array encoded suffixes.
//
// d.lastVal is not reset here, so a value decoded from a previous page remains
// the "previous value" for the first value of this page.
func (d *DeltaByteArrayDecoder) SetData(nvalues int, data []byte) error {
	lengthsDec := DeltaBitPackInt32Decoder{
		decoder: newDecoderBase(d.encoding, d.descr),
		mem:     d.mem,
	}
	if err := lengthsDec.SetData(nvalues, data); err != nil {
		return err
	}

	// Every prefix length is decoded eagerly. This is the only way to learn how
	// many bytes they occupy, and therefore where the suffix data begins.
	d.prefixLengths = make([]int32, nvalues)
	// NOTE(review): the count and error returned by Decode are not checked here;
	// confirm this is intended.
	lengthsDec.Decode(d.prefixLengths)

	suffixData := data[int(lengthsDec.bytesRead()):]
	return d.DeltaLengthByteArrayDecoder.SetData(nvalues, suffixData)
}

// Discard skips the next n values and returns how many were skipped. n is
// capped at the number of values remaining, and a non-positive n is a no-op.
//
// Skipped values are still reconstructed, because every value's prefix is taken
// from the value before it.
//
// An error is returned, along with the number of values skipped so far, if the
// suffix decoder fails or a prefix length is negative or longer than the
// previous value.
func (d *DeltaByteArrayDecoder) Discard(n int) (int, error) {
	n = min(n, d.nvals)
	if n <= 0 {
		return 0, nil
	}

	remaining := n
	tmp := make([]parquet.ByteArray, 1)
	if d.lastVal == nil {
		// No previous value: this one is taken whole from the suffix stream and
		// its own prefix length is skipped.
		if _, err := d.DeltaLengthByteArrayDecoder.Decode(tmp); err != nil {
			return 0, err
		}
		d.lastVal = tmp[0]
		d.prefixLengths = d.prefixLengths[1:]
		remaining--
	}

	var prefixLen int32
	for remaining > 0 {
		prefixLen, d.prefixLengths = d.prefixLengths[0], d.prefixLengths[1:]
		// A shared prefix can never exceed the value it was taken from. Check this
		// explicitly on (untrusted) page data: a negative length would panic, and
		// a length beyond len(lastVal) but within its capacity would silently pick
		// up bytes past the end of the previous value.
		if prefixLen < 0 || int(prefixLen) > len(d.lastVal) {
			return n - remaining, xerrors.Errorf("parquet: invalid DELTA_BYTE_ARRAY prefix length %d (previous value has length %d)", prefixLen, len(d.lastVal))
		}
		prefix := d.lastVal[:prefixLen:prefixLen]

		if _, err := d.DeltaLengthByteArrayDecoder.Decode(tmp); err != nil {
			return n - remaining, err
		}

		if len(tmp[0]) == 0 {
			d.lastVal = prefix
		} else {
			d.lastVal = make([]byte, int(prefixLen)+len(tmp[0]))
			copy(d.lastVal, prefix)
			copy(d.lastVal[prefixLen:], tmp[0])
		}
		remaining--
	}

	return n, nil
}

// Decode decodes byte arrays into the slice provided and returns the number of
// values actually decoded, which is at most min(len(out), values remaining).
//
// Each value is rebuilt from the first prefixLength bytes of the previous value
// followed by the next suffix. Values whose suffix is empty are sub-slices of the
// previous value, so decoded values may share memory with one another and with
// the decoder's state; callers should not modify them in place.
//
// On error, the returned count is the number of values already written to out.
// The error is returned when the suffix decoder fails or when a prefix length
// is negative or longer than the previous value.
func (d *DeltaByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
	total := utils.Min(len(out), d.nvals)
	if total <= 0 {
		return 0, nil
	}
	out = out[:total]

	if d.lastVal == nil {
		// No previous value: the first one is taken whole from the suffix stream
		// and its own prefix length is skipped.
		if _, err := d.DeltaLengthByteArrayDecoder.Decode(out[:1]); err != nil {
			return 0, err
		}
		d.lastVal = out[0]
		out = out[1:]
		d.prefixLengths = d.prefixLengths[1:]
	}

	var prefixLen int32
	suffixHolder := make([]parquet.ByteArray, 1)
	for len(out) > 0 {
		prefixLen, d.prefixLengths = d.prefixLengths[0], d.prefixLengths[1:]
		// A shared prefix can never exceed the value it was taken from. Check this
		// explicitly on (untrusted) page data: a negative length would panic, and
		// a length beyond len(lastVal) but within its capacity would silently pick
		// up bytes past the end of the previous value.
		if prefixLen < 0 || int(prefixLen) > len(d.lastVal) {
			return total - len(out), xerrors.Errorf("parquet: invalid DELTA_BYTE_ARRAY prefix length %d (previous value has length %d)", prefixLen, len(d.lastVal))
		}

		prefix := d.lastVal[:prefixLen:prefixLen]
		if _, err := d.DeltaLengthByteArrayDecoder.Decode(suffixHolder); err != nil {
			return total - len(out), err
		}

		suffix := suffixHolder[0]
		if len(suffix) == 0 {
			d.lastVal = prefix
		} else {
			val := make([]byte, len(prefix)+len(suffix))
			copy(val, prefix)
			copy(val[len(prefix):], suffix)
			d.lastVal = val
		}
		out[0], out = d.lastVal, out[1:]
	}
	return total, nil
}

// DecodeSpaced is like Decode, but out also contains nullCount null slots. The
// first len(out)-nullCount entries are decoded and then spread back out over out
// according to validBits (starting at validBitsOffset) via spacedExpand.
//
// It is an error if fewer non-null values than expected could be decoded.
func (d *DeltaByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	nonNull := len(out) - nullCount
	got, err := d.Decode(out[:nonNull])
	switch {
	case err != nil:
		return got, err
	case got != nonNull:
		return got, xerrors.New("parquet: number of values / definition levels read did not match")
	}
	return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}