1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
|
package rpc
// gRPC client implementation used to connect to the internal gRPC server and perform the following operations:
// - Start a remote JupyterLab server
// - Rebuild the codespace container
// - Start a remote SSH server
import (
"context"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
"github.com/cli/cli/v2/internal/codespaces/rpc/codespace"
"github.com/cli/cli/v2/internal/codespaces/rpc/jupyter"
"github.com/cli/cli/v2/internal/codespaces/rpc/ssh"
"github.com/cli/cli/v2/pkg/liveshare"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)
// Timeouts applied by this package.
const (
// ConnectionTimeout is the deadline CreateInvoker places on connecting to the internal RPC server.
ConnectionTimeout = 5 * time.Second
// requestTimeout is the deadline applied to each individual RPC issued through the invoker.
requestTimeout = 30 * time.Second
)
// Fixed values used when tunneling to, and identifying this client with, the internal RPC server.
const (
// codespacesInternalPort is the remote port handed to the port forwarder when tunneling to the gRPC server.
codespacesInternalPort = 16634
// codespacesInternalSessionName is the name passed to the port forwarder for that tunnel.
codespacesInternalSessionName = "CodespacesInternal"
// clientName is sent as the ClientId in client-activity notifications.
clientName = "gh"
// connectedEventName is the activity reported once, right after the connection is established.
connectedEventName = "connected"
)
// StartSSHServerOptions holds the optional settings accepted by StartSSHServerWithOptions.
type StartSSHServerOptions struct {
// UserPublicKeyFile is the path of a public key file. When non-empty, the file is read
// and its whitespace-trimmed contents are sent along with the start request.
UserPublicKeyFile string
}
// Invoker is the set of operations that can be performed against the internal RPC server.
type Invoker interface {
// Close releases the connection to the RPC server.
Close() error
// StartJupyterServer returns the port and URL of the remote JupyterLab server.
StartJupyterServer(ctx context.Context) (int, string, error)
// RebuildContainer rebuilds the container; full requests a rebuild from scratch
// instead of one that reuses cached layers.
RebuildContainer(ctx context.Context, full bool) error
// StartSSHServer starts the remote SSH server with default options and returns
// its port and the user to connect as.
StartSSHServer(ctx context.Context) (int, string, error)
// StartSSHServerWithOptions is StartSSHServer with caller-supplied options.
StartSSHServerWithOptions(ctx context.Context, options StartSSHServerOptions) (int, string, error)
}
// invoker is the Invoker implementation backed by a gRPC connection that is
// tunneled through a local TCP listener.
type invoker struct {
// conn is the gRPC client connection dialed against the local listener address.
conn *grpc.ClientConn
// session is used to create the port forwarder and to obtain the keep-alive reason for heartbeats.
session liveshare.LiveshareSession
// listener is the local TCP listener the remote gRPC port is forwarded to.
listener net.Listener
// jupyterClient, codespaceClient and sshClient are the service clients created on top of conn.
jupyterClient jupyter.JupyterServerHostClient
codespaceClient codespace.CodespaceHostClient
sshClient ssh.SshServerHostClient
// cancelPF cancels the context shared by the port forwarder and the heartbeat loop.
cancelPF context.CancelFunc
}
// CreateInvoker connects to the internal RPC server through the given session
// and returns an Invoker bound to that connection. The attempt is abandoned
// once ConnectionTimeout elapses or ctx is done, whichever comes first.
func CreateInvoker(ctx context.Context, session liveshare.LiveshareSession) (Invoker, error) {
	dialCtx, stop := context.WithTimeout(ctx, ConnectionTimeout)
	defer stop()

	client, dialErr := connect(dialCtx, session)
	if dialErr == nil {
		return client, nil
	}
	return nil, fmt.Errorf("error connecting to internal server: %w", dialErr)
}
// connect finds a free local port, forwards the remote gRPC server port to it
// through the session, and dials a gRPC client against the local end.
//
// ctx bounds how long connect waits for the connection and is also used for
// the initial activity notification. The port forwarder and the heartbeat loop
// run on a separate background context so that they outlive ctx; they are
// stopped through the invoker's cancelPF when the returned Invoker is closed.
//
// On every failure path the background work is cancelled and the local
// listener is closed before the error is returned.
func connect(ctx context.Context, session liveshare.LiveshareSession) (Invoker, error) {
	listener, err := listenTCP()
	if err != nil {
		return nil, err
	}
	localAddress := listener.Addr().String()

	inv := &invoker{
		session:  session,
		listener: listener,
	}

	// Background context for the forwarder, the dial and the heartbeat. It is
	// deliberately not derived from ctx, which expires once connecting is done.
	connectctx, cancel := context.WithCancel(context.Background())

	// Track success explicitly instead of inspecting an error variable: the
	// error returned to the caller may come from ctx or from either goroutine,
	// and cleanup has to run for all of them.
	established := false
	defer func() {
		if !established {
			cancel()
			// Best effort: nothing useful can be done if closing fails here,
			// and the connection error is what the caller needs to see.
			_ = listener.Close()
		}
	}()

	// pfcancel is retained on the invoker so that Close can stop the port
	// forwarder (and the heartbeat, which shares pfctx).
	pfctx, pfcancel := context.WithCancel(connectctx)
	inv.cancelPF = pfcancel

	// Tunnel the remote gRPC server port to the local port. The channel is
	// buffered so the goroutine can always deliver its result and exit, even
	// when nobody is waiting for it any more.
	fwdDone := make(chan error, 1)
	go func() {
		fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true)
		fwdDone <- fwd.ForwardToListener(pfctx, listener)
	}()

	// The dial result travels over its own channel so that no variable is
	// shared between goroutines.
	type dialResult struct {
		conn *grpc.ClientConn
		err  error
	}
	dialed := make(chan dialResult, 1)
	go func() {
		opts := []grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithBlock(),
		}
		c, dialErr := grpc.DialContext(connectctx, localAddress, opts...)
		dialed <- dialResult{conn: c, err: dialErr}
	}()

	// Wait for the connection to be established, for the forwarder to give
	// up, or for the caller's context to be done.
	var conn *grpc.ClientConn
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case fwdErr := <-fwdDone:
		// The forwarder finishing first means there is no tunnel to dial
		// through, whatever it reported.
		if fwdErr == nil {
			fwdErr = fmt.Errorf("port forwarder stopped before the connection was established")
		}
		return nil, fwdErr
	case res := <-dialed:
		if res.err != nil {
			return nil, res.err
		}
		conn = res.conn
	}

	inv.conn = conn
	inv.jupyterClient = jupyter.NewJupyterServerHostClient(conn)
	inv.codespaceClient = codespace.NewCodespaceHostClient(conn)
	inv.sshClient = ssh.NewSshServerHostClient(conn)

	// Send the initial connection heartbeat. A failure here is not fatal: the
	// connection is usable even if the server does not acknowledge it.
	_ = inv.notifyCodespaceOfClientActivity(ctx, connectedEventName)

	// Start the activity heartbeats.
	go inv.heartbeat(pfctx, 1*time.Minute)

	established = true
	return inv, nil
}
// Close shuts the invoker down: it stops the port forwarder, closes the local
// listener and closes the gRPC client connection.
//
// It returns an error only when the local listener could not be closed.
func (i *invoker) Close() error {
	// Cancels the context shared by the port forwarder and the heartbeat loop.
	i.cancelPF()

	listenerErr := i.listener.Close()

	// Closing the listener only stops new local connections from being
	// accepted; the gRPC client connection has to be closed explicitly to
	// release it, so do that regardless of the listener result. Its error is
	// ignored on purpose: the tunnel underneath it is already being torn down
	// and callers have never been handed this error.
	if i.conn != nil {
		_ = i.conn.Close()
	}

	if listenerErr != nil {
		return fmt.Errorf("failed to close local tcp port listener: %w", listenerErr)
	}
	return nil
}
// appendMetadata returns a context derived from ctx whose outgoing gRPC
// metadata additionally carries the "Authorization" entry.
func (i *invoker) appendMetadata(ctx context.Context) context.Context {
	const (
		authKey   = "Authorization"
		authValue = "Bearer token"
	)
	return metadata.AppendToOutgoingContext(ctx, authKey, authValue)
}
// StartJupyterServer asks the codespace for its running JupyterLab server so
// the user can open it in their browser. It returns the server port and URL.
// The call is bounded by requestTimeout.
func (i *invoker) StartJupyterServer(ctx context.Context) (port int, serverUrl string, err error) {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	reply, rpcErr := i.jupyterClient.GetRunningServer(rpcCtx, &jupyter.GetRunningServerRequest{})
	switch {
	case rpcErr != nil:
		return 0, "", fmt.Errorf("failed to invoke JupyterLab RPC: %w", rpcErr)
	case !reply.Result:
		return 0, "", fmt.Errorf("failed to start JupyterLab: %s", reply.Message)
	}

	// The port arrives as text and is handed back as a number.
	parsed, convErr := strconv.Atoi(reply.Port)
	if convErr != nil {
		return 0, "", fmt.Errorf("failed to parse JupyterLab port: %w", convErr)
	}
	return parsed, reply.ServerUrl, nil
}
// RebuildContainer asks the codespace to rebuild its container. By default the
// rebuild reuses cached layers; pass full=true to rebuild from scratch.
// The call is bounded by requestTimeout.
func (i *invoker) RebuildContainer(ctx context.Context, full bool) error {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	// The request is phrased as "incremental", which is the opposite of a
	// full rebuild.
	useCache := !full
	req := &codespace.RebuildContainerRequest{Incremental: &useCache}

	reply, rpcErr := i.codespaceClient.RebuildContainerAsync(rpcCtx, req)
	if rpcErr != nil {
		return fmt.Errorf("failed to invoke rebuild RPC: %w", rpcErr)
	}
	if reply.RebuildContainer {
		return nil
	}
	return fmt.Errorf("couldn't rebuild codespace")
}
// StartSSHServer starts the remote SSH server using default options. It is
// shorthand for StartSSHServerWithOptions with a zero StartSSHServerOptions.
func (i *invoker) StartSSHServer(ctx context.Context) (int, string, error) {
	var defaults StartSSHServerOptions
	return i.StartSSHServerWithOptions(ctx, defaults)
}
// StartSSHServerWithOptions starts the remote SSH server so the user can reach
// the codespace over SSH, and returns the server port and the user reported by
// the server. When options.UserPublicKeyFile is set, the trimmed contents of
// that file are sent as the user's public key. The call is bounded by
// requestTimeout.
func (i *invoker) StartSSHServerWithOptions(ctx context.Context, options StartSSHServerOptions) (int, string, error) {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	var publicKey string
	if keyPath := options.UserPublicKeyFile; keyPath != "" {
		raw, readErr := os.ReadFile(keyPath)
		if readErr != nil {
			return 0, "", fmt.Errorf("failed to read public key file: %w", readErr)
		}
		publicKey = strings.TrimSpace(string(raw))
	}

	req := &ssh.StartRemoteServerRequest{UserPublicKey: publicKey}
	reply, rpcErr := i.sshClient.StartRemoteServerAsync(rpcCtx, req)
	switch {
	case rpcErr != nil:
		return 0, "", fmt.Errorf("failed to invoke SSH RPC: %w", rpcErr)
	case !reply.Result:
		return 0, "", fmt.Errorf("failed to start SSH server: %s", reply.Message)
	}

	// The port arrives as text and is handed back as a number.
	serverPort, convErr := strconv.Atoi(reply.ServerPort)
	if convErr != nil {
		return 0, "", fmt.Errorf("failed to parse SSH server port: %w", convErr)
	}
	return serverPort, reply.User, nil
}
// listenTCP opens a TCP listener on 127.0.0.1 using port 0, which lets the
// operating system choose a free port. The caller owns the returned listener
// and is responsible for closing it.
func listenTCP() (*net.TCPListener, error) {
	const loopbackAnyPort = "127.0.0.1:0"

	tcpAddr, resolveErr := net.ResolveTCPAddr("tcp", loopbackAnyPort)
	if resolveErr != nil {
		return nil, fmt.Errorf("failed to build tcp address: %w", resolveErr)
	}

	ln, listenErr := net.ListenTCP("tcp", tcpAddr)
	if listenErr != nil {
		return nil, fmt.Errorf("failed to listen to local port over tcp: %w", listenErr)
	}
	return ln, nil
}
// heartbeat runs until ctx is done. On every tick of interval it asks the
// session for its current keep-alive reason and reports that reason to the
// codespace as client activity.
func (i *invoker) heartbeat(ctx context.Context, interval time.Duration) {
	pulse := time.NewTicker(interval)
	defer pulse.Stop()

	stopped := ctx.Done()
	for {
		select {
		case <-pulse.C:
			// Best effort: a failed notification is simply retried on the next tick.
			_ = i.notifyCodespaceOfClientActivity(ctx, i.session.GetKeepAliveReason())
		case <-stopped:
			return
		}
	}
}
// notifyCodespaceOfClientActivity reports a single activity to the codespace
// on behalf of this client (identified as clientName). The call is bounded by
// requestTimeout.
func (i *invoker) notifyCodespaceOfClientActivity(ctx context.Context, activity string) error {
	rpcCtx, done := context.WithTimeout(i.appendMetadata(ctx), requestTimeout)
	defer done()

	req := &codespace.NotifyCodespaceOfClientActivityRequest{
		ClientId:         clientName,
		ClientActivities: []string{activity},
	}
	if _, rpcErr := i.codespaceClient.NotifyCodespaceOfClientActivity(rpcCtx, req); rpcErr != nil {
		return fmt.Errorf("failed to invoke notify RPC: %w", rpcErr)
	}
	return nil
}
|