File: array.go

package info (click to toggle)
golang-github-linkedin-goavro 2.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 944 kB
  • sloc: makefile: 8
file content (226 lines) | stat: -rw-r--r-- 8,375 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// Copyright [2019] LinkedIn Corp. Licensed under the Apache License, Version
// 2.0 (the "License"); you may not use this file except in compliance with the
// License.  You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

package goavro

import (
	"fmt"
	"io"
	"math"
	"reflect"
)

func makeArrayCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) {
	// array type must have items
	itemSchema, ok := schemaMap["items"]
	if !ok {
		return nil, fmt.Errorf("Array ought to have items key")
	}
	itemCodec, err := buildCodec(st, enclosingNamespace, itemSchema, cb)
	if err != nil {
		return nil, fmt.Errorf("Array items ought to be valid Avro type: %s", err)
	}

	return &Codec{
		typeName: &name{"array", nullNamespace},
		nativeFromBinary: func(buf []byte) (interface{}, []byte, error) {
			var value interface{}
			var err error

			// block count and block size
			if value, buf, err = longNativeFromBinary(buf); err != nil {
				return nil, nil, fmt.Errorf("cannot decode binary array block count: %s", err)
			}
			blockCount := value.(int64)
			if blockCount < 0 {
				// NOTE: A negative block count implies there is a long encoded
				// block size following the negative block count. We have no use
				// for the block size in this decoder, so we read and discard
				// the value.
				if blockCount == math.MinInt64 {
					// The minimum number for any signed numerical type can never be made positive
					return nil, nil, fmt.Errorf("cannot decode binary array with block count: %d", blockCount)
				}
				blockCount = -blockCount // convert to its positive equivalent
				if _, buf, err = longNativeFromBinary(buf); err != nil {
					return nil, nil, fmt.Errorf("cannot decode binary array block size: %s", err)
				}
			}
			// Ensure block count does not exceed some sane value.
			if blockCount > MaxBlockCount {
				return nil, nil, fmt.Errorf("cannot decode binary array when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
			}
			// NOTE: While the attempt of a RAM optimization shown below is not
			// necessary, many encoders will encode all items in a single block.
			// We can optimize amount of RAM allocated by runtime for the array
			// by initializing the array for that number of items.
			arrayValues := make([]interface{}, 0, blockCount)

			for blockCount != 0 {
				// Decode `blockCount` datum values from buffer
				for i := int64(0); i < blockCount; i++ {
					if value, buf, err = itemCodec.nativeFromBinary(buf); err != nil {
						return nil, nil, fmt.Errorf("cannot decode binary array item %d: %s", i+1, err)
					}
					arrayValues = append(arrayValues, value)
				}
				// Decode next blockCount from buffer, because there may be more blocks
				if value, buf, err = longNativeFromBinary(buf); err != nil {
					return nil, nil, fmt.Errorf("cannot decode binary array block count: %s", err)
				}
				blockCount = value.(int64)
				if blockCount < 0 {
					// NOTE: A negative block count implies there is a long
					// encoded block size following the negative block count. We
					// have no use for the block size in this decoder, so we
					// read and discard the value.
					if blockCount == math.MinInt64 {
						// The minimum number for any signed numerical type can
						// never be made positive
						return nil, nil, fmt.Errorf("cannot decode binary array with block count: %d", blockCount)
					}
					blockCount = -blockCount // convert to its positive equivalent
					if _, buf, err = longNativeFromBinary(buf); err != nil {
						return nil, nil, fmt.Errorf("cannot decode binary array block size: %s", err)
					}
				}
				// Ensure block count does not exceed some sane value.
				if blockCount > MaxBlockCount {
					return nil, nil, fmt.Errorf("cannot decode binary array when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
				}
			}
			return arrayValues, buf, nil
		},
		binaryFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
			arrayValues, err := convertArray(datum)
			if err != nil {
				return nil, fmt.Errorf("cannot encode binary array: %s", err)
			}

			arrayLength := int64(len(arrayValues))
			var alreadyEncoded, remainingInBlock int64

			for i, item := range arrayValues {
				if remainingInBlock == 0 { // start a new block
					remainingInBlock = arrayLength - alreadyEncoded
					if remainingInBlock > MaxBlockCount {
						// limit block count to MacBlockCount
						remainingInBlock = MaxBlockCount
					}
					buf, _ = longBinaryFromNative(buf, remainingInBlock)
				}

				if buf, err = itemCodec.binaryFromNative(buf, item); err != nil {
					return nil, fmt.Errorf("cannot encode binary array item %d: %v: %s", i+1, item, err)
				}

				remainingInBlock--
				alreadyEncoded++
			}

			return longBinaryFromNative(buf, 0) // append trailing 0 block count to signal end of Array
		},
		nativeFromTextual: func(buf []byte) (interface{}, []byte, error) {
			var arrayValues []interface{}
			var value interface{}
			var err error
			var b byte

			if buf, err = advanceAndConsume(buf, '['); err != nil {
				return nil, nil, fmt.Errorf("cannot decode textual array: %s", err)
			}
			if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
				return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
			}
			// NOTE: Special case for empty array
			if buf[0] == ']' {
				return arrayValues, buf[1:], nil
			}

			// NOTE: Also terminates when read ']' byte.
			for len(buf) > 0 {
				// decode value
				value, buf, err = itemCodec.nativeFromTextual(buf)
				if err != nil {
					return nil, nil, fmt.Errorf("cannot decode textual array: %s", err)
				}
				arrayValues = append(arrayValues, value)
				// either comma or closing curly brace
				if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
					return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
				}
				switch b = buf[0]; b {
				case ']':
					return arrayValues, buf[1:], nil
				case ',':
					// no-op
				default:
					return nil, nil, fmt.Errorf("cannot decode textual array: expected ',' or ']'; received: %q", b)
				}
				// NOTE: consume comma from above
				if buf, _ = advanceToNonWhitespace(buf[1:]); len(buf) == 0 {
					return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
				}
			}
			return nil, buf, io.ErrShortBuffer
		},
		textualFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
			arrayValues, err := convertArray(datum)
			if err != nil {
				return nil, fmt.Errorf("cannot encode textual array: %s", err)
			}

			var atLeastOne bool

			buf = append(buf, '[')

			for i, item := range arrayValues {
				atLeastOne = true

				// Encode value
				buf, err = itemCodec.textualFromNative(buf, item)
				if err != nil {
					// field was specified in datum; therefore its value was invalid
					return nil, fmt.Errorf("cannot encode textual array item %d; %v: %s", i+1, item, err)
				}
				buf = append(buf, ',')
			}

			if atLeastOne {
				return append(buf[:len(buf)-1], ']'), nil
			}
			return append(buf, ']'), nil
		},
	}, nil
}

// convertArray converts interface{} to []interface{} if possible.
func convertArray(datum interface{}) ([]interface{}, error) {
	arrayValues, ok := datum.([]interface{})
	if ok {
		return arrayValues, nil
	}
	// NOTE: When given a slice of any other type, zip values to
	// items as a convenience to client.
	v := reflect.ValueOf(datum)
	if v.Kind() != reflect.Slice {
		return nil, fmt.Errorf("cannot create []interface{}: expected slice; received: %T", datum)
	}
	// NOTE: Two better alternatives to the current algorithm are:
	//   (1) mutate the reflection tuple underneath to convert the
	//       []int, for example, to []interface{}, with O(1) complexity
	//   (2) use copy builtin to zip the data items over with O(n) complexity,
	//       but more efficient than what's below.
	// Suggestions?
	arrayValues = make([]interface{}, v.Len())
	for idx := 0; idx < v.Len(); idx++ {
		arrayValues[idx] = v.Index(idx).Interface()
	}
	return arrayValues, nil
}