1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
|
package rpc // import "collectd.org/rpc"
import (
"context"
"io"
"log"
"collectd.org/api"
pb "collectd.org/rpc/proto"
"google.golang.org/grpc"
)
// Type client implements rpc.Interface using a gRPC stub.
type client struct {
pb.CollectdClient
}
// Newclient returns a wrapper around the gRPC client connection that maps
// between the Go interface and the gRPC interface.
func NewClient(conn *grpc.ClientConn) Interface {
return &client{
CollectdClient: pb.NewCollectdClient(conn),
}
}
// Query maps its arguments to a QueryValuesRequest object and calls
// QueryValues. The response is parsed by a goroutine and written to the
// returned channel.
func (c *client) Query(ctx context.Context, id *api.Identifier) (<-chan *api.ValueList, error) {
stream, err := c.QueryValues(ctx, &pb.QueryValuesRequest{
Identifier: MarshalIdentifier(id),
})
if err != nil {
return nil, err
}
ch := make(chan *api.ValueList, 16)
go func() {
defer close(ch)
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("error while receiving value lists: %v", err)
return
}
vl, err := UnmarshalValueList(res.GetValueList())
if err != nil {
log.Printf("received malformed response: %v", err)
continue
}
select {
case ch <- vl:
continue
case <-stream.Context().Done():
break
}
}
}()
return ch, nil
}
// Write maps its arguments to a PutValuesRequest and calls PutValues.
func (c *client) Write(ctx context.Context, vl *api.ValueList) error {
pbVL, err := MarshalValueList(vl)
if err != nil {
return err
}
stream, err := c.PutValues(ctx)
if err != nil {
return err
}
req := &pb.PutValuesRequest{
ValueList: pbVL,
}
if err := stream.Send(req); err != nil {
stream.CloseSend()
return err
}
_, err = stream.CloseAndRecv()
return err
}
|