1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
package sarama
// ControlRecordType identifies the kind of a control record. ControlRecord's
// decode method derives it from the int16 type id stored in the record key.
type ControlRecordType int
const (
	// ControlRecordAbort is a control record for abort. It is the zero value
	// of ControlRecordType and corresponds to type id 0 in the record key.
	ControlRecordAbort ControlRecordType = iota
	// ControlRecordCommit is a control record for commit. It corresponds to
	// type id 1 in the record key.
	ControlRecordCommit
	// ControlRecordUnknown is a control record of unknown type. decode assigns
	// it whenever the type id in the record key is neither 0 nor 1.
	ControlRecordUnknown
)
// ControlRecord is a control record, returned as a record by fetchRequest.
// Unlike "normal" records, control records mean nothing application wise:
// they only serve internal logic for supporting transactions.
type ControlRecord struct {
	// Version is the int16 version of the record. decode first reads it from
	// the key and, for known types, overwrites it with the version read from
	// the value.
	Version int16
	// CoordinatorEpoch is the int32 that follows the version in the value.
	// decode leaves it untouched when the type is ControlRecordUnknown.
	CoordinatorEpoch int32
	// Type is the kind of control record, derived from the type id in the key.
	Type ControlRecordType
}
// decode fills cr from the key and value parts of a control record.
//
// The key holds an int16 version followed by an int16 type id (0 = abort,
// 1 = commit, anything else = unknown). The value holds an int16 version
// followed by an int32 coordinator epoch; it is only read for known types.
// The first error reported by either decoder is returned as is.
func (cr *ControlRecord) decode(key, value packetDecoder) error {
	var err error

	// Both the key and the value carry a version. It is unclear whether they
	// are supposed to match; either way they can only be 0 for now. The key
	// version is stored first and replaced by the value version further down.
	if cr.Version, err = key.getInt16(); err != nil {
		return err
	}

	typeID, err := key.getInt16()
	if err != nil {
		return err
	}

	switch typeID {
	case 0:
		cr.Type = ControlRecordAbort
	case 1:
		cr.Type = ControlRecordCommit
	default:
		// From the Java implementation: UNKNOWN indicates a control type the
		// client is not aware of and which should be ignored. The value is
		// only parsed for known types, so stop here.
		cr.Type = ControlRecordUnknown
		return nil
	}

	if cr.Version, err = value.getInt16(); err != nil {
		return err
	}

	cr.CoordinatorEpoch, err = value.getInt32()
	return err
}
// encode writes the control record into its key and value parts.
//
// The value receives Version (int16) followed by CoordinatorEpoch (int32).
// The key receives Version (int16) followed by an int16 type id: 0 for
// ControlRecordAbort, 1 for ControlRecordCommit and -1 for any other type.
func (cr *ControlRecord) encode(key, value packetEncoder) {
	value.putInt16(cr.Version)
	value.putInt32(cr.CoordinatorEpoch)

	key.putInt16(cr.Version)
	switch cr.Type {
	case ControlRecordAbort:
		key.putInt16(0)
	case ControlRecordCommit:
		key.putInt16(1)
	default:
		// Always emit a type id so the key keeps its version+type layout and
		// can be read back: decode expects two int16 values in the key and
		// maps any id other than 0 or 1 to ControlRecordUnknown.
		// NOTE(review): -1 is meant to mirror the UNKNOWN id of the Java
		// client's ControlRecordType; confirm against that implementation.
		key.putInt16(-1)
	}
}
|