1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
|
// Copyright [2019] LinkedIn Corp. Licensed under the Apache License, Version
// 2.0 (the "License"); you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package goavro
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"sort"
)
// codecInfo is a set of quick lookups it holds all the lookup info for the
// all the schemas we need to handle the list of types for this union
type codecInfo struct {
allowedTypes []string
codecFromIndex []*Codec
codecFromName map[string]*Codec
indexFromName map[string]int
}
// Union wraps a datum value in a map for encoding as a Union, as required by
// Union encoder.
//
// When providing a value for an Avro union, the encoder will accept `nil` for a
// `null` value. If the value is non-`nil`, it must be a
// `map[string]interface{}` with a single key-value pair, where the key is the
// Avro type name and the value is the datum's value. As a convenience, the
// `Union` function wraps any datum value in a map as specified above.
//
// func ExampleUnion() {
// codec, err := goavro.NewCodec(`["null","string","int"]`)
// if err != nil {
// fmt.Println(err)
// }
// buf, err := codec.TextualFromNative(nil, goavro.Union("string", "some string"))
// if err != nil {
// fmt.Println(err)
// }
// fmt.Println(string(buf))
// // Output: {"string":"some string"}
// }
func Union(name string, datum interface{}) interface{} {
if datum == nil && name == "null" {
return nil
}
return map[string]interface{}{name: datum}
}
// makeCodecInfo takes the schema array
// and builds some lookup indices
// returning a codecInfo
func makeCodecInfo(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (codecInfo, error) {
allowedTypes := make([]string, len(schemaArray)) // used for error reporting when encoder receives invalid datum type
codecFromIndex := make([]*Codec, len(schemaArray))
codecFromName := make(map[string]*Codec, len(schemaArray))
indexFromName := make(map[string]int, len(schemaArray))
for i, unionMemberSchema := range schemaArray {
unionMemberCodec, err := buildCodec(st, enclosingNamespace, unionMemberSchema, cb)
if err != nil {
return codecInfo{}, fmt.Errorf("Union item %d ought to be valid Avro type: %s", i+1, err)
}
fullName := unionMemberCodec.typeName.fullName
if _, ok := indexFromName[fullName]; ok {
return codecInfo{}, fmt.Errorf("Union item %d ought to be unique type: %s", i+1, unionMemberCodec.typeName)
}
allowedTypes[i] = fullName
codecFromIndex[i] = unionMemberCodec
codecFromName[fullName] = unionMemberCodec
indexFromName[fullName] = i
}
return codecInfo{
allowedTypes: allowedTypes,
codecFromIndex: codecFromIndex,
codecFromName: codecFromName,
indexFromName: indexFromName,
}, nil
}
func nativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
return func(buf []byte) (interface{}, []byte, error) {
var decoded interface{}
var err error
decoded, buf, err = longNativeFromBinary(buf)
if err != nil {
return nil, nil, err
}
index := decoded.(int64) // longDecoder always returns int64, so elide error checking
if index < 0 || index >= int64(len(cr.codecFromIndex)) {
return nil, nil, fmt.Errorf("cannot decode binary union: index ought to be between 0 and %d; read index: %d", len(cr.codecFromIndex)-1, index)
}
c := cr.codecFromIndex[index]
decoded, buf, err = c.nativeFromBinary(buf)
if err != nil {
return nil, nil, fmt.Errorf("cannot decode binary union item %d: %s", index+1, err)
}
if decoded == nil {
// do not wrap a nil value in a map
return nil, buf, nil
}
// Non-nil values are wrapped in a map with single key set to type name of value
return Union(cr.allowedTypes[index], decoded), buf, nil
}
}
func binaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
index, ok := cr.indexFromName["null"]
if !ok {
return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
return longBinaryFromNative(buf, index)
case map[string]interface{}:
if len(v) != 1 {
return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
// will execute exactly once
for key, value := range v {
index, ok := cr.indexFromName[key]
if !ok {
return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
c := cr.codecFromIndex[index]
buf, _ = longBinaryFromNative(buf, index)
return c.binaryFromNative(buf, value)
}
}
return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func nativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
return func(buf []byte) (interface{}, []byte, error) {
if len(buf) >= 4 && bytes.Equal(buf[:4], []byte("null")) {
if _, ok := cr.indexFromName["null"]; ok {
return nil, buf[4:], nil
}
}
var datum interface{}
var err error
datum, buf, err = genericMapTextDecoder(buf, nil, cr.codecFromName)
if err != nil {
return nil, nil, fmt.Errorf("cannot decode textual union: %s", err)
}
return datum, buf, nil
}
}
func textualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
_, ok := cr.indexFromName["null"]
if !ok {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
return append(buf, "null"...), nil
case map[string]interface{}:
if len(v) != 1 {
return nil, fmt.Errorf("cannot encode textual union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
// will execute exactly once
for key, value := range v {
index, ok := cr.indexFromName[key]
if !ok {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
buf = append(buf, '{')
var err error
buf, err = stringTextualFromNative(buf, key)
if err != nil {
return nil, fmt.Errorf("cannot encode textual union: %s", err)
}
buf = append(buf, ':')
c := cr.codecFromIndex[index]
buf, err = c.textualFromNative(buf, value)
if err != nil {
return nil, fmt.Errorf("cannot encode textual union: %s", err)
}
return append(buf, '}'), nil
}
}
return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
}
cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb)
if err != nil {
return nil, err
}
rv := &Codec{
// NOTE: To support record field default values, union schema set to the
// type name of first member
// TODO: add/change to schemaCanonical below
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,
typeName: &name{"union", nullNamespace},
nativeFromBinary: nativeFromBinary(&cr),
binaryFromNative: binaryFromNative(&cr),
nativeFromTextual: nativeFromTextual(&cr),
textualFromNative: textualFromNative(&cr),
}
return rv, nil
}
// Standard JSON
//
// The default avro library supports a json that would result from your data into json
// instead of serializing it into binary
//
// JSON in the wild differs from that in one critical way - unions
// the avro spec requires unions to have their type indicated
// which means every value that is of a union type
// is actually sent as a small map {"string", "some string"}
// instead of simply as the value itself, which is the way of wild JSON
// https://avro.apache.org/docs/current/spec.html#json_encoding
//
// In order to use this to avro encode standard json the unions have to be rewritten
// so the can encode into unions as expected by the avro schema
//
// so the technique is to read in the json in the usual way
// when a union type is found, read the next json object
// try to figure out if it fits into any of the types
// that are specified for the union per the supplied schema
// if so, then wrap the value into a map and return the expected Union
//
// the json is morphed on the read side
// and then it will remain avro-json object
// avro data is not serialized back into standard json
// the data goes to avro-json and stays that way
func buildCodecForTypeDescribedBySliceJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
}
cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb)
if err != nil {
return nil, err
}
rv := &Codec{
// NOTE: To support record field default values, union schema set to the
// type name of first member
// TODO: add/change to schemaCanonical below
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,
typeName: &name{"union", nullNamespace},
nativeFromBinary: nativeFromBinary(&cr),
binaryFromNative: binaryFromNative(&cr),
nativeFromTextual: nativeAvroFromTextualJson(&cr),
textualFromNative: textualFromNative(&cr),
}
return rv, nil
}
func checkAll(allowedTypes []string, cr *codecInfo, buf []byte) (interface{}, []byte, error) {
for _, name := range cr.allowedTypes {
if name == "null" {
// skip null since we know we already got type float64
continue
}
theCodec, ok := cr.codecFromName[name]
if !ok {
continue
}
rv, rb, err := theCodec.NativeFromTextual(buf)
if err != nil {
continue
}
return map[string]interface{}{name: rv}, rb, nil
}
return nil, buf, fmt.Errorf("could not decode any json data in input %v", string(buf))
}
func nativeAvroFromTextualJson(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
return func(buf []byte) (interface{}, []byte, error) {
reader := bytes.NewReader(buf)
dec := json.NewDecoder(reader)
var m interface{}
// i should be able to grab the next json "value" with decoder.Decode()
// https://pkg.go.dev/encoding/json#Decoder.Decode
// that dec.More() loop will give the next
// whatever then dec.Decode(&m)
// if m is interface{}
// it goes one legit json object at a time like this
// json.Delim: [
// Q:map[string]interface {}: map[Name:Ed Text:Knock knock.]
// Q:map[string]interface {}: map[Name:Sam Text:Who's there?]
// Q:map[string]interface {}: map[Name:Ed Text:Go fmt.]
// Q:map[string]interface {}: map[Name:Sam Text:Go fmt who?]
// Q:map[string]interface {}: map[Name:Ed Text:Go fmt yourself!]
// string: eewew
// bottom:json.Delim: ]
//
// so right here, grab whatever this object is
// grab the object specified as the value
// and try to figure out what it is and handle it
err := dec.Decode(&m)
if err != nil {
return nil, buf, err
}
allowedTypes := cr.allowedTypes
switch m.(type) {
case nil:
if len(buf) >= 4 && bytes.Equal(buf[:4], []byte("null")) {
if _, ok := cr.codecFromName["null"]; ok {
return nil, buf[4:], nil
}
}
case float64:
// dec.Decode turns them all into float64
// avro spec knows about int, long (variable length zig-zag)
// and then float and double (32 bits, 64 bits)
// https://avro.apache.org/docs/current/spec.html#binary_encode_primitive
//
// double
// doubleNativeFromTextual
// float
// floatNativeFromTextual
// long
// longNativeFromTextual
// int
// intNativeFromTextual
// sorted so it would be
// double, float, int, long
// that makes the priorities right by chance
sort.Strings(cr.allowedTypes)
case map[string]interface{}:
// try to decode it as a map
// because a map should fail faster than a record
// if that fails assume record and return it
sort.Strings(cr.allowedTypes)
}
return checkAll(allowedTypes, cr, buf)
}
}
|