File: encoder_decoder.go

package info (click to toggle)
golang-github-ibm-sarama 1.46.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,072 kB
  • sloc: makefile: 40; sh: 30
file content (134 lines) | stat: -rw-r--r-- 3,231 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package sarama

import (
	"fmt"

	"github.com/rcrowley/go-metrics"
)

// Encoder is the interface that wraps the basic Encode method.
// Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
type encoder interface {
	encode(pe packetEncoder) error
}

type encoderWithHeader interface {
	encoder
	headerVersion() int16
}

// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
	if e == nil {
		return nil, nil
	}

	var prepEnc prepEncoder
	var realEnc realEncoder

	err := e.encode(prepareFlexibleEncoder(&prepEnc, e))
	if err != nil {
		return nil, err
	}

	if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
		return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
	}

	realEnc.raw = make([]byte, prepEnc.length)
	realEnc.registry = metricRegistry
	err = e.encode(prepareFlexibleEncoder(&realEnc, e))
	if err != nil {
		return nil, err
	}

	return realEnc.raw, nil
}

// decoder is the interface that wraps the basic Decode method.
// Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
type decoder interface {
	decode(pd packetDecoder) error
}

type versionedDecoder interface {
	decode(pd packetDecoder, version int16) error
}

type flexibleVersion interface {
	isFlexibleVersion(version int16) bool
	isFlexible() bool
}

// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
// interpreted using Kafka's encoding rules.
func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error {
	if buf == nil {
		return nil
	}
	helper := realDecoder{
		raw:      buf,
		registry: metricRegistry,
	}
	err := in.decode(&helper)
	if err != nil {
		return err
	}

	if helper.off != len(buf) {
		return PacketDecodingError{fmt.Sprintf("invalid length: buf=%d decoded=%d %#v", len(buf), helper.off, in)}
	}

	return nil
}

func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error {
	if buf == nil {
		return nil
	}

	helper := prepareFlexibleDecoder(&realDecoder{
		raw:      buf,
		registry: metricRegistry,
	}, in, version)
	err := in.decode(helper, version)
	if err != nil {
		return err
	}

	if remaining := helper.remaining(); remaining != 0 {
		return PacketDecodingError{
			Info: fmt.Sprintf("invalid length len=%d remaining=%d", len(buf), remaining),
		}
	}

	return nil
}

func prepareFlexibleDecoder(pd *realDecoder, in versionedDecoder, version int16) packetDecoder {
	if flexibleDecoder, ok := in.(flexibleVersion); ok && flexibleDecoder.isFlexibleVersion(version) {
		return &realFlexibleDecoder{pd}
	}
	return pd
}

func prepareFlexibleEncoder(pe packetEncoder, req encoder) packetEncoder {
	if flexibleEncoder, ok := req.(flexibleVersion); ok && flexibleEncoder.isFlexible() {
		switch e := pe.(type) {
		case *prepEncoder:
			return &prepFlexibleEncoder{e}
		case *realEncoder:
			return &realFlexibleEncoder{e}
		default:
			return pe
		}
	}
	return pe
}

func downgradeFlexibleDecoder(pd packetDecoder) packetDecoder {
	if f, ok := pd.(*realFlexibleDecoder); ok {
		return f.realDecoder
	}
	return pd
}