File: jsonrpc2.go

package info (click to toggle)
golang-golang-x-tools 1%3A0.0~git20190125.d66bd3c%2Bds-4
  • links: PTS, VCS
  • area: main
  • in suites: buster, buster-backports
  • size: 8,912 kB
  • sloc: asm: 1,394; yacc: 155; makefile: 109; sh: 108; ansic: 17; xml: 11
file content (358 lines) | stat: -rw-r--r-- 10,854 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec.
// https://www.jsonrpc.org/specification
// It is intended to be compatible with other implementations at the wire level.
package jsonrpc2

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Conn is a JSON RPC 2 client server connection.
// Conn is bidirectional; it does not have a designated server or client end.
type Conn struct {
	handle     Handler
	cancel     Canceler
	log        Logger
	stream     Stream
	done       chan struct{}
	err        error
	seq        int64      // must only be accessed using atomic operations
	pendingMu  sync.Mutex // protects the pending map
	pending    map[ID]chan *Response
	handlingMu sync.Mutex // protects the handling map
	handling   map[ID]handling
}

// Handler is an option you can pass to NewConn to handle incoming requests.
// If the request returns false from IsNotify then the Handler must eventually
// call Reply on the Conn with the supplied request.
// Handlers are called synchronously, they should pass the work off to a go
// routine if they are going to take a long time.
type Handler func(context.Context, *Conn, *Request)

// Canceler is an option you can pass to NewConn which is invoked for
// cancelled outgoing requests.
// The request will have the ID filled in, which can be used to propagate the
// cancel to the other process if needed.
// It is okay to use the connection to send notifications, but the context will
// be in the cancelled state, so you must do it with the background context
// instead.
type Canceler func(context.Context, *Conn, *Request)

// NewErrorf builds a Error struct for the suppied message and code.
// If args is not empty, message and args will be passed to Sprintf.
func NewErrorf(code int64, format string, args ...interface{}) *Error {
	return &Error{
		Code:    code,
		Message: fmt.Sprintf(format, args...),
	}
}

// NewConn creates a new connection object that reads and writes messages from
// the supplied stream and dispatches incoming messages to the supplied handler.
func NewConn(ctx context.Context, s Stream, options ...interface{}) *Conn {
	conn := &Conn{
		stream:   s,
		done:     make(chan struct{}),
		pending:  make(map[ID]chan *Response),
		handling: make(map[ID]handling),
	}
	for _, opt := range options {
		switch opt := opt.(type) {
		case Handler:
			if conn.handle != nil {
				panic("Duplicate Handler function in options list")
			}
			conn.handle = opt
		case Canceler:
			if conn.cancel != nil {
				panic("Duplicate Canceler function in options list")
			}
			conn.cancel = opt
		case Logger:
			if conn.log != nil {
				panic("Duplicate Logger function in options list")
			}
			conn.log = opt
		default:
			panic(fmt.Errorf("Unknown option type %T in options list", opt))
		}
	}
	if conn.handle == nil {
		// the default handler reports a method error
		conn.handle = func(ctx context.Context, c *Conn, r *Request) {
			if r.IsNotify() {
				c.Reply(ctx, r, nil, NewErrorf(CodeMethodNotFound, "method %q not found", r.Method))
			}
		}
	}
	if conn.cancel == nil {
		// the default canceller does nothing
		conn.cancel = func(context.Context, *Conn, *Request) {}
	}
	if conn.log == nil {
		// the default logger does nothing
		conn.log = func(Direction, *ID, time.Duration, string, *json.RawMessage, *Error) {}
	}
	go func() {
		conn.err = conn.run(ctx)
		close(conn.done)
	}()
	return conn
}

// Wait blocks until the connection is terminated, and returns any error that
// cause the termination.
func (c *Conn) Wait(ctx context.Context) error {
	select {
	case <-c.done:
		return c.err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Cancel cancels a pending Call on the server side.
// The call is identified by its id.
// JSON RPC 2 does not specify a cancel message, so cancellation support is not
// directly wired in. This method allows a higher level protocol to choose how
// to propagate the cancel.
func (c *Conn) Cancel(id ID) {
	c.handlingMu.Lock()
	handling, found := c.handling[id]
	c.handlingMu.Unlock()
	if found {
		handling.cancel()
	}
}

// Notify is called to send a notification request over the connection.
// It will return as soon as the notification has been sent, as no response is
// possible.
func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error {
	jsonParams, err := marshalToRaw(params)
	if err != nil {
		return fmt.Errorf("marshalling notify parameters: %v", err)
	}
	request := &Request{
		Method: method,
		Params: jsonParams,
	}
	data, err := json.Marshal(request)
	if err != nil {
		return fmt.Errorf("marshalling notify request: %v", err)
	}
	c.log(Send, nil, -1, request.Method, request.Params, nil)
	return c.stream.Write(ctx, data)
}

// Call sends a request over the connection and then waits for a response.
// If the response is not an error, it will be decoded into result.
// result must be of a type you an pass to json.Unmarshal.
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error {
	jsonParams, err := marshalToRaw(params)
	if err != nil {
		return fmt.Errorf("marshalling call parameters: %v", err)
	}
	// generate a new request identifier
	id := ID{Number: atomic.AddInt64(&c.seq, 1)}
	request := &Request{
		ID:     &id,
		Method: method,
		Params: jsonParams,
	}
	// marshal the request now it is complete
	data, err := json.Marshal(request)
	if err != nil {
		return fmt.Errorf("marshalling call request: %v", err)
	}
	// we have to add ourselves to the pending map before we send, otherwise we
	// are racing the response
	rchan := make(chan *Response)
	c.pendingMu.Lock()
	c.pending[id] = rchan
	c.pendingMu.Unlock()
	defer func() {
		// clean up the pending response handler on the way out
		c.pendingMu.Lock()
		delete(c.pending, id)
		c.pendingMu.Unlock()
	}()
	// now we are ready to send
	before := time.Now()
	c.log(Send, request.ID, -1, request.Method, request.Params, nil)
	if err := c.stream.Write(ctx, data); err != nil {
		// sending failed, we will never get a response, so don't leave it pending
		return err
	}
	// now wait for the response
	select {
	case response := <-rchan:
		elapsed := time.Since(before)
		c.log(Send, response.ID, elapsed, request.Method, response.Result, response.Error)
		// is it an error response?
		if response.Error != nil {
			return response.Error
		}
		if result == nil || response.Result == nil {
			return nil
		}
		if err := json.Unmarshal(*response.Result, result); err != nil {
			return fmt.Errorf("unmarshalling result: %v", err)
		}
		return nil
	case <-ctx.Done():
		// allow the handler to propagate the cancel
		c.cancel(ctx, c, request)
		return ctx.Err()
	}
}

// Reply sends a reply to the given request.
// It is an error to call this if request was not a call.
// You must call this exactly once for any given request.
// If err is set then result will be ignored.
func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err error) error {
	if req.IsNotify() {
		return fmt.Errorf("reply not invoked with a valid call")
	}
	c.handlingMu.Lock()
	handling, found := c.handling[*req.ID]
	if found {
		delete(c.handling, *req.ID)
	}
	c.handlingMu.Unlock()
	if !found {
		return fmt.Errorf("not a call in progress: %v", req.ID)
	}

	elapsed := time.Since(handling.start)
	var raw *json.RawMessage
	if err == nil {
		raw, err = marshalToRaw(result)
	}
	response := &Response{
		Result: raw,
		ID:     req.ID,
	}
	if err != nil {
		if callErr, ok := err.(*Error); ok {
			response.Error = callErr
		} else {
			response.Error = NewErrorf(0, "%s", err)
		}
	}
	data, err := json.Marshal(response)
	if err != nil {
		return err
	}
	c.log(Send, response.ID, elapsed, req.Method, response.Result, response.Error)
	if err = c.stream.Write(ctx, data); err != nil {
		// TODO(iancottrell): if a stream write fails, we really need to shut down
		// the whole stream
		return err
	}
	return nil
}

type handling struct {
	request *Request
	cancel  context.CancelFunc
	start   time.Time
}

// combined has all the fields of both Request and Response.
// We can decode this and then work out which it is.
type combined struct {
	VersionTag VersionTag       `json:"jsonrpc"`
	ID         *ID              `json:"id,omitempty"`
	Method     string           `json:"method"`
	Params     *json.RawMessage `json:"params,omitempty"`
	Result     *json.RawMessage `json:"result,omitempty"`
	Error      *Error           `json:"error,omitempty"`
}

// Run starts a read loop on the supplied reader.
// It must be called exactly once for each Conn.
// It returns only when the reader is closed or there is an error in the stream.
func (c *Conn) run(ctx context.Context) error {
	for {
		// get the data for a message
		data, err := c.stream.Read(ctx)
		if err != nil {
			// the stream failed, we cannot continue
			return err
		}
		// read a combined message
		msg := &combined{}
		if err := json.Unmarshal(data, msg); err != nil {
			// a badly formed message arrived, log it and continue
			// we trust the stream to have isolated the error to just this message
			c.log(Receive, nil, -1, "", nil, NewErrorf(0, "unmarshal failed: %v", err))
			continue
		}
		// work out which kind of message we have
		switch {
		case msg.Method != "":
			// if method is set it must be a request
			request := &Request{
				Method: msg.Method,
				Params: msg.Params,
				ID:     msg.ID,
			}
			if request.IsNotify() {
				c.log(Receive, request.ID, -1, request.Method, request.Params, nil)
				// we have a Notify, forward to the handler in a go routine
				c.handle(ctx, c, request)
			} else {
				// we have a Call, forward to the handler in another go routine
				reqCtx, cancelReq := context.WithCancel(ctx)
				c.handlingMu.Lock()
				c.handling[*request.ID] = handling{
					request: request,
					cancel:  cancelReq,
					start:   time.Now(),
				}
				c.handlingMu.Unlock()
				c.log(Receive, request.ID, -1, request.Method, request.Params, nil)
				c.handle(reqCtx, c, request)
			}
		case msg.ID != nil:
			// we have a response, get the pending entry from the map
			c.pendingMu.Lock()
			rchan := c.pending[*msg.ID]
			if rchan != nil {
				delete(c.pending, *msg.ID)
			}
			c.pendingMu.Unlock()
			// and send the reply to the channel
			response := &Response{
				Result: msg.Result,
				Error:  msg.Error,
				ID:     msg.ID,
			}
			rchan <- response
			close(rchan)
		default:
			c.log(Receive, nil, -1, "", nil, NewErrorf(0, "message not a call, notify or response, ignoring"))
		}
	}
}

func marshalToRaw(obj interface{}) (*json.RawMessage, error) {
	data, err := json.Marshal(obj)
	if err != nil {
		return nil, err
	}
	raw := json.RawMessage(data)
	return &raw, nil
}