1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
package kafkatest
import (
"fmt"
"sync"
"testing"
"time"
"github.com/optiopay/kafka/v2"
"github.com/optiopay/kafka/v2/proto"
)
// TestBrokerProducer runs one reader goroutine and several producer
// goroutines against a single broker created by NewBroker, and blocks until
// every one of them has signalled completion.
func TestBrokerProducer(t *testing.T) {
	// Number of concurrent produceTestMessage goroutines.
	const producers = 4

	var pending sync.WaitGroup
	b := NewBroker()

	// The reader is launched before the producer is created, so it is
	// already running when the first message is produced.
	pending.Add(1)
	go readTestMessages(b, t, &pending)

	p := b.Producer(kafka.NewProducerConf())
	pending.Add(producers)
	for n := producers; n > 0; n-- {
		go produceTestMessage(p, t, &pending)
	}

	pending.Wait()
}
// readTestMessages receives 20 values from b.produced and checks that each
// one carries exactly one message whose Offset equals the 1-based position in
// which the value was received. wg.Done is called when the function returns.
//
// This function is started with `go` by TestBrokerProducer, so it must not
// call t.Fatal/t.Fatalf/t.FailNow: the testing package only allows those from
// the goroutine running the test function. Failures are therefore reported
// with t.Errorf, and the reader returns early when it cannot safely continue.
func readTestMessages(b *Broker, t *testing.T, wg *sync.WaitGroup) {
	defer wg.Done()

	for want := int64(1); want <= 20; want++ {
		req := <-b.produced
		if got := len(req.Messages); got != 1 {
			t.Errorf("expected 1 message, got: %d", got)
			// Stop here: indexing Messages[0] below is not safe when the
			// slice may be empty.
			return
		}
		if m := req.Messages[0]; m.Offset != want {
			t.Errorf("expected offset %d, got: %d", want, m.Offset)
		}
	}
}
// produceTestMessage calls p.Produce("my-topic", 0, ...) five times, each
// time with a message whose Value is the decimal text of the current
// time.Now().UnixNano(). A failed Produce call is reported with t.Errorf and
// the loop continues. wg.Done is called when the function returns.
func produceTestMessage(p kafka.Producer, t *testing.T, wg *sync.WaitGroup) {
	defer wg.Done()

	const count = 5
	for sent := 0; sent < count; sent++ {
		payload := fmt.Sprintf("%d", time.Now().UnixNano())
		m := &proto.Message{Value: []byte(payload)}
		if _, err := p.Produce("my-topic", 0, m); err != nil {
			t.Errorf("cannot produce: %s", err)
		}
	}
}
|