File: rawproduce_test.go

package kafka

import (
	"bytes"
	"context"
	"testing"
	"time"

	"github.com/segmentio/kafka-go/protocol"
	ktesting "github.com/segmentio/kafka-go/testing"
)

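// TestClientRawProduce produces a pre-encoded (raw) record set of three records
// to a local topic and verifies that neither the response nor any individual
// record reports an error.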
func TestClientRawProduce(t *testing.T) {
	// The RawProduce request records are encoded in the format introduced in Kafka 0.11.0.
	if !ktesting.KafkaIsAtLeast("0.11.0") {
		t.Skip("Skipping because the RawProduce request is not supported by Kafka versions below 0.11.0")
	}

	client, topic, shutdown := newLocalClientAndTopic()
	defer shutdown()

	now := time.Now()

	res, err := client.RawProduce(context.Background(), &RawProduceRequest{
		Topic:        topic,
		Partition:    0,
		RequiredAcks: -1,
		RawRecords: NewRawRecordSet(NewRecordReader(
			Record{Time: now, Value: NewBytes([]byte(`hello-1`))},
			Record{Time: now, Value: NewBytes([]byte(`hello-2`))},
			Record{Time: now, Value: NewBytes([]byte(`hello-3`))},
		), 0),
	})

	if err != nil {
		t.Fatal(err)
	}

	if res.Error != nil {
		t.Error(res.Error)
	}

	for index, err := range res.RecordErrors {
		t.Errorf("record at index %d produced an error: %v", index, err)
	}
}

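// TestClientRawProduceCompressed covers the same scenario as TestClientRawProduce,
// but the raw record set is encoded with gzip compression enabled.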
func TestClientRawProduceCompressed(t *testing.T) {
	// The RawProduce request records are encoded in the format introduced in Kafka 0.11.0.
	if !ktesting.KafkaIsAtLeast("0.11.0") {
		t.Skip("Skipping because the RawProduce request is not supported by Kafka versions below 0.11.0")
	}

	client, topic, shutdown := newLocalClientAndTopic()
	defer shutdown()

	now := time.Now()

	res, err := client.RawProduce(context.Background(), &RawProduceRequest{
		Topic:        topic,
		Partition:    0,
		RequiredAcks: -1,
		RawRecords: NewRawRecordSet(NewRecordReader(
			Record{Time: now, Value: NewBytes([]byte(`hello-1`))},
			Record{Time: now, Value: NewBytes([]byte(`hello-2`))},
			Record{Time: now, Value: NewBytes([]byte(`hello-3`))},
		), protocol.Gzip),
	})

	if err != nil {
		t.Fatal(err)
	}

	if res.Error != nil {
		t.Error(res.Error)
	}

	for index, err := range res.RecordErrors {
		t.Errorf("record at index %d produced an error: %v", index, err)
	}
}

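// TestClientRawProduceNilRecords checks that a RawProduce request carrying a nil
// record reader can be sent without the client returning an error.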
func TestClientRawProduceNilRecords(t *testing.T) {
	client, topic, shutdown := newLocalClientAndTopic()
	defer shutdown()

	_, err := client.RawProduce(context.Background(), &RawProduceRequest{
		Topic:        topic,
		Partition:    0,
		RequiredAcks: -1,
		RawRecords:   protocol.RawRecordSet{Reader: nil},
	})

	if err != nil {
		t.Fatal(err)
	}
}

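// TestClientRawProduceEmptyRecords checks that a RawProduce request carrying an
// empty record set can be sent without the client returning an error.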
func TestClientRawProduceEmptyRecords(t *testing.T) {
	client, topic, shutdown := newLocalClientAndTopic()
	defer shutdown()

	_, err := client.RawProduce(context.Background(), &RawProduceRequest{
		Topic:        topic,
		Partition:    0,
		RequiredAcks: -1,
		RawRecords:   NewRawRecordSet(NewRecordReader(), 0),
	})

	if err != nil {
		t.Fatal(err)
	}
}

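// NewRawRecordSet is a test helper that encodes the given records as a version 2
// record set with the requested attributes (e.g. compression) and returns the
// resulting bytes wrapped in a protocol.RawRecordSet backed by an in-memory buffer.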
func NewRawRecordSet(reader protocol.RecordReader, attr protocol.Attributes) protocol.RawRecordSet {
	rs := protocol.RecordSet{Version: 2, Attributes: attr, Records: reader}
	buf := &bytes.Buffer{}
	rs.WriteTo(buf)

	return protocol.RawRecordSet{
		Reader: buf,
	}
}