1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
//go:build !functional
package sarama
import (
"testing"
)
var (
abortTxCtrlRecKey = []byte{
0, 0, // version
0, 0, // TX_ABORT = 0
}
abortTxCtrlRecValue = []byte{
0, 0, // version
0, 0, 0, 10, // coordinator epoch
}
commitTxCtrlRecKey = []byte{
0, 0, // version
0, 1, // TX_COMMIT = 1
}
commitTxCtrlRecValue = []byte{
0, 0, // version
0, 0, 0, 15, // coordinator epoch
}
unknownCtrlRecKey = []byte{
0, 0, // version
0, 128, // UNKNOWN = -1
}
// empty value for unknown record
unknownCtrlRecValue = []byte{}
)
func testDecode(t *testing.T, tp string, key []byte, value []byte) ControlRecord {
controlRecord := ControlRecord{}
err := controlRecord.decode(&realDecoder{raw: key}, &realDecoder{raw: value})
if err != nil {
t.Error("Decoding control record of type " + tp + " failed")
return ControlRecord{}
}
return controlRecord
}
func assertRecordType(t *testing.T, r *ControlRecord, expected ControlRecordType) {
if r.Type != expected {
t.Errorf("control record type mismatch, expected: %v, have %v", expected, r.Type)
}
}
func TestDecodingControlRecords(t *testing.T) {
abortTx := testDecode(t, "abort transaction", abortTxCtrlRecKey, abortTxCtrlRecValue)
assertRecordType(t, &abortTx, ControlRecordAbort)
if abortTx.CoordinatorEpoch != 10 {
t.Errorf("abort tx control record coordinator epoch mismatch")
}
commitTx := testDecode(t, "commit transaction", commitTxCtrlRecKey, commitTxCtrlRecValue)
if commitTx.CoordinatorEpoch != 15 {
t.Errorf("commit tx control record coordinator epoch mismatch")
}
assertRecordType(t, &commitTx, ControlRecordCommit)
unknown := testDecode(t, "unknown", unknownCtrlRecKey, unknownCtrlRecValue)
assertRecordType(t, &unknown, ControlRecordUnknown)
}
|