File: response.go

package info (click to toggle)
golang-github-segmentio-kafka-go 0.4.49%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,292 kB
  • sloc: sh: 17; makefile: 10
file content (95 lines) | stat: -rw-r--r-- 1,858 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package prototest

import (
	"bufio"
	"bytes"
	"encoding/hex"
	"fmt"
	"io"
	"testing"

	"github.com/segmentio/kafka-go/protocol"
)

func TestResponse(t *testing.T, version int16, msg protocol.Message) {
	reset := load(msg)

	t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) {
		b := &bytes.Buffer{}

		if err := protocol.WriteResponse(b, version, 1234, msg); err != nil {
			t.Fatal(err)
		}

		reset()

		t.Logf("\n%s", hex.Dump(b.Bytes()))

		correlationID, res, err := protocol.ReadResponse(b, msg.ApiKey(), version)
		if err != nil {
			t.Fatal(err)
		}
		if correlationID != 1234 {
			t.Errorf("correlation id mismatch: %d != %d", correlationID, 1234)
		}
		if !deepEqual(msg, res) {
			t.Errorf("response message mismatch:")
			t.Logf("expected: %+v", msg)
			t.Logf("found:    %+v", res)
		}
		closeMessage(res)
	})
}

func BenchmarkResponse(b *testing.B, version int16, msg protocol.Message) {
	reset := load(msg)

	b.Run(fmt.Sprintf("v%d", version), func(b *testing.B) {
		apiKey := msg.ApiKey()
		buffer := &bytes.Buffer{}
		buffer.Grow(1024)

		b.Run("read", func(b *testing.B) {
			w := io.Writer(buffer)

			if err := protocol.WriteResponse(w, version, 1234, msg); err != nil {
				b.Fatal(err)
			}

			reset()

			p := buffer.Bytes()
			x := bytes.NewReader(p)
			r := bufio.NewReader(x)

			for i := 0; i < b.N; i++ {
				_, res, err := protocol.ReadResponse(r, apiKey, version)
				if err != nil {
					b.Fatal(err)
				}
				closeMessage(res)
				x.Reset(p)
				r.Reset(x)
			}

			b.SetBytes(int64(len(p)))
			buffer.Reset()
		})

		b.Run("write", func(b *testing.B) {
			w := io.Writer(buffer)
			n := int64(0)

			for i := 0; i < b.N; i++ {
				if err := protocol.WriteResponse(w, version, 1234, msg); err != nil {
					b.Fatal(err)
				}
				reset()
				n = int64(buffer.Len())
				buffer.Reset()
			}

			b.SetBytes(n)
		})
	})
}