1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
package socket
import (
"crypto/rand"
"encoding/hex"
"errors"
"io"
"net"
"os"
"runtime"
"sync"
)
// EnvKey represents the well-known environment variable used to pass the
// plugin being executed the socket name it should listen on to coordinate with
// the host CLI.
const EnvKey = "DOCKER_CLI_PLUGIN_SOCKET"
// NewPluginServer creates a plugin server that listens on a new Unix domain
// socket. h is called for each new connection to the socket in a goroutine.
func NewPluginServer(h func(net.Conn)) (*PluginServer, error) {
// Listen on a Unix socket, with the address being platform-dependent.
// When a non-abstract address is used, Go will unlink(2) the socket
// for us once the listener is closed, as documented in
// [net.UnixListener.SetUnlinkOnClose].
l, err := net.ListenUnix("unix", &net.UnixAddr{
Name: socketName("docker_cli_" + randomID()),
Net: "unix",
})
if err != nil {
return nil, err
}
if h == nil {
h = func(net.Conn) {}
}
pl := &PluginServer{
l: l,
h: h,
}
go func() {
defer pl.Close()
for {
err := pl.accept()
if err != nil {
return
}
}
}()
return pl, nil
}
type PluginServer struct {
mu sync.Mutex
conns []net.Conn
l *net.UnixListener
h func(net.Conn)
closed bool
}
func (pl *PluginServer) accept() error {
conn, err := pl.l.Accept()
if err != nil {
return err
}
pl.mu.Lock()
defer pl.mu.Unlock()
if pl.closed {
// Handle potential race between Close and accept.
conn.Close()
return errors.New("plugin server is closed")
}
pl.conns = append(pl.conns, conn)
go pl.h(conn)
return nil
}
// Addr returns the [net.Addr] of the underlying [net.Listener].
func (pl *PluginServer) Addr() net.Addr {
return pl.l.Addr()
}
// Close ensures that the server is no longer accepting new connections and
// closes all existing connections. Existing connections will receive [io.EOF].
//
// The error value is that of the underlying [net.Listner.Close] call.
func (pl *PluginServer) Close() error {
// Close connections first to ensure the connections get io.EOF instead
// of a connection reset.
pl.closeAllConns()
// Try to ensure that any active connections have a chance to receive
// io.EOF.
runtime.Gosched()
return pl.l.Close()
}
func (pl *PluginServer) closeAllConns() {
pl.mu.Lock()
defer pl.mu.Unlock()
// Prevent new connections from being accepted.
pl.closed = true
for _, conn := range pl.conns {
conn.Close()
}
pl.conns = nil
}
func randomID() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
panic(err) // This shouldn't happen
}
return hex.EncodeToString(b)
}
// ConnectAndWait connects to the socket passed via well-known env var,
// if present, and attempts to read from it until it receives an EOF, at which
// point cb is called.
func ConnectAndWait(cb func()) {
socketAddr, ok := os.LookupEnv(EnvKey)
if !ok {
// if a plugin compiled against a more recent version of docker/cli
// is executed by an older CLI binary, ignore missing environment
// variable and behave as usual
return
}
addr, err := net.ResolveUnixAddr("unix", socketAddr)
if err != nil {
return
}
conn, err := net.DialUnix("unix", nil, addr)
if err != nil {
return
}
go func() {
b := make([]byte, 1)
for {
_, err := conn.Read(b)
if errors.Is(err, io.EOF) {
cb()
return
}
}
}()
}
|