1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
package kafkatest
import (
"errors"
"fmt"
"reflect"
"time"
"github.com/optiopay/kafka/v2"
"github.com/optiopay/kafka/v2/proto"
)
// ExampleBroker_Producer shows how to exercise code that publishes messages
// by using the mock producer handed out by a test Broker, including how to
// force the producer to report a failure.
func ExampleBroker_Producer() {
	broker := NewBroker()
	producer := broker.Producer(kafka.NewProducerConf())
	want := &proto.Message{Value: []byte("first")}

	// Play the role of the server: pick up whatever gets produced and verify
	// that it is exactly the single message published below.
	// NOTE(review): the duration is presumably a read timeout — confirm
	// against Broker.ReadProducers.
	go func() {
		produced, err := broker.ReadProducers(20 * time.Millisecond)
		switch {
		case err != nil:
			panic(fmt.Sprintf("failed reading producers: %s", err))
		case len(produced.Messages) != 1:
			panic("expected single message")
		case !reflect.DeepEqual(produced.Messages[0], want):
			panic("expected different message")
		}
	}()

	// Feed the goroutine above with data.
	if _, err := producer.Produce("my-topic", 0, want); err != nil {
		panic(fmt.Sprintf("cannot produce message: %s", err))
	}

	// The result of Produce can be manipulated through the mock producer's
	// ResponseOffset and ResponseError attributes. Here ResponseError is set
	// so that the next Produce call fails and error handling can be tested.
	mock := producer.(*Producer)
	mock.ResponseError = errors.New("my spoon is too big!")

	_, err := producer.Produce("my-topic", 0, want)
	fmt.Printf("Error: %s\n", err)

	// output:
	//
	// Error: my spoon is too big!
}
// ExampleBroker_Consumer shows how to feed both messages and errors to code
// under test through the mock consumer handed out by a test Broker.
func ExampleBroker_Consumer() {
	broker := NewBroker()
	msg := &proto.Message{Value: []byte("first")}

	// Play the role of the server by pushing data through the consumer.
	// NOTE(review): this relies on the broker presumably handing back the
	// same mock consumer for the same topic and partition — confirm against
	// Broker.Consumer.
	go func() {
		feed, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0))
		mock := feed.(*Consumer)

		// Messages can be delivered through the mock consumer...
		mock.Messages <- msg

		// Every fetch call on the consumer blocks until either a message or
		// an error is ready to be returned, so pausing here makes it possible
		// to test slow consumers.
		time.Sleep(20 * time.Millisecond)

		// ...and errors can be pushed as well, to mock a failure.
		mock.Errors <- errors.New("expected error is expected")
	}()

	// The error is ignored on purpose: the test broker never fails when
	// creating a consumer.
	consumer, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0))

	if m, err := consumer.Consume(); err == nil {
		fmt.Printf("Value: %q\n", m.Value)
	}
	if _, err := consumer.Consume(); err != nil {
		fmt.Printf("Error: %s\n", err)
	}

	// output:
	//
	// Value: "first"
	// Error: expected error is expected
}
// ExampleServer shows how to start a test server with a custom middleware
// and preload it with messages.
func ExampleServer() {
	// Simulate server latency for all fetch requests: only requests of the
	// fetch kind are delayed, and the middleware always returns nil.
	// NOTE(review): a nil Response presumably tells the server to carry on
	// with its regular handling — confirm against NewServer.
	delayFetch := func(nodeID int32, reqKind int16, content []byte) Response {
		if reqKind == proto.FetchReqKind {
			time.Sleep(500 * time.Millisecond)
		}
		return nil
	}

	server := NewServer(delayFetch)
	server.MustSpawn()
	// The error returned by Close is deliberately discarded.
	defer func() { _ = server.Close() }()

	fmt.Printf("running server: %s", server.Addr())

	first := &proto.Message{Value: []byte("first")}
	second := &proto.Message{Value: []byte("second")}
	server.AddMessages("my-topic", 0, first, second)

	// From here, connect to the server using a broker and fetch/write
	// messages.
}
|