1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
package sarama
import (
"fmt"
"github.com/rcrowley/go-metrics"
)
// Encoder is the interface that wraps the basic Encode method.
// Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
type encoder interface {
encode(pe packetEncoder) error
}
type encoderWithHeader interface {
encoder
headerVersion() int16
}
// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
if e == nil {
return nil, nil
}
var prepEnc prepEncoder
var realEnc realEncoder
err := e.encode(prepareFlexibleEncoder(&prepEnc, e))
if err != nil {
return nil, err
}
if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
}
realEnc.raw = make([]byte, prepEnc.length)
realEnc.registry = metricRegistry
err = e.encode(prepareFlexibleEncoder(&realEnc, e))
if err != nil {
return nil, err
}
return realEnc.raw, nil
}
// decoder is the interface that wraps the basic Decode method.
// Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
type decoder interface {
decode(pd packetDecoder) error
}
type versionedDecoder interface {
decode(pd packetDecoder, version int16) error
}
type flexibleVersion interface {
isFlexibleVersion(version int16) bool
isFlexible() bool
}
// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
// interpreted using Kafka's encoding rules.
func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error {
if buf == nil {
return nil
}
helper := realDecoder{
raw: buf,
registry: metricRegistry,
}
err := in.decode(&helper)
if err != nil {
return err
}
if helper.off != len(buf) {
return PacketDecodingError{fmt.Sprintf("invalid length: buf=%d decoded=%d %#v", len(buf), helper.off, in)}
}
return nil
}
func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error {
if buf == nil {
return nil
}
helper := prepareFlexibleDecoder(&realDecoder{
raw: buf,
registry: metricRegistry,
}, in, version)
err := in.decode(helper, version)
if err != nil {
return err
}
if remaining := helper.remaining(); remaining != 0 {
return PacketDecodingError{
Info: fmt.Sprintf("invalid length len=%d remaining=%d", len(buf), remaining),
}
}
return nil
}
func prepareFlexibleDecoder(pd *realDecoder, in versionedDecoder, version int16) packetDecoder {
if flexibleDecoder, ok := in.(flexibleVersion); ok && flexibleDecoder.isFlexibleVersion(version) {
return &realFlexibleDecoder{pd}
}
return pd
}
func prepareFlexibleEncoder(pe packetEncoder, req encoder) packetEncoder {
if flexibleEncoder, ok := req.(flexibleVersion); ok && flexibleEncoder.isFlexible() {
switch e := pe.(type) {
case *prepEncoder:
return &prepFlexibleEncoder{e}
case *realEncoder:
return &realFlexibleEncoder{e}
default:
return pe
}
}
return pe
}
func downgradeFlexibleDecoder(pd packetDecoder) packetDecoder {
if f, ok := pd.(*realFlexibleDecoder); ok {
return f.realDecoder
}
return pd
}
|