File: celrpc.go

package info (click to toggle)
golang-github-google-cel-spec 0.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 876 kB
  • sloc: sh: 11; makefile: 8
file content (254 lines) | stat: -rw-r--r-- 7,309 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// Package celrpc defines CEL conformance service RPC helpers.
package celrpc

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"strings"

	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"

	confpb "google.golang.org/genproto/googleapis/api/expr/conformance/v1alpha1"
)

// ConfClient manages calls to conformance test services. It combines the
// ConformanceService client methods with a Shutdown hook for releasing
// whatever the client holds.
type ConfClient interface {
	// The embedded interface supplies the RPC methods (Parse and Eval are
	// the ones exercised by the examples in this file).
	confpb.ConformanceServiceClient
	// Shutdown deallocates all resources associated with the client.
	// No further calls should be made on the client after shutdown.
	// Shutdown should be called even when the constructor that produced
	// the client returned an error.
	Shutdown()
}

// grpcConfClient is the gRPC conformance service client. It embeds the
// service client stub, so RPC methods are forwarded over conn.
type grpcConfClient struct {
	confpb.ConformanceServiceClient
	// cmd is the spawned server process; it stays nil until the process
	// has started successfully, so Shutdown only stops a running server.
	cmd *exec.Cmd
	// conn is the connection dialed to the address the server announced;
	// it stays nil until dialing succeeds.
	conn *grpc.ClientConn
}

// pipeConfClient is the pipe conformance client. It talks to the server over
// the server's stdin/stdout using the following line-based protocol:
//   - two lines are sent over input
//   - first input line is "parse", "check", or "eval"
//   - second input line is JSON of the corresponding request
//   - one output line is then read back and decoded as JSON of the
//     corresponding response; the same exchange is repeated for each request.
type pipeConfClient struct {
	// cmd is the spawned server process; it stays nil until the process
	// has started successfully.
	cmd *exec.Cmd
	// stdOut is a buffered reader over the server's stdout pipe.
	stdOut *bufio.Reader
	// stdIn is the write end of the server's stdin pipe.
	stdIn io.Writer
}

// NewGrpcClient creates a new gRPC ConformanceService client. A server binary
// is launched given the command line serverCmd, which is split on whitespace
// (no shell quoting is interpreted). The spawned server shares the current
// process's stderr, so its log messages will be visible. The server must
// announce itself on stdout with a line of the form "Listening on <address>";
// that address is then dialed without transport security.
//
// An empty or all-whitespace serverCmd yields an error instead of a panic.
//
// The caller must call Shutdown() on the returned ConfClient, even if
// NewGrpcClient() returns a non-nil error: the returned client is never nil
// and may already own a running server process.
func NewGrpcClient(serverCmd string) (ConfClient, error) {
	c := grpcConfClient{}

	fields := strings.Fields(serverCmd)
	if len(fields) == 0 {
		// Indexing fields[0] below would panic for a blank command line.
		return &c, fmt.Errorf("empty server command %q", serverCmd)
	}
	cmd := exec.Command(fields[0], fields[1:]...)
	out, err := cmd.StdoutPipe()
	if err != nil {
		return &c, err
	}
	cmd.Stderr = os.Stderr // share our error stream

	err = cmd.Start()
	if err != nil {
		return &c, err
	}
	// Only assign cmd for stopping if it has successfully started.
	c.cmd = cmd

	// Expect a port only with gRPC
	var addr string
	_, err = fmt.Fscanf(out, "Listening on %s\n", &addr)
	// The announcement is the only thing read from the server's stdout, so
	// the pipe is closed whether or not the scan succeeded.
	out.Close()
	if err != nil {
		return &c, err
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return &c, err
	}
	c.conn = conn
	c.ConformanceServiceClient = confpb.NewConformanceServiceClient(conn)
	return &c, nil
}

// ExampleNewGrpcClient creates a new CEL RPC client using a path to a server
// binary, parses the expression "1 + 1", evaluates the parsed expression, and
// prints the int64 value of the result.
// TODO Run from celrpc_test.go.
func ExampleNewGrpcClient() {
	c, err := NewGrpcClient("/path/to/server/binary")
	// Covers the normal return path.
	defer c.Shutdown()
	// log.Fatalf exits the process without running deferred calls, so each
	// failure path shuts the client down explicitly first; otherwise a
	// server process that was already started would be left running.
	fail := func(msg string, cause error) {
		c.Shutdown()
		log.Fatalf("%s: %v", msg, cause)
	}
	if err != nil {
		fail("Couldn't create client", err)
	}
	parseRequest := confpb.ParseRequest{
		CelSource: "1 + 1",
	}
	parseResponse, err := c.Parse(context.Background(), &parseRequest)
	if err != nil {
		fail("Couldn't parse", err)
	}
	parsedExpr := parseResponse.ParsedExpr
	evalRequest := confpb.EvalRequest{
		ExprKind: &confpb.EvalRequest_ParsedExpr{ParsedExpr: parsedExpr},
	}
	evalResponse, err := c.Eval(context.Background(), &evalRequest)
	if err != nil {
		fail("Couldn't eval", err)
	}
	fmt.Printf("1 + 1 is %v\n", evalResponse.Result.GetValue().GetInt64Value())
}

// NewPipeClient launches a server binary using the provided serverCmd
// command line, which is split on whitespace (no shell quoting is
// interpreted). The spawned server shares the current process's stderr, so its
// log messages will be visible. stdin and stdout are used for communication.
//
// An empty or all-whitespace serverCmd yields an error instead of a panic.
//
// The caller must call Shutdown() on the returned ConfClient, even if the
// method returns a non-nil error: the returned client is never nil.
func NewPipeClient(serverCmd string) (ConfClient, error) {
	c := pipeConfClient{}

	fields := strings.Fields(serverCmd)
	if len(fields) == 0 {
		// Indexing fields[0] below would panic for a blank command line.
		return &c, fmt.Errorf("empty server command %q", serverCmd)
	}
	cmd := exec.Command(fields[0], fields[1:]...)
	out, err := cmd.StdoutPipe()
	if err != nil {
		return &c, err
	}
	c.stdIn, err = cmd.StdinPipe()
	if err != nil {
		return &c, err
	}
	cmd.Stderr = os.Stderr // share our error stream

	err = cmd.Start()
	if err != nil {
		return &c, err
	}
	// Only assign cmd for stopping if it has successfully started.
	c.cmd = cmd
	// Responses are read one line at a time, hence the buffered reader.
	c.stdOut = bufio.NewReader(out)
	return &c, nil
}

// ExampleNewPipeClient creates a new CEL pipe client using a path to a server
// binary, parses the expression "1 + 1", evaluates the parsed expression, and
// prints the int64 value of the result.
// TODO Run from celrpc_test.go.
func ExampleNewPipeClient() {
	c, err := NewPipeClient("/path/to/server/binary")
	// Covers the normal return path.
	defer c.Shutdown()
	// log.Fatalf exits the process without running deferred calls, so each
	// failure path shuts the client down explicitly first; otherwise a
	// server process that was already started would be left running.
	fail := func(msg string, cause error) {
		c.Shutdown()
		log.Fatalf("%s: %v", msg, cause)
	}
	if err != nil {
		fail("Couldn't create client", err)
	}
	parseRequest := confpb.ParseRequest{
		CelSource: "1 + 1",
	}
	parseResponse, err := c.Parse(context.Background(), &parseRequest)
	if err != nil {
		fail("Couldn't parse", err)
	}
	parsedExpr := parseResponse.ParsedExpr
	evalRequest := confpb.EvalRequest{
		ExprKind: &confpb.EvalRequest_ParsedExpr{ParsedExpr: parsedExpr},
	}
	evalResponse, err := c.Eval(context.Background(), &evalRequest)
	if err != nil {
		fail("Couldn't eval", err)
	}
	fmt.Printf("1 + 1 is %v\n", evalResponse.Result.GetValue().GetInt64Value())
}

// pipeCommand performs one request/response exchange with the piped server.
// It writes cmd on one line and the JSON encoding of in on the next line to
// the server's stdin, then reads a single line from the server's stdout and
// decodes it as JSON into out.
//
// The request is marshaled before anything is written, so a message that
// cannot be encoded is reported as an error without leaving the server
// waiting for a request line that would never arrive.
func (c *pipeConfClient) pipeCommand(cmd string, in proto.Message, out proto.Message) error {
	// AllowPartial keeps encoding lenient about unset required fields, so
	// only genuine encoding failures are reported here.
	jsonInput, err := protojson.MarshalOptions{AllowPartial: true}.Marshal(in)
	if err != nil {
		return fmt.Errorf("marshaling %s request: %w", cmd, err)
	}
	if _, err := c.stdIn.Write([]byte(cmd + "\n")); err != nil {
		return err
	}
	if _, err := c.stdIn.Write(append(jsonInput, '\n')); err != nil {
		return err
	}
	// The response is exactly one newline-terminated line of JSON.
	jsonOutput, err := c.stdOut.ReadBytes('\n')
	if err != nil {
		return err
	}
	return protojson.Unmarshal(jsonOutput, out)
}

// Parse issues a "parse" command over the pipe and returns the decoded
// response, mirroring the signature of the gRPC client stub. ctx and opts are
// accepted for interface compatibility only and are not used. The returned
// response is non-nil even when the error is non-nil.
func (c *pipeConfClient) Parse(ctx context.Context, in *confpb.ParseRequest, opts ...grpc.CallOption) (*confpb.ParseResponse, error) {
	resp := new(confpb.ParseResponse)
	if err := c.pipeCommand("parse", in, resp); err != nil {
		return resp, err
	}
	return resp, nil
}

// Check issues a "check" command over the pipe and returns the decoded
// response, mirroring the signature of the gRPC client stub. ctx and opts are
// accepted for interface compatibility only and are not used. The returned
// response is non-nil even when the error is non-nil.
func (c *pipeConfClient) Check(ctx context.Context, in *confpb.CheckRequest, opts ...grpc.CallOption) (*confpb.CheckResponse, error) {
	resp := new(confpb.CheckResponse)
	if err := c.pipeCommand("check", in, resp); err != nil {
		return resp, err
	}
	return resp, nil
}

// Eval issues an "eval" command over the pipe and returns the decoded
// response, mirroring the signature of the gRPC client stub. ctx and opts are
// accepted for interface compatibility only and are not used. The returned
// response is non-nil even when the error is non-nil.
func (c *pipeConfClient) Eval(ctx context.Context, in *confpb.EvalRequest, opts ...grpc.CallOption) (*confpb.EvalResponse, error) {
	resp := new(confpb.EvalResponse)
	if err := c.pipeCommand("eval", in, resp); err != nil {
		return resp, err
	}
	return resp, nil
}

// Shutdown kills the spawned server process, if one was started, and waits
// for it to exit. It does nothing when no process is recorded, so it can be
// called after a failed construction and more than once.
func (c *pipeConfClient) Shutdown() {
	server := c.cmd
	if server == nil {
		return
	}
	// Best-effort teardown: errors from Kill and Wait are deliberately ignored.
	_ = server.Process.Kill()
	_ = server.Wait()
	c.cmd = nil
}

// Shutdown closes the gRPC connection, if one was dialed, then kills the
// spawned server process, if one was started, and waits for it to exit.
// Each step is skipped when its resource is absent, so Shutdown can be called
// after a failed construction and more than once.
func (c *grpcConfClient) Shutdown() {
	// Best-effort teardown: errors from Close, Kill and Wait are
	// deliberately ignored.
	if conn := c.conn; conn != nil {
		_ = conn.Close()
		c.conn = nil
	}
	if server := c.cmd; server != nil {
		_ = server.Process.Kill()
		_ = server.Wait()
		c.cmd = nil
	}
}

// RunServer listens on a dynamically-allocated port on the loopback network
// device (IPv4 first, falling back to IPv6), prints "Listening on <address>"
// to stdout, then serves gRPC on that socket with the given service callbacks
// and server reflection registered.
// Note that this call doesn't return until the server exits; a failure to
// listen or serve terminates the process via log.Fatalf.
func RunServer(service confpb.ConformanceServiceServer) {
	listener, err := net.Listen("tcp4", "127.0.0.1:")
	if err != nil {
		// IPv4 loopback is unavailable; try the IPv6 loopback instead.
		if listener, err = net.Listen("tcp6", "[::1]:0"); err != nil {
			log.Fatalf("failed to listen: %v", err)
		}
	}

	// The client discovers the port by reading this line from our stdout,
	// so it has to go through 'fmt' rather than 'log'.
	fmt.Printf("Listening on %v\n", listener.Addr())
	os.Stdout.Sync()

	server := grpc.NewServer()
	confpb.RegisterConformanceServiceServer(server, service)
	reflection.Register(server)

	err = server.Serve(listener)
	if err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}