1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
package eventstream
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
)
// decodedMessage is the JSON-facing form of a message. It embeds rawMessage
// and adds a Headers field of type decodedHeaders, which supplies its own
// JSON decoding. JSON encoding and decoding of the whole value go through the
// MarshalJSON and UnmarshalJSON methods defined on *decodedMessage.
type decodedMessage struct {
	rawMessage
	// Headers holds the message headers in their JSON-decodable form.
	Headers decodedHeaders `json:"headers"`
}
// jsonMessage is the intermediate JSON shape that decodedMessage is marshaled
// to and unmarshaled from. The numeric fields are json.Number so they are
// carried as their literal decimal text and converted explicitly by the
// decodedMessage methods rather than by encoding/json.
type jsonMessage struct {
	// Length is stored under the "total_length" key.
	Length json.Number `json:"total_length"`
	// HeadersLen is stored under the "headers_length" key.
	HeadersLen json.Number `json:"headers_length"`
	// PreludeCRC is stored under the "prelude_crc" key.
	PreludeCRC json.Number `json:"prelude_crc"`
	// Headers is decoded through decodedHeaders' own UnmarshalJSON.
	Headers decodedHeaders `json:"headers"`
	// Payload is a []byte, which encoding/json represents as a base64 string.
	Payload []byte `json:"payload"`
	// CRC is stored under the "message_crc" key.
	CRC json.Number `json:"message_crc"`
}
// UnmarshalJSON populates d from the JSON document in b.
//
// The document is first decoded into a jsonMessage. Each numeric field is
// then converted with numAsUint32, and the headers and payload are copied
// across as-is. The first decoding or conversion error is returned; fields
// assigned before the failure keep their new values.
func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) {
	var wire jsonMessage
	if err = json.Unmarshal(b, &wire); err != nil {
		return err
	}

	// Prelude fields.
	if d.Length, err = numAsUint32(wire.Length); err != nil {
		return err
	}
	if d.HeadersLen, err = numAsUint32(wire.HeadersLen); err != nil {
		return err
	}
	if d.PreludeCRC, err = numAsUint32(wire.PreludeCRC); err != nil {
		return err
	}

	// Headers and payload need no numeric conversion.
	d.Headers, d.Payload = wire.Headers, wire.Payload

	// Trailing message checksum.
	if d.CRC, err = numAsUint32(wire.CRC); err != nil {
		return err
	}

	return nil
}
// MarshalJSON encodes d as the JSON document described by jsonMessage.
//
// The uint32 fields (Length, HeadersLen, PreludeCRC, CRC) are rendered as
// unsigned decimal numbers; Headers and Payload are passed to encoding/json
// unchanged. It returns whatever json.Marshal returns for that structure.
func (d *decodedMessage) MarshalJSON() ([]byte, error) {
	// Format through uint64 rather than int: on platforms where int is 32
	// bits wide, converting a uint32 >= 1<<31 (common for CRC values) to int
	// wraps to a negative number and would be emitted with a minus sign.
	num := func(v uint32) json.Number {
		return json.Number(strconv.FormatUint(uint64(v), 10))
	}

	jsonMsg := jsonMessage{
		Length:     num(d.Length),
		HeadersLen: num(d.HeadersLen),
		PreludeCRC: num(d.PreludeCRC),
		Headers:    d.Headers,
		Payload:    d.Payload,
		CRC:        num(d.CRC),
	}

	return json.Marshal(jsonMsg)
}
// numAsUint32 converts the JSON number n to a uint32.
//
// n must be a base-10 integer literal in the range [0, 4294967295]. Anything
// else (empty text, a fraction or exponent, a negative value, or a value too
// large for 32 bits) yields a zero result and a non-nil error instead of
// being silently truncated.
func numAsUint32(n json.Number) (uint32, error) {
	// bitSize 32 makes ParseUint perform the range check, so the conversion
	// below can never lose information.
	v, err := strconv.ParseUint(n.String(), 10, 32)
	if err != nil {
		return 0, fmt.Errorf("failed to get uint32 json number, %v", err)
	}

	return uint32(v), nil
}
// Message returns a Message built from d: the headers are converted from
// decodedHeaders to Headers and the payload slice is carried over without
// copying.
func (d decodedMessage) Message() Message {
	var msg Message
	msg.Headers = Headers(d.Headers)
	msg.Payload = d.Payload
	return msg
}
// decodedHeaders is a Headers value with its own JSON decoding, provided by
// the UnmarshalJSON method on *decodedHeaders.
type decodedHeaders Headers
// UnmarshalJSON decodes b, a JSON array of objects with "name", "type" and
// "value" members, into hs.
//
// Each entry's value is converted by valueFromType according to its type and
// stored under its name with Headers.Set. *hs is replaced only after every
// entry has been converted; on error it is left untouched.
func (hs *decodedHeaders) UnmarshalJSON(b []byte) error {
	type jsonHeader struct {
		Name  string      `json:"name"`
		Type  valueType   `json:"type"`
		Value interface{} `json:"value"`
	}

	// UseNumber keeps numeric values as json.Number instead of float64,
	// which is the form valueFromType asserts for integer-typed headers.
	dec := json.NewDecoder(bytes.NewReader(b))
	dec.UseNumber()

	var entries []jsonHeader
	if err := dec.Decode(&entries); err != nil {
		return err
	}

	var out Headers
	for i := range entries {
		entry := &entries[i]

		v, err := valueFromType(entry.Type, entry.Value)
		if err != nil {
			return err
		}
		out.Set(entry.Name, v)
	}

	*hs = decodedHeaders(out)
	return nil
}
// valueFromType converts val, a header value as produced by encoding/json,
// into the Value implementation selected by typ.
//
// Expected dynamic type of val per header type:
//   - true / false: val is ignored.
//   - int8, int16, int32, int64, timestamp: a json.Number holding a base-10
//     integer that fits the target width (timestamps are passed to
//     timeFromEpochMilli as a 64-bit integer).
//   - bytes, string, uuid: a string holding standard base64 text; a uuid must
//     decode to exactly len(UUIDValue) bytes.
//
// Malformed input (wrong dynamic type, out-of-range integer, invalid base64,
// wrong uuid length, or an unrecognized typ) is reported as an error with a
// nil Value; the function does not panic on bad data.
func valueFromType(typ valueType, val interface{}) (Value, error) {
	switch typ {
	case trueValueType:
		return BoolValue(true), nil

	case falseValueType:
		return BoolValue(false), nil

	case int8ValueType:
		v, err := jsonHeaderInt(val, 8)
		if err != nil {
			return nil, err
		}
		return Int8Value(int8(v)), nil

	case int16ValueType:
		v, err := jsonHeaderInt(val, 16)
		if err != nil {
			return nil, err
		}
		return Int16Value(int16(v)), nil

	case int32ValueType:
		v, err := jsonHeaderInt(val, 32)
		if err != nil {
			return nil, err
		}
		return Int32Value(int32(v)), nil

	case int64ValueType:
		v, err := jsonHeaderInt(val, 64)
		if err != nil {
			return nil, err
		}
		return Int64Value(v), nil

	case bytesValueType:
		v, err := jsonHeaderBytes(val)
		if err != nil {
			return nil, err
		}
		return BytesValue(v), nil

	case stringValueType:
		v, err := jsonHeaderBytes(val)
		if err != nil {
			return nil, err
		}
		return StringValue(string(v)), nil

	case timestampValueType:
		v, err := jsonHeaderInt(val, 64)
		if err != nil {
			return nil, err
		}
		return TimestampValue(timeFromEpochMilli(v)), nil

	case uuidValueType:
		v, err := jsonHeaderBytes(val)
		if err != nil {
			return nil, err
		}
		var tv UUIDValue
		// copy would silently truncate or zero-pad a wrong-sized input, so
		// reject it explicitly.
		if len(v) != len(tv) {
			return nil, fmt.Errorf("invalid uuid header value length, %d, expected %d", len(v), len(tv))
		}
		copy(tv[:], v)
		return tv, nil

	default:
		return nil, fmt.Errorf("unknown type, %s, %T", typ.String(), val)
	}
}

// jsonHeaderInt returns val as a signed integer that fits in bitSize bits.
// val must be a json.Number holding a base-10 integer literal.
func jsonHeaderInt(val interface{}, bitSize int) (int64, error) {
	n, ok := val.(json.Number)
	if !ok {
		return 0, fmt.Errorf("expected json number header value, got %T", val)
	}
	// ParseInt with an explicit bitSize reports out-of-range values instead
	// of letting a later narrowing conversion wrap them.
	return strconv.ParseInt(n.String(), 10, bitSize)
}

// jsonHeaderBytes returns the bytes encoded in val, which must be a string of
// standard base64 text.
func jsonHeaderBytes(val interface{}) ([]byte, error) {
	s, ok := val.(string)
	if !ok {
		return nil, fmt.Errorf("expected base64 string header value, got %T", val)
	}
	return base64.StdEncoding.DecodeString(s)
}
|