package plugin // import "github.com/docker/docker/daemon/cluster/controllers/plugin"

import (
	"context"
	"io"
	"net/http"

	"github.com/docker/distribution/reference"
	enginetypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/swarm/runtime"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/plugin"
	v2 "github.com/docker/docker/plugin/v2"
	"github.com/docker/swarmkit/api"
	"github.com/gogo/protobuf/proto"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Controller is the controller for the plugin backend.
// Unlike containers, plugins are managed as singleton objects with a desired state.
// Instead of enforcing a strict create->start->stop->remove task lifecycle,
// the plugin controller manages the desired state of the plugin and lets the
// plugin manager do what it already does: monitor the plugin.
// This also means that many tasks may point to the same plugin ID.
//
// TODO(@cpuguy83): registry auth is intentionally not supported until we work out
// the right way to pass registry credentials via secrets.
type Controller struct {
	backend   Backend
	spec      runtime.PluginSpec
	logger    *logrus.Entry
	pluginID  string
	serviceID string

	// signalWaitReady is a hook used to signal tests that `Wait()` is
	// actually ready and waiting.
	signalWaitReady func()
}

// Backend is the interface for interacting with the plugin manager.
// Controller actions are passed to the configured backend to do the real work.
type Backend interface {
	Disable(name string, config *enginetypes.PluginDisableConfig) error
	Enable(name string, config *enginetypes.PluginEnableConfig) error
	Remove(name string, config *enginetypes.PluginRmConfig) error
	Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
	Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
	Get(name string) (*v2.Plugin, error)
	SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
}

// NewController returns a new cluster plugin controller for the given task.
func NewController(backend Backend, t *api.Task) (*Controller, error) {
	spec, err := readSpec(t)
	if err != nil {
		return nil, err
	}
	return &Controller{
		backend:   backend,
		spec:      spec,
		serviceID: t.ServiceID,
		logger: logrus.WithFields(logrus.Fields{
			"controller": "plugin",
			"task":       t.ID,
			"plugin":     spec.Name,
		}),
	}, nil
}

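// readSpec decodes the plugin spec from the generic runtime payload of the task.
func readSpec(t *api.Task) (runtime.PluginSpec, error) {
	var cfg runtime.PluginSpec

	generic := t.Spec.GetGeneric()
	// Guard against tasks that carry no generic runtime payload, which would
	// otherwise cause a nil pointer dereference below.
	if generic == nil || generic.Payload == nil {
		return cfg, errors.New("error reading plugin spec: task has no generic runtime payload")
	}
	if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
		return cfg, errors.Wrap(err, "error reading plugin spec")
	}
	return cfg, nil
}
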
// Update is the update phase from swarmkit.
// It is a no-op for plugins.
func (p *Controller) Update(ctx context.Context, t *api.Task) error {
	p.logger.Debug("Update")
	return nil
}

// Prepare is the prepare phase from swarmkit.
// It pulls the plugin if it does not exist yet, or upgrades it if it already
// exists and belongs to the same service.
func (p *Controller) Prepare(ctx context.Context) (err error) {
	p.logger.Debug("Prepare")

	remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
	if err != nil {
		return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
	}

	// Default the plugin name to the normalized remote reference.
	if p.spec.Name == "" {
		p.spec.Name = remote.String()
	}

	// Registry auth is not supported yet (see the TODO on Controller), so an
	// empty auth config is passed to the backend.
	var authConfig enginetypes.AuthConfig
	privs := convertPrivileges(p.spec.Privileges)

	pl, err := p.backend.Get(p.spec.Name)

	// Take a reference on the plugin if Prepare succeeds; Remove releases it.
	defer func() {
		if pl != nil && err == nil {
			pl.Acquire()
		}
	}()

	if err == nil && pl != nil {
		// The plugin already exists: it must belong to this service.
		if pl.SwarmServiceID != p.serviceID {
			return errors.Errorf("plugin already exists: %s", p.spec.Name)
		}
		// Disable the plugin before upgrading; a failure here is only logged.
		if pl.IsEnabled() {
			if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil {
				p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
			}
		}
		p.pluginID = pl.GetID()
		return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, io.Discard)
	}

	// The plugin does not exist yet: pull it and record its ID.
	if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, io.Discard, plugin.WithSwarmService(p.serviceID), plugin.WithEnv(p.spec.Env)); err != nil {
		return err
	}
	pl, err = p.backend.Get(p.spec.Name)
	if err != nil {
		return err
	}
	p.pluginID = pl.GetID()

	return nil
}

// Start is the start phase from swarmkit.
// It enables or disables the plugin so that it matches the desired state in the spec.
func (p *Controller) Start(ctx context.Context) error {
	p.logger.Debug("Start")

	pl, err := p.backend.Get(p.pluginID)
	if err != nil {
		return err
	}

	if p.spec.Disabled {
		if pl.IsEnabled() {
			return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
		}
		return nil
	}
	if !pl.IsEnabled() {
		return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
	}
	return nil
}

// Wait blocks until the plugin's state no longer matches the desired state
// in the spec, the plugin is removed, or ctx is done.
func (p *Controller) Wait(ctx context.Context) error {
	p.logger.Debug("Wait")

	pl, err := p.backend.Get(p.pluginID)
	if err != nil {
		return err
	}

	// Subscribe to state changes for this plugin before checking its current state.
	events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
	defer cancel()

	if p.signalWaitReady != nil {
		p.signalWaitReady()
	}

	// The plugin must be enabled exactly when the spec does not mark it as disabled.
	if !p.spec.Disabled != pl.IsEnabled() {
		return errors.New("mismatched plugin state")
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e := <-events:
			p.logger.Debugf("got event %T", e)

			switch e.(type) {
			case plugin.EventEnable:
				if p.spec.Disabled {
					return errors.New("plugin enabled")
				}
			case plugin.EventRemove:
				return errors.New("plugin removed")
			case plugin.EventDisable:
				if !p.spec.Disabled {
					return errors.New("plugin disabled")
				}
			}
		}
	}
}

// isNotFound reports whether err is a not-found error.
func isNotFound(err error) bool {
	return errdefs.IsNotFound(err)
}

// Shutdown is the shutdown phase from swarmkit.
// It is a no-op for plugins.
func (p *Controller) Shutdown(ctx context.Context) error {
	p.logger.Debug("Shutdown")
	return nil
}

// Terminate is the terminate phase from swarmkit.
// It is a no-op for plugins.
func (p *Controller) Terminate(ctx context.Context) error {
	p.logger.Debug("Terminate")
	return nil
}

// Remove is the remove phase from swarmkit.
// It releases this task's reference to the plugin and removes the plugin once
// no references remain.
func (p *Controller) Remove(ctx context.Context) error {
	p.logger.Debug("Remove")

	pl, err := p.backend.Get(p.pluginID)
	if err != nil {
		if isNotFound(err) {
			return nil
		}
		return err
	}

	pl.Release()
	if pl.GetRefCount() > 0 {
		p.logger.Debug("skipping remove due to ref count")
		return nil
	}

	// This may return an error because there is exactly one plugin but
	// potentially multiple tasks calling remove; a not-found error means the
	// plugin is already gone and is not treated as a failure.
	err = p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true})
	if isNotFound(err) {
		return nil
	}
	return err
}

// Close is the close phase from swarmkit.
// It is a no-op for plugins.
func (p *Controller) Close() error {
	p.logger.Debug("Close")
	return nil
}

// convertPrivileges converts the privileges from the swarm plugin spec into
// their engine API equivalent.
func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges {
	var out enginetypes.PluginPrivileges
	for _, p := range ls {
		pp := enginetypes.PluginPrivilege{
			Name:        p.Name,
			Description: p.Description,
			Value:       p.Value,
		}
		out = append(out, pp)
	}
	return out
}