File: lsprpc.go

package info (click to toggle)
golang-github-cue-lang-cue 0.14.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 19,644 kB
  • sloc: makefile: 20; sh: 15
file content (335 lines) | stat: -rw-r--r-- 10,845 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package lsprpc implements a jsonrpc2.StreamServer that may be used to
// serve the LSP on a jsonrpc2 channel.
package lsprpc

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"cuelang.org/go/internal/golangorgx/gopls/protocol"
	"cuelang.org/go/internal/golangorgx/gopls/protocol/command"
	"cuelang.org/go/internal/golangorgx/gopls/settings"
	"cuelang.org/go/internal/golangorgx/tools/event"
	"cuelang.org/go/internal/golangorgx/tools/event/tag"
	"cuelang.org/go/internal/golangorgx/tools/jsonrpc2"
	"cuelang.org/go/internal/lsp/cache"
	"cuelang.org/go/internal/lsp/server"
)

// serverIndex is a process-wide counter from which unique identifiers for
// client/server pairs are derived. The forwarder increments it with
// atomic.AddInt64 for every stream it serves, so it must only be accessed
// through sync/atomic operations.
var serverIndex int64

// The streamServer type is a jsonrpc2.StreamServer that handles each incoming
// stream as a new LSP session, using a shared cache.
type streamServer struct {
	// cache is handed to every server created by ServeStream.
	cache *cache.Cache
	// daemon controls whether or not to log new connections (and the
	// handshakes received on them).
	daemon bool

	// optionsOverrides is passed to settings.DefaultOptions when building the
	// options for each newly created server.
	optionsOverrides func(*settings.Options)

	// serverForTest may be set to a test fake for testing. When it is non-nil,
	// ServeStream uses it instead of creating a new server.
	serverForTest server.ServerWithID
}

// NewStreamServer returns a jsonrpc2.StreamServer whose LSP sessions all use
// the given cache. daemon enables connection logging, and optionsFunc (which
// may be nil) is applied when building the options of each new server.
func NewStreamServer(cache *cache.Cache, daemon bool, optionsFunc func(*settings.Options)) jsonrpc2.StreamServer {
	s := new(streamServer)
	s.cache = cache
	s.daemon = daemon
	s.optionsOverrides = optionsFunc
	return s
}

// ServeStream implements the jsonrpc2.StreamServer interface. It serves the
// LSP on conn using serverForTest when that is set, or a newly created server
// otherwise, and blocks until the connection is done. The returned error is
// whatever conn.Err reports at that point.
func (s *streamServer) ServeStream(ctx context.Context, conn jsonrpc2.Conn) error {
	lspClient := protocol.ClientDispatcher(conn)

	lspServer := s.serverForTest
	if lspServer == nil {
		lspServer = server.New(s.cache, lspClient, settings.DefaultOptions(s.optionsOverrides))
	}
	id := lspServer.ID()

	// Clients may or may not send a shutdown message, so ideally we would
	// make sure the server is shut down when this function returns.
	//
	// TODO(ms): temporarily disabled because it introduces a data race: this
	// is a moment of genuine concurrency. It would be much better to inject
	// the shutdown message onto the end of the jsonrpc2 stream somehow. For
	// now, there's nothing important to do for shutdown, so disabling this is
	// fine, and it solves the data race. It's possible we could get away with
	// moving the <-conn.Done() call within the defer, prior to the Shutdown
	// call - that would provide the necessary memory barriers. TBD.
	//
	// defer lspServer.Shutdown(ctx)

	ctx = protocol.WithClient(ctx, lspClient)

	// The handshaker sits in front of the regular LSP handler so that it can
	// answer the cuelsp-specific methods itself.
	lspHandler := protocol.ServerHandler(lspServer, jsonrpc2.MethodNotFound)
	conn.Go(ctx, protocol.Handlers(handshaker(id, s.daemon, lspHandler)))

	if s.daemon {
		log.Printf("Server %s: connected", id)
		defer log.Printf("Server %s: exited", id)
	}

	<-conn.Done()
	return conn.Err()
}

// A forwarder is a jsonrpc2.StreamServer that handles an LSP stream
// by forwarding it to a remote. This is used when the cuelsp process
// started by the editor is in the `-remote` mode, which means it
// finds and connects to a separate cuelsp daemon. In these cases, we
// still want the forwarder cuelsp to in some cases hijack the
// jsonrpc2 connection with the daemon.
type forwarder struct {
	// dialer is used to open the network connection to the remote for each
	// stream served.
	dialer *autoDialer

	// mu is held while serverConn and serverID are written.
	mu sync.Mutex
	// Hold on to the server connection so that we can redo the handshake if any
	// information changes.
	serverConn jsonrpc2.Conn
	// serverID is the decimal form of the serverIndex value allocated for the
	// stream most recently set up by ServeStream.
	serverID string
}

// NewForwarder builds a forwarder (a [jsonrpc2.StreamServer]) that relays
// each stream it serves to the remote server named by rawAddr.
//
// If rawAddr indicates an 'automatic' address (starting with 'auto;') and
// argFunc is provided, argFunc may be used to start a remote server for the
// auto-discovered address. An error is returned when no dialer can be
// created for rawAddr.
func NewForwarder(rawAddr string, argFunc func(network, address string) []string) (jsonrpc2.StreamServer, error) {
	d, err := newAutoDialer(rawAddr, argFunc)
	if err != nil {
		return nil, err
	}
	return &forwarder{dialer: d}, nil
}

// QueryServerState returns a JSON-encodable struct describing the state of the
// named server.
//
// It dials addr, issues a single serversMethod request and returns a pointer
// to the decoded reply. An error is returned if the remote cannot be dialed
// or if the request fails.
func QueryServerState(ctx context.Context, addr string) (any, error) {
	serverConn, err := dialRemote(ctx, addr)
	if err != nil {
		return nil, err
	}
	// The connection exists only for this one request: close it on the way
	// out so that neither the socket nor the goroutine serving it is leaked.
	defer serverConn.Close()

	var state serverState
	if err := protocol.Call(ctx, serverConn, serversMethod, nil, &state); err != nil {
		return nil, fmt.Errorf("querying server state: %w", err)
	}
	return &state, nil
}

// dialRemote is used for making calls into the cuelsp daemon.
//
// addr is split into a network and an address by ParseAddr (for example
// "unix;/path/to/socket", a bare "host:port" for tcp, or "auto"). When the
// network is the synthetic autoNetwork, the real network and address are
// derived by autoNetworkAddress from the path of the current executable.
//
// The dial is bounded by a 5 second timeout and is also abandoned as soon as
// ctx is done. On success the returned connection is already running, with
// every incoming request answered by jsonrpc2.MethodNotFound; the caller is
// responsible for closing it.
func dialRemote(ctx context.Context, addr string) (jsonrpc2.Conn, error) {
	network, address := ParseAddr(addr)
	if network == autoNetwork {
		cuePath, err := os.Executable()
		if err != nil {
			return nil, fmt.Errorf("getting cue path: %w", err)
		}
		network, address = autoNetworkAddress(cuePath, address)
	}
	// Use DialContext rather than net.DialTimeout so that cancelling ctx
	// aborts a dial that is still in progress.
	dialer := net.Dialer{Timeout: 5 * time.Second}
	netConn, err := dialer.DialContext(ctx, network, address)
	if err != nil {
		return nil, fmt.Errorf("dialing remote: %w", err)
	}
	serverConn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn))
	serverConn.Go(ctx, jsonrpc2.MethodNotFound)
	return serverConn, nil
}

// ExecuteCommand connects to the named server, sends it a
// workspace/executeCommand request (with command 'id' and arguments
// JSON encoded in 'request'), and populates the result variable.
//
// An error is returned if the server cannot be dialed, if request cannot be
// marshalled into command arguments, or if the call itself fails.
func ExecuteCommand(ctx context.Context, addr string, id string, request, result any) error {
	serverConn, err := dialRemote(ctx, addr)
	if err != nil {
		return err
	}
	// The connection is used for this single request only. Close it on every
	// return path, including a marshalling failure below, so that it is not
	// leaked.
	defer serverConn.Close()

	args, err := command.MarshalArgs(request)
	if err != nil {
		return err
	}
	params := protocol.ExecuteCommandParams{
		Command:   id,
		Arguments: args,
	}
	return protocol.Call(ctx, serverConn, "workspace/executeCommand", params, result)
}

// ServeStream dials the forwarder remote and binds the remote to serve the LSP
// on the incoming stream.
//
// Messages arriving from the remote are dispatched to the client straight
// away; messages arriving from the client are dispatched to the remote only
// after the handshake with the remote has been attempted. ServeStream then
// blocks until either connection is done, closes the other one, and returns
// a non-nil error if either connection reports one (the remote's error takes
// precedence). It returns early, without serving, if the remote cannot be
// dialed.
func (f *forwarder) ServeStream(ctx context.Context, clientConn jsonrpc2.Conn) error {
	client := protocol.ClientDispatcher(clientConn)

	netConn, err := f.dialer.dialNet(ctx)
	if err != nil {
		return fmt.Errorf("forwarder: connecting to remote: %w", err)
	}
	serverConn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn))
	server := protocol.ServerDispatcher(serverConn)

	// Forward between connections.
	serverConn.Go(ctx,
		protocol.Handlers(
			protocol.ClientHandler(client,
				jsonrpc2.MethodNotFound)))

	// Don't run the clientConn yet, so that we can complete the handshake before
	// processing any client messages.

	// Do a handshake with the server instance to exchange debug information.
	// The ID sent to the remote is the next value of the package-level
	// serverIndex counter, formatted in decimal.
	//
	// NOTE(review): serverConn and serverID live on the shared forwarder and
	// are overwritten for every stream, so when several streams are served
	// concurrently the handshake below may observe another stream's values -
	// confirm whether that is acceptable.
	index := atomic.AddInt64(&serverIndex, 1)
	f.mu.Lock()
	f.serverConn = serverConn
	f.serverID = strconv.FormatInt(index, 10)
	f.mu.Unlock()
	f.handshake(ctx)
	clientConn.Go(ctx,
		protocol.Handlers(
			protocol.ServerHandler(server, jsonrpc2.MethodNotFound)))

	// Wait for whichever side finishes first, then close the other side.
	select {
	case <-serverConn.Done():
		clientConn.Close()
	case <-clientConn.Done():
		serverConn.Close()
	}

	// Report the remote's error in preference to the client's. The
	// underlying error is formatted with %v, so it is not wrapped.
	err = nil
	if serverConn.Err() != nil {
		err = fmt.Errorf("remote disconnected: %v", serverConn.Err())
	} else if clientConn.Err() != nil {
		err = fmt.Errorf("client disconnected: %v", clientConn.Err())
	}
	// This is logged unconditionally, i.e. also when err is nil.
	event.Log(ctx, fmt.Sprintf("forwarder: exited with error: %v", err))
	return err
}

// handshake sends a handshakeMethod request carrying the forwarder's current
// server ID over the forwarder's current server connection, and logs the
// server ID reported back by the remote.
//
// A failed handshake is logged but otherwise ignored; in that case the
// "New server" event is still emitted, with an empty remote server ID.
//
// handshake acquires f.mu, so it must not be called with f.mu held.
//
// TODO(rfindley): remove this handshaking in favor of middleware.
func (f *forwarder) handshake(ctx context.Context) {
	// serverConn and serverID are written under f.mu by ServeStream, which may
	// run concurrently for several streams. Take a consistent snapshot under
	// the same lock rather than reading the fields unsynchronized.
	f.mu.Lock()
	serverConn, serverID := f.serverConn, f.serverID
	f.mu.Unlock()

	hreq := handshakeRequest{ServerID: serverID}
	var hresp handshakeResponse
	if err := protocol.Call(ctx, serverConn, handshakeMethod, hreq, &hresp); err != nil {
		// TODO(rfindley): at some point in the future we should return an error
		// here.  Handshakes have become functional in nature.
		event.Error(ctx, "forwarder: cuelsp handshake failed", err)
	}
	event.Log(ctx, "New server",
		tag.NewServer.Of(serverID),
		tag.ServerID.Of(hresp.ServerID),
	)
}

// ConnectToRemote opens a network connection to the remote named by addr.
// It builds an auto dialer for addr without an argFunc and returns whatever
// connection (or error) that dialer produces.
func ConnectToRemote(ctx context.Context, addr string) (net.Conn, error) {
	d, dialerErr := newAutoDialer(addr, nil)
	if dialerErr != nil {
		return nil, dialerErr
	}
	return d.dialNet(ctx)
}

// A handshakeRequest identifies a client to the LSP server. It is the
// parameter of the handshakeMethod request.
type handshakeRequest struct {
	// ServerID is the ID of the server on the client. The forwarder fills it
	// with the decimal form of the serverIndex value it allocated for the
	// stream.
	ServerID string `json:"serverID"`
}

// A handshakeResponse is returned by the LSP server to tell the LSP
// client information about its server. It is the result of the
// handshakeMethod request.
type handshakeResponse struct {
	// ServerID is the ID of the server associated with the client.
	ServerID string `json:"serverID"`
}

// clientServer identifies a current client LSP server on the
// server.
//
// NOTE(review): nothing in this file refers to this type; check the rest of
// the package before relying on or removing it.
type clientServer struct {
	// ServerID is the identifier of the server.
	ServerID string `json:"serverID"`
}

// serverState holds information about the cuelsp daemon process. It is the
// result of the serversMethod request, as decoded by QueryServerState.
type serverState struct {
	// CurrentServerID is the ID of the server that answered the request.
	CurrentServerID string `json:"currentServerID"`
}

// Names of the cuelsp-specific jsonrpc2 methods that handshaker answers
// itself instead of passing them on to the LSP handler.
const (
	// handshakeMethod takes a handshakeRequest and yields a handshakeResponse.
	handshakeMethod = "cuelsp/handshake"
	// serversMethod yields a serverState.
	serversMethod = "cuelsp/servers"
)

// handshaker wraps handler with support for the cuelsp-specific methods.
//
// A serversMethod request is answered with a serverState naming svrID. A
// handshakeMethod request is decoded as a handshakeRequest and answered with
// a handshakeResponse naming svrID; if decoding fails, a parse error is sent
// back instead. Every other request is passed on to handler unchanged.
// logHandshakes controls whether handshakes are reported with log.Printf.
func handshaker(svrID string, logHandshakes bool, handler jsonrpc2.Handler) jsonrpc2.Handler {
	return func(ctx context.Context, reply jsonrpc2.Replier, r jsonrpc2.Request) error {
		method := r.Method()
		if method == serversMethod {
			return reply(ctx, serverState{CurrentServerID: svrID}, nil)
		}
		if method != handshakeMethod {
			return handler(ctx, reply, r)
		}

		// From here on we are handling a handshake. We use log.Printf rather
		// than event.Log when we want logs to go to the daemon log rather
		// than being reflected back to the client.
		var hreq handshakeRequest
		if decodeErr := json.Unmarshal(r.Params(), &hreq); decodeErr != nil {
			if logHandshakes {
				log.Printf("Error processing handshake for server %s: %v", svrID, decodeErr)
			}
			// The failure has been reported to the peer, so the handler
			// itself succeeds.
			sendError(ctx, reply, decodeErr)
			return nil
		}

		if logHandshakes {
			log.Printf("Server %s: got handshake.", svrID)
		}
		event.Log(ctx, "Handshake server update",
			tag.ServerID.Of(hreq.ServerID),
		)
		return reply(ctx, handshakeResponse{ServerID: svrID}, nil)
	}
}

// sendError replies to a request with err, wrapped so that the resulting
// error matches jsonrpc2.ErrParse. If sending the reply itself fails, that
// failure is reported through event.Error rather than returned.
func sendError(ctx context.Context, reply jsonrpc2.Replier, err error) {
	parseErr := fmt.Errorf("%v: %w", err, jsonrpc2.ErrParse)
	replyErr := reply(ctx, nil, parseErr)
	if replyErr == nil {
		return
	}
	event.Error(ctx, "", replyErr)
}

// ParseAddr parses the address of a cuelsp remote into a network and an
// address.
//
// The accepted forms are:
//   - exactly autoNetwork, which yields autoNetwork and an empty address;
//   - "network;address", which is split at the first semicolon;
//   - anything else, which is taken to be an address on the "tcp" network.
//
// TODO(rFindley): further document this syntax, and allow URI-style remote
// addresses such as "auto://...".
func ParseAddr(listen string) (network string, address string) {
	// Passing just -remote=auto is a shorthand for automatic remote
	// resolution.
	if listen == autoNetwork {
		return autoNetwork, ""
	}
	before, after, found := strings.Cut(listen, ";")
	if !found {
		return "tcp", listen
	}
	return before, after
}