1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
package kafka
import (
"bufio"
"bytes"
)
type groupAssignment struct {
Version int16
Topics map[string][]int32
UserData []byte
}
func (t groupAssignment) size() int32 {
sz := sizeofInt16(t.Version) + sizeofInt16(int16(len(t.Topics)))
for topic, partitions := range t.Topics {
sz += sizeofString(topic) + sizeofInt32Array(partitions)
}
return sz + sizeofBytes(t.UserData)
}
func (t groupAssignment) writeTo(w *bufio.Writer) {
writeInt16(w, t.Version)
writeInt32(w, int32(len(t.Topics)))
for topic, partitions := range t.Topics {
writeString(w, topic)
writeInt32Array(w, partitions)
}
writeBytes(w, t.UserData)
}
func (t *groupAssignment) readFrom(r *bufio.Reader, size int) (remain int, err error) {
// I came across this case when testing for compatibility with bsm/sarama-cluster. It
// appears in some cases, sarama-cluster can send a nil array entry. Admittedly, I
// didn't look too closely at it.
if size == 0 {
t.Topics = map[string][]int32{}
return 0, nil
}
if remain, err = readInt16(r, size, &t.Version); err != nil {
return
}
if remain, err = readMapStringInt32(r, remain, &t.Topics); err != nil {
return
}
if remain, err = readBytes(r, remain, &t.UserData); err != nil {
return
}
return
}
func (t groupAssignment) bytes() []byte {
buf := bytes.NewBuffer(nil)
w := bufio.NewWriter(buf)
t.writeTo(w)
w.Flush()
return buf.Bytes()
}
type syncGroupRequestGroupAssignmentV0 struct {
// MemberID assigned by the group coordinator
MemberID string
// MemberAssignments holds client encoded assignments
//
// See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
MemberAssignments []byte
}
func (t syncGroupRequestGroupAssignmentV0) size() int32 {
return sizeofString(t.MemberID) +
sizeofBytes(t.MemberAssignments)
}
func (t syncGroupRequestGroupAssignmentV0) writeTo(w *bufio.Writer) {
writeString(w, t.MemberID)
writeBytes(w, t.MemberAssignments)
}
type syncGroupRequestV0 struct {
// GroupID holds the unique group identifier
GroupID string
// GenerationID holds the generation of the group.
GenerationID int32
// MemberID assigned by the group coordinator
MemberID string
GroupAssignments []syncGroupRequestGroupAssignmentV0
}
func (t syncGroupRequestV0) size() int32 {
return sizeofString(t.GroupID) +
sizeofInt32(t.GenerationID) +
sizeofString(t.MemberID) +
sizeofArray(len(t.GroupAssignments), func(i int) int32 { return t.GroupAssignments[i].size() })
}
func (t syncGroupRequestV0) writeTo(w *bufio.Writer) {
writeString(w, t.GroupID)
writeInt32(w, t.GenerationID)
writeString(w, t.MemberID)
writeArray(w, len(t.GroupAssignments), func(i int) { t.GroupAssignments[i].writeTo(w) })
}
type syncGroupResponseV0 struct {
// ErrorCode holds response error code
ErrorCode int16
// MemberAssignments holds client encoded assignments
//
// See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
MemberAssignments []byte
}
func (t syncGroupResponseV0) size() int32 {
return sizeofInt16(t.ErrorCode) +
sizeofBytes(t.MemberAssignments)
}
func (t syncGroupResponseV0) writeTo(w *bufio.Writer) {
writeInt16(w, t.ErrorCode)
writeBytes(w, t.MemberAssignments)
}
func (t *syncGroupResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
return
}
if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
return
}
return
}
|