1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
package plugin
import (
"errors"
"fmt"
"io"
"log"
"net"
"net/rpc"
"sync"
"github.com/hashicorp/yamux"
)
// RPCServer listens for network connections and then dispenses interface
// implementations over net/rpc.
//
// Set the exported fields before the server starts serving. After that,
// they shouldn't be read again directly from the structure, which may be
// reading/writing them concurrently (done, for example, clears DoneCh).
type RPCServer struct {
// Plugins maps a plugin name to its Plugin. The map is handed to the
// dispenser, which looks names up in it when Dispense is called.
Plugins map[string]Plugin
// Stdout, Stderr are what this server will use instead of the
// normal process stdout/stderr. This is because due to the multi-process
// nature of our plugin system, we can't use the normal process values so
// we make our own custom ones we pipe across. Each reader is passed to
// copyStream together with its own accepted stream (stdout first, then
// stderr).
// NOTE(review): copyStream is defined elsewhere; presumably it copies
// from these readers to the client — confirm.
Stdout io.Reader
Stderr io.Reader
// DoneCh should be set to a non-nil channel that will be closed
// when the control requests the RPC server to end.
DoneCh chan<- struct{}
// lock is held by done while it closes and clears DoneCh.
lock sync.Mutex
}
// Init is part of the ServerProtocol implementation. It performs no work
// and always reports success.
func (s *RPCServer) Init() error {
	return nil
}
// Config is part of the ServerProtocol implementation. This server has no
// configuration to report, so the result is always the empty string.
func (s *RPCServer) Config() string {
	var none string
	return none
}
// Serve is part of the ServerProtocol implementation. It accepts
// connections from lis in a loop and serves each one on its own goroutine
// via ServeConn. The first Accept error is logged and ends the loop.
func (s *RPCServer) Serve(lis net.Listener) {
	for {
		accepted, acceptErr := lis.Accept()
		if acceptErr == nil {
			go s.ServeConn(accepted)
			continue
		}

		log.Printf("[ERR] plugin: plugin server: %s", acceptErr)
		return
	}
}
// ServeConn runs a single connection.
//
// ServeConn blocks, serving the connection until the client hangs up.
//
// The connection is wrapped in a yamux session. The first stream accepted
// on the session is the control connection; the next two are paired with
// s.Stdout and s.Stderr, in that order. The session is then handed to a
// MuxBroker, and the "Control" and "Dispenser" RPC services are served
// over the control connection.
//
// Any setup failure is logged (except io.EOF while waiting for the control
// connection) and tears down the connection or session before returning.
func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
	// First create the yamux server to wrap this connection
	mux, err := yamux.Server(conn, nil)
	if err != nil {
		conn.Close()
		log.Printf("[ERR] plugin: error creating yamux server: %s", err)
		return
	}

	// Accept the control connection
	control, err := mux.Accept()
	if err != nil {
		mux.Close()
		if err != io.EOF {
			log.Printf("[ERR] plugin: error accepting control connection: %s", err)
		}
		return
	}

	// Connect the std streams (stdout, stderr)
	stdstream := make([]net.Conn, 2)
	for i := range stdstream {
		stdstream[i], err = mux.Accept()
		if err != nil {
			mux.Close()
			log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
			return
		}
	}

	// Copy std streams out to the proper place
	go copyStream("stdout", stdstream[0], s.Stdout)
	go copyStream("stderr", stdstream[1], s.Stderr)

	// Create the broker and start it up
	broker := newMuxBroker(mux)
	go broker.Run()

	// Use the control connection to build the dispenser and serve the
	// connection. A registration failure would leave the client talking to
	// a server that is missing services, so treat it like every other
	// setup error: log it and close the whole session.
	server := rpc.NewServer()
	if err := server.RegisterName("Control", &controlServer{
		server: s,
	}); err != nil {
		mux.Close()
		log.Printf("[ERR] plugin: error registering control server: %s", err)
		return
	}
	if err := server.RegisterName("Dispenser", &dispenseServer{
		broker:  broker,
		plugins: s.Plugins,
	}); err != nil {
		mux.Close()
		log.Printf("[ERR] plugin: error registering dispense server: %s", err)
		return
	}

	server.ServeConn(control)
}
// done is invoked by the control server when the client asks the server to
// quit. While holding the lock it closes DoneCh, which the main process
// listens on to exit cleanly, and then clears the field so that later
// calls do nothing.
func (s *RPCServer) done() {
	s.lock.Lock()
	defer s.lock.Unlock()

	// Nothing to signal: either no channel was configured or it has
	// already been closed by an earlier call.
	if s.DoneCh == nil {
		return
	}

	close(s.DoneCh)
	s.DoneCh = nil
}
// controlServer is the RPC receiver that ServeConn registers under the
// name "Control". It exposes Ping and Quit for the RPCServer it belongs to.
type controlServer struct {
// server is the RPCServer whose done method Quit invokes.
server *RPCServer
}
// Ping lets a client verify that the connection (and likely the plugin
// binary behind it) is still alive. The argument is ignored; the response
// is set to an empty struct and no error is ever returned.
func (c *controlServer) Ping(null bool, response *struct{}) error {
	var pong struct{}
	*response = pong
	return nil
}
// Quit asks the owning RPCServer to shut down by calling its done method,
// then sets the response to an empty struct. The argument is ignored and
// no error is ever returned.
func (c *controlServer) Quit(null bool, response *struct{}) error {
	// Signal the server to end before acknowledging the request.
	c.server.done()

	var ack struct{}
	*response = ack
	return nil
}
// dispenseServer dispenses various interface implementations for
// Terraform. ServeConn registers it as the RPC receiver named "Dispenser".
type dispenseServer struct {
// broker is passed to each plugin's Server method, reserves the ID
// returned by Dispense, and accepts the connection for that ID.
broker *MuxBroker
// plugins maps a plugin name to the Plugin that Dispense uses to build
// the implementation.
plugins map[string]Plugin
}
// Dispense builds the implementation for the plugin registered under name
// and writes a newly reserved broker ID to response. After the call
// returns, a background goroutine accepts the broker connection for that
// ID and serves the implementation over it as the RPC service "Plugin".
//
// It returns an error if name is not a known plugin or if the plugin
// fails to build its implementation.
func (d *dispenseServer) Dispense(
	name string, response *uint32) error {
	// Look up the factory for the requested plugin type.
	plugin, known := d.plugins[name]
	if !known {
		return fmt.Errorf("unknown plugin type: %s", name)
	}

	// Build the implementation before reserving anything, so a failure is
	// reported directly to the caller.
	handler, buildErr := plugin.Server(d.broker)
	if buildErr != nil {
		// Flatten into a plain errors error so that it works across RPC.
		return errors.New(buildErr.Error())
	}

	// Reserve an ID for the implementation and hand it back to the caller.
	*response = d.broker.NextId()
	streamID := *response

	// The connection for streamID can only arrive once this RPC call has
	// returned, so wait for it and serve it in the background.
	go func() {
		stream, acceptErr := d.broker.Accept(streamID)
		if acceptErr != nil {
			log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, acceptErr)
			return
		}

		serve(stream, "Plugin", handler)
	}()

	return nil
}
// serve registers v under name on a fresh RPC server and serves conn with
// it, blocking until net/rpc has finished with the connection.
//
// If v cannot be registered, the error is logged and conn is closed so the
// peer is not left waiting on a connection that will never be served.
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	server := rpc.NewServer()
	if err := server.RegisterName(name, v); err != nil {
		log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
		// On the success path net/rpc closes the connection when it is
		// done serving; on this path nobody else will, so release it here.
		conn.Close()
		return
	}

	server.ServeConn(conn)
}
|