File: invoker.go

package info (click to toggle)
gh 2.23.0%2Bdfsg1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 18,040 kB
  • sloc: asm: 6,813; ansic: 258; sh: 100; makefile: 96
file content (281 lines) | stat: -rw-r--r-- 8,757 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package rpc

// gRPC client used to connect to the codespace's internal gRPC server and perform the following operations:
// - Start a remote JupyterLab server
// - Rebuild the codespace's container
// - Start a remote SSH server

import (
	"context"
	"fmt"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/cli/cli/v2/internal/codespaces/rpc/codespace"
	"github.com/cli/cli/v2/internal/codespaces/rpc/jupyter"
	"github.com/cli/cli/v2/internal/codespaces/rpc/ssh"
	"github.com/cli/cli/v2/pkg/liveshare"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

// Timeouts applied by the RPC client:
//   - ConnectionTimeout bounds the whole connection attempt made by CreateInvoker.
//   - requestTimeout bounds each individual RPC call issued through an invoker.
const (
	ConnectionTimeout = 5 * time.Second
	requestTimeout    = 30 * time.Second
)

// Fixed values used when talking to the codespace's internal RPC server:
//   - codespacesInternalPort is the remote gRPC server port that is tunnelled to the local listener.
//   - codespacesInternalSessionName is the name passed to liveshare.NewPortForwarder for that tunnel.
//   - clientName is sent as the ClientId of every client-activity notification.
//   - connectedEventName is the activity reported once, right after the connection is established.
const (
	codespacesInternalPort        = 16634
	codespacesInternalSessionName = "CodespacesInternal"
	clientName                    = "gh"
	connectedEventName            = "connected"
)

// StartSSHServerOptions holds the optional settings accepted by
// StartSSHServerWithOptions.
type StartSSHServerOptions struct {
	// UserPublicKeyFile is the path of a public key file. When non-empty, the
	// file is read and its whitespace-trimmed contents are sent to the server
	// as the user's public key; when empty, an empty key is sent.
	UserPublicKeyFile string
}

// Invoker is the set of operations that can be performed against a codespace's
// internal RPC server. The descriptions below reflect the invoker
// implementation in this file.
type Invoker interface {
	// Close stops the port forwarding and closes the local listener that
	// backs the connection.
	Close() error
	// StartJupyterServer returns the port and URL of the codespace's
	// JupyterLab server.
	StartJupyterServer(ctx context.Context) (int, string, error)
	// RebuildContainer rebuilds the codespace's container. When full is true
	// a non-incremental rebuild is requested.
	RebuildContainer(ctx context.Context, full bool) error
	// StartSSHServer starts the remote SSH server with default options and
	// returns its port and the user reported by the server.
	StartSSHServer(ctx context.Context) (int, string, error)
	// StartSSHServerWithOptions starts the remote SSH server using the given
	// options and returns its port and the user reported by the server.
	StartSSHServerWithOptions(ctx context.Context, options StartSSHServerOptions) (int, string, error)
}

// invoker is the Invoker implementation backed by a gRPC connection that is
// tunnelled to the codespace through a local TCP listener.
//
// Fields:
//   - conn is the gRPC connection dialed against the local listener's address.
//   - session is used to create the port forwarder and to obtain the
//     keep-alive reason reported by the heartbeat.
//   - listener is the local TCP listener the remote gRPC port is forwarded to.
//   - jupyterClient, codespaceClient and sshClient are the service clients
//     created on top of conn.
//   - cancelPF cancels the context shared by the port forwarder and the
//     heartbeat; it is called by Close.
type invoker struct {
	conn            *grpc.ClientConn
	session         liveshare.LiveshareSession
	listener        net.Listener
	jupyterClient   jupyter.JupyterServerHostClient
	codespaceClient codespace.CodespaceHostClient
	sshClient       ssh.SshServerHostClient
	cancelPF        context.CancelFunc
}

// CreateInvoker connects to the internal RPC server through the given
// liveshare session and returns a new Invoker for it. The connection attempt
// is abandoned once ConnectionTimeout has elapsed or ctx is done, whichever
// comes first.
func CreateInvoker(ctx context.Context, session liveshare.LiveshareSession) (Invoker, error) {
	connectCtx, stop := context.WithTimeout(ctx, ConnectionTimeout)
	defer stop()

	client, connectErr := connect(connectCtx, session)
	if connectErr == nil {
		return client, nil
	}

	return nil, fmt.Errorf("error connecting to internal server: %w", connectErr)
}

// connect opens a listener on a free local port, forwards the codespace's
// internal gRPC port to it and dials the forwarded address, returning an
// invoker for the resulting connection.
//
// ctx only bounds the connection attempt. The port forwarder and the heartbeat
// run on a separate background context that is cancelled by Close. When the
// attempt fails (ctx is done, the forwarder stops, or the dial fails) the
// background context is cancelled and the listener is closed before returning,
// so a failed attempt leaves nothing running.
func connect(ctx context.Context, session liveshare.LiveshareSession) (Invoker, error) {
	listener, err := listenTCP()
	if err != nil {
		return nil, err
	}
	localAddress := listener.Addr().String()

	// Background context for the work that must outlive ctx: the port
	// forwarder, the blocking dial and the heartbeat. pfcancel is retained on
	// the invoker so that Close can stop them.
	pfctx, pfcancel := context.WithCancel(context.Background())

	inv := &invoker{
		session:  session,
		listener: listener,
		cancelPF: pfcancel,
	}

	type dialResult struct {
		conn *grpc.ClientConn
		err  error
	}

	// Each goroutine reports on its own channel, buffered so that its send
	// never blocks even after connect has returned. Passing the connection
	// through the channel (rather than a shared variable) keeps the hand-off
	// free of data races.
	forwardDone := make(chan error, 1)
	dialDone := make(chan dialResult, 1)

	// abort releases everything acquired above. On failure the invoker is
	// never handed to the caller, so nothing else could clean these up.
	abort := func() {
		pfcancel()
		_ = listener.Close() // best effort: the connection attempt already failed
	}

	// abortPendingDial is abort for the paths where the dial goroutine has not
	// reported yet. Cancelling pfctx makes the blocking dial return, and a
	// connection that was established just before the cancellation is closed.
	abortPendingDial := func() {
		abort()
		go func() {
			if res := <-dialDone; res.conn != nil {
				_ = res.conn.Close()
			}
		}()
	}

	// Tunnel the remote gRPC server port to the local port
	go func() {
		fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true)
		forwardDone <- fwd.ForwardToListener(pfctx, listener)
	}()

	// Attempt to connect to the local port, blocking until the connection is up
	go func() {
		opts := []grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithBlock(),
		}
		conn, dialErr := grpc.DialContext(pfctx, localAddress, opts...)
		dialDone <- dialResult{conn: conn, err: dialErr}
	}()

	// Wait for the connection to be established, for the forwarder to stop,
	// or for the caller's context to be done
	var conn *grpc.ClientConn
	select {
	case <-ctx.Done():
		abortPendingDial()
		return nil, ctx.Err()
	case fwdErr := <-forwardDone:
		abortPendingDial()
		if fwdErr == nil {
			// Without the forwarder the dial can never succeed
			fwdErr = fmt.Errorf("port forwarder stopped before the connection was established")
		}
		return nil, fwdErr
	case res := <-dialDone:
		if res.err != nil {
			abort()
			return nil, res.err
		}
		conn = res.conn
	}

	inv.conn = conn
	inv.jupyterClient = jupyter.NewJupyterServerHostClient(conn)
	inv.codespaceClient = codespace.NewCodespaceHostClient(conn)
	inv.sshClient = ssh.NewSshServerHostClient(conn)

	// Send initial connection heartbeat (no need to fail if we don't get a response from the server)
	_ = inv.notifyCodespaceOfClientActivity(ctx, connectedEventName)

	// Start the activity heartbeats; they stop when Close cancels pfctx
	go inv.heartbeat(pfctx, 1*time.Minute)

	return inv, nil
}

// Close cancels the context shared by the port forwarder and the heartbeat,
// then closes the local listener the gRPC connection goes through. It returns
// an error only when the listener cannot be closed.
func (i *invoker) Close() error {
	i.cancelPF()

	// Closing the local listener effectively closes the gRPC connection
	closeErr := i.listener.Close()
	if closeErr == nil {
		return nil
	}

	// The listener could not be closed: close the gRPC connection explicitly and ignore any error from it
	i.conn.Close()
	return fmt.Errorf("failed to close local tcp port listener: %w", closeErr)
}

// appendMetadata returns a context derived from ctx whose outgoing gRPC
// metadata carries an Authorization entry with the fixed value "Bearer token".
func (i *invoker) appendMetadata(ctx context.Context) context.Context {
	const (
		headerKey   = "Authorization"
		headerValue = "Bearer token"
	)

	return metadata.AppendToOutgoingContext(ctx, headerKey, headerValue)
}

// StartJupyterServer asks the codespace for its running JupyterLab server so
// the user can connect to it from their browser. It returns the server's port
// and URL. The call is limited to requestTimeout.
//
// An error is returned when the RPC fails, when the server reports an
// unsuccessful result, or when the reported port is not a valid integer.
func (i *invoker) StartJupyterServer(ctx context.Context) (port int, serverUrl string, err error) {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	reply, rpcErr := i.jupyterClient.GetRunningServer(rpcCtx, &jupyter.GetRunningServerRequest{})
	switch {
	case rpcErr != nil:
		return 0, "", fmt.Errorf("failed to invoke JupyterLab RPC: %w", rpcErr)
	case !reply.Result:
		return 0, "", fmt.Errorf("failed to start JupyterLab: %s", reply.Message)
	}

	// The server reports its port as a string
	parsedPort, parseErr := strconv.Atoi(reply.Port)
	if parseErr != nil {
		return 0, "", fmt.Errorf("failed to parse JupyterLab port: %w", parseErr)
	}

	return parsedPort, reply.ServerUrl, nil
}

// RebuildContainer asks the codespace to rebuild its container. By default the
// rebuild is incremental (cached layers are reused); when full is true a
// rebuild from scratch is requested instead. The call is limited to
// requestTimeout.
func (i *invoker) RebuildContainer(ctx context.Context, full bool) error {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	// The RPC is expressed in terms of "incremental", which is the opposite of "full"
	incremental := !full
	request := &codespace.RebuildContainerRequest{Incremental: &incremental}

	reply, rpcErr := i.codespaceClient.RebuildContainerAsync(rpcCtx, request)
	if rpcErr != nil {
		return fmt.Errorf("failed to invoke rebuild RPC: %w", rpcErr)
	}
	if reply.RebuildContainer {
		return nil
	}

	return fmt.Errorf("couldn't rebuild codespace")
}

// StartSSHServer starts a remote SSH server using the zero-value options (no
// public key file), so the user can connect to the codespace via SSH. See
// StartSSHServerWithOptions for the returned values.
func (i *invoker) StartSSHServer(ctx context.Context) (int, string, error) {
	var defaults StartSSHServerOptions
	return i.StartSSHServerWithOptions(ctx, defaults)
}

// StartSSHServerWithOptions starts a remote SSH server so the user can connect
// to the codespace via SSH. It returns the server's port and the user reported
// by the server. The call is limited to requestTimeout.
//
// When options.UserPublicKeyFile is set, the file is read and its trimmed
// contents are sent as the user's public key. An error is returned when that
// file cannot be read, when the RPC fails, when the server reports an
// unsuccessful result, or when the reported port is not a valid integer.
func (i *invoker) StartSSHServerWithOptions(ctx context.Context, options StartSSHServerOptions) (int, string, error) {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	var publicKey string
	if keyFile := options.UserPublicKeyFile; keyFile != "" {
		raw, readErr := os.ReadFile(keyFile)
		if readErr != nil {
			return 0, "", fmt.Errorf("failed to read public key file: %w", readErr)
		}
		publicKey = strings.TrimSpace(string(raw))
	}

	request := &ssh.StartRemoteServerRequest{UserPublicKey: publicKey}
	reply, rpcErr := i.sshClient.StartRemoteServerAsync(rpcCtx, request)
	switch {
	case rpcErr != nil:
		return 0, "", fmt.Errorf("failed to invoke SSH RPC: %w", rpcErr)
	case !reply.Result:
		return 0, "", fmt.Errorf("failed to start SSH server: %s", reply.Message)
	}

	// The server reports its port as a string
	serverPort, parseErr := strconv.Atoi(reply.ServerPort)
	if parseErr != nil {
		return 0, "", fmt.Errorf("failed to parse SSH server port: %w", parseErr)
	}

	return serverPort, reply.User, nil
}

// listenTCP opens a TCP listener on 127.0.0.1, letting the system pick a free
// port (port 0). The caller owns the returned listener and must close it.
func listenTCP() (*net.TCPListener, error) {
	loopback, resolveErr := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
	if resolveErr != nil {
		return nil, fmt.Errorf("failed to build tcp address: %w", resolveErr)
	}

	ln, listenErr := net.ListenTCP("tcp", loopback)
	if listenErr != nil {
		return nil, fmt.Errorf("failed to listen to local port over tcp: %w", listenErr)
	}

	return ln, nil
}

// heartbeat runs until ctx is done. On every tick of interval it asks the
// session for its keep-alive reason and reports it to the codespace as client
// activity; notification errors are deliberately ignored.
func (i *invoker) heartbeat(ctx context.Context, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	stopped := ctx.Done()
	for {
		select {
		case <-stopped:
			return
		case <-tick.C:
			// Best effort: a missed heartbeat is not worth surfacing
			_ = i.notifyCodespaceOfClientActivity(ctx, i.session.GetKeepAliveReason())
		}
	}
}

// notifyCodespaceOfClientActivity reports a single client activity to the
// codespace on behalf of clientName. The call is limited to requestTimeout and
// any RPC failure is returned wrapped.
func (i *invoker) notifyCodespaceOfClientActivity(ctx context.Context, activity string) error {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	request := &codespace.NotifyCodespaceOfClientActivityRequest{
		ClientId:         clientName,
		ClientActivities: []string{activity},
	}
	if _, rpcErr := i.codespaceClient.NotifyCodespaceOfClientActivity(rpcCtx, request); rpcErr != nil {
		return fmt.Errorf("failed to invoke notify RPC: %w", rpcErr)
	}

	return nil
}