File: server.go

package info (click to toggle)
golang-collectd 0.3.0%2Bgit20181025.f80706d-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 312 kB
  • sloc: makefile: 3
file content (78 lines) | stat: -rw-r--r-- 1,708 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package rpc // import "collectd.org/rpc"

import (
	"context"
	"io"

	pb "collectd.org/rpc/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// RegisterServer registers the implementation srv with the gRPC instance s.
func RegisterServer(s *grpc.Server, srv Interface) {
	pb.RegisterCollectdServer(s, &server{
		Interface: srv,
	})
}

// Type server implements pb.CollectdServer using the Go implementation of
// rpc.Interface.
type server struct {
	Interface
}

// PutValues reads ValueLists from stream and calls the Write() implementation
// on each one.
func (s *server) PutValues(stream pb.Collectd_PutValuesServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		vl, err := UnmarshalValueList(req.GetValueList())
		if err != nil {
			return err
		}

		if err := s.Write(stream.Context(), vl); err != nil {
			return grpc.Errorf(codes.Internal, "Write(%v): %v", vl, err)
		}
	}

	return stream.SendAndClose(&pb.PutValuesResponse{})
}

// QueryValues calls the Query() implementation and streams all ValueLists from
// the channel back to the client.
func (s *server) QueryValues(req *pb.QueryValuesRequest, stream pb.Collectd_QueryValuesServer) error {
	id := UnmarshalIdentifier(req.GetIdentifier())

	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	ch, err := s.Query(ctx, id)
	if err != nil {
		return grpc.Errorf(codes.Internal, "Query(%v): %v", id, err)
	}

	for vl := range ch {
		pbVL, err := MarshalValueList(vl)
		if err != nil {
			return err
		}

		res := &pb.QueryValuesResponse{
			ValueList: pbVL,
		}
		if err := stream.Send(res); err != nil {
			return err
		}
	}

	return nil
}