File: roundtrip.go

package info (click to toggle)
golang-github-segmentio-kafka-go 0.4.49%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,292 kB
  • sloc: sh: 17; makefile: 10
file content (28 lines) | stat: -rw-r--r-- 740 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package protocol

import (
	"io"
)

// RoundTrip sends a request to a kafka broker and returns the response.
func RoundTrip(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, req Message) (Message, error) {
	if err := WriteRequest(rw, apiVersion, correlationID, clientID, req); err != nil {
		return nil, err
	}
	if !hasResponse(req) {
		return nil, nil
	}
	id, res, err := ReadResponse(rw, req.ApiKey(), apiVersion)
	if err != nil {
		return nil, err
	}
	if id != correlationID {
		return nil, Errorf("correlation id mismatch (expected=%d, found=%d)", correlationID, id)
	}
	return res, nil
}

func hasResponse(msg Message) bool {
	x, _ := msg.(interface{ HasResponse() bool })
	return x == nil || x.HasResponse()
}