File: migration_connection.go

package info (click to toggle)
incus 6.0.5-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 25,788 kB
  • sloc: sh: 16,313; ansic: 3,121; python: 457; makefile: 337; ruby: 51; sql: 50; lisp: 6
file content (177 lines) | stat: -rw-r--r-- 4,394 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package main

import (
	"context"
	"crypto/x509"
	"encoding/pem"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"

	"github.com/gorilla/websocket"

	"github.com/lxc/incus/v6/shared/api"
	"github.com/lxc/incus/v6/shared/logger"
	"github.com/lxc/incus/v6/shared/tcp"
	localtls "github.com/lxc/incus/v6/shared/tls"
	"github.com/lxc/incus/v6/shared/ws"
)

// setupWebsocketDialer uses a certificate to parse and configure a websocket.Dialer.
func setupWebsocketDialer(certificate string) (*websocket.Dialer, error) {
	var err error
	var cert *x509.Certificate

	if certificate != "" {
		certBlock, _ := pem.Decode([]byte(certificate))
		if certBlock == nil {
			return nil, errors.New("Failed PEM decoding certificate")
		}

		cert, err = x509.ParseCertificate(certBlock.Bytes)
		if err != nil {
			return nil, fmt.Errorf("Failed parsing certificate: %w", err)
		}
	}

	config, err := localtls.GetTLSConfig(cert)
	if err != nil {
		return nil, fmt.Errorf("Failed configuring TLS: %w", err)
	}

	dialer := &websocket.Dialer{
		TLSClientConfig:  config,
		NetDialContext:   localtls.RFC3493Dialer,
		HandshakeTimeout: time.Second * 5,
	}

	return dialer, nil
}

// newMigrationConn configures a new migration connection handler.
func newMigrationConn(secret string, outgoingDialer *websocket.Dialer, outgoingURL *url.URL) *migrationConn {
	return &migrationConn{
		secret:         secret,
		outgoingDialer: outgoingDialer,
		outgoingURL:    outgoingURL,
		connected:      make(chan struct{}),
	}
}

// migrationConn represents a handler for both accepting and making new migration connections.
type migrationConn struct {
	mu             sync.Mutex
	secret         string
	outgoingDialer *websocket.Dialer
	outgoingURL    *url.URL
	conn           *websocket.Conn
	connected      chan struct{}
	disconnected   bool
}

// Secret returns the secret for this connection.
func (c *migrationConn) Secret() string {
	return c.secret
}

// AcceptIncoming takes an incoming HTTP request and upgrades it to a websocket.
func (c *migrationConn) AcceptIncoming(r *http.Request, w http.ResponseWriter) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.disconnected {
		return errors.New("Connection already disconnected")
	}

	if c.conn != nil {
		return api.StatusErrorf(http.StatusConflict, "Connection already established")
	}

	var err error
	c.conn, err = ws.Upgrader.Upgrade(w, r, nil)
	if err != nil {
		return fmt.Errorf("Failed upgrading incoming request to websocket: %w", err)
	}

	// Set TCP timeout options.
	remoteTCP, _ := tcp.ExtractConn(c.conn.UnderlyingConn())
	if remoteTCP != nil {
		err = tcp.SetTimeouts(remoteTCP, 0)
		if err != nil {
			logger.Warn("Failed setting TCP timeouts on incoming websocket connection", logger.Ctx{"err": err})
		}
	}

	close(c.connected)

	return nil
}

// WebSocket returns the underlying websocket connection.
// If the connection isn't yet active it will either wait for an incoming connection or if configured, will attempt
// to initiate a new outbound connection. If the context is cancelled before the connection is established it
// will return with an error.
func (c *migrationConn) WebSocket(ctx context.Context) (*websocket.Conn, error) {
	c.mu.Lock()

	if c.disconnected {
		c.mu.Unlock()
		return nil, errors.New("Connection already disconnected")
	}

	if c.conn != nil {
		c.mu.Unlock()
		return c.conn, nil
	}

	if c.outgoingURL != nil && c.outgoingDialer != nil {
		var err error
		q := c.outgoingURL.Query()
		q.Set("secret", c.secret)
		c.outgoingURL.RawQuery = q.Encode()
		c.conn, _, err = c.outgoingDialer.DialContext(ctx, c.outgoingURL.String(), http.Header{})
		if err != nil {
			c.mu.Unlock()
			return nil, err
		}

		c.mu.Unlock()
		return c.conn, nil
	}

	c.mu.Unlock()

	select {
	case <-c.connected:
		return c.conn, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// WebsocketIO calls WebSocket and returns it wrapped for io.ReadWriteCloser compatibility.
func (c *migrationConn) WebsocketIO(ctx context.Context) (io.ReadWriteCloser, error) {
	wsConn, err := c.WebSocket(ctx)
	if err != nil {
		return nil, err
	}

	return ws.NewWrapper(wsConn), nil
}

// Close closes the connection (if established) and marks it as disconnected so that it cannot be used again.
func (c *migrationConn) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.disconnected = true

	if c.conn != nil {
		c.conn.Close()
		c.conn = nil
	}
}