1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
package prototest
import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"io"
"testing"
"github.com/segmentio/kafka-go/protocol"
)
func TestResponse(t *testing.T, version int16, msg protocol.Message) {
reset := load(msg)
t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) {
b := &bytes.Buffer{}
if err := protocol.WriteResponse(b, version, 1234, msg); err != nil {
t.Fatal(err)
}
reset()
t.Logf("\n%s", hex.Dump(b.Bytes()))
correlationID, res, err := protocol.ReadResponse(b, msg.ApiKey(), version)
if err != nil {
t.Fatal(err)
}
if correlationID != 1234 {
t.Errorf("correlation id mismatch: %d != %d", correlationID, 1234)
}
if !deepEqual(msg, res) {
t.Errorf("response message mismatch:")
t.Logf("expected: %+v", msg)
t.Logf("found: %+v", res)
}
closeMessage(res)
})
}
func BenchmarkResponse(b *testing.B, version int16, msg protocol.Message) {
reset := load(msg)
b.Run(fmt.Sprintf("v%d", version), func(b *testing.B) {
apiKey := msg.ApiKey()
buffer := &bytes.Buffer{}
buffer.Grow(1024)
b.Run("read", func(b *testing.B) {
w := io.Writer(buffer)
if err := protocol.WriteResponse(w, version, 1234, msg); err != nil {
b.Fatal(err)
}
reset()
p := buffer.Bytes()
x := bytes.NewReader(p)
r := bufio.NewReader(x)
for i := 0; i < b.N; i++ {
_, res, err := protocol.ReadResponse(r, apiKey, version)
if err != nil {
b.Fatal(err)
}
closeMessage(res)
x.Reset(p)
r.Reset(x)
}
b.SetBytes(int64(len(p)))
buffer.Reset()
})
b.Run("write", func(b *testing.B) {
w := io.Writer(buffer)
n := int64(0)
for i := 0; i < b.N; i++ {
if err := protocol.WriteResponse(w, version, 1234, msg); err != nil {
b.Fatal(err)
}
reset()
n = int64(buffer.Len())
buffer.Reset()
}
b.SetBytes(n)
})
})
}
|