1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
package yarpc
import (
"context"
"encoding/binary"
"github.com/influxdata/yamux"
)
func Invoke(ctx context.Context, api uint16, args interface{}, reply interface{}, cc *ClientConn) error {
stream, err := cc.NewStream()
if err != nil {
// TODO(sgc): convert to RPC error
return err
}
defer stream.Close()
var tmp [2]byte
binary.BigEndian.PutUint16(tmp[:], api)
_, err = stream.Write(tmp[:])
if err != nil {
return err
}
err = sendRequest(ctx, cc.dopts, stream, args)
if err != nil {
return err
}
err = recvResponse(ctx, cc.dopts, stream, reply)
if err != nil {
return err
}
return nil
}
func sendRequest(ctx context.Context, dopts dialOptions, stream *yamux.Stream, args interface{}) error {
outBuf, err := encode(dopts.codec, args)
if err != nil {
return err
}
_, err = stream.Write(outBuf)
return err
}
func recvResponse(ctx context.Context, dopts dialOptions, stream *yamux.Stream, reply interface{}) error {
p := &parser{r: stream}
err := decode(p, dopts.codec, stream, reply)
if err != nil {
return err
}
return nil
}
|