package csi

import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/docker/go-events"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/manager/state/store"
mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
"github.com/moby/swarmkit/v2/volumequeue"
)

const (
// DockerCSIPluginCap is the capability name used with the PluginGetter to
// select only the plugins we need. The full name of the plugin interface
// is "docker.csicontroller/1.0", which selects only the CSI plugins with
// the Controller capability.
DockerCSIPluginCap = "csicontroller"
// CSIRPCTimeout is the client-side timeout duration for RPCs to the CSI
// plugin.
CSIRPCTimeout = 15 * time.Second
)

type Manager struct {
store *store.MemoryStore
// provider is the SecretProvider which allows retrieving secrets. Used
// when creating new Plugin objects.
provider SecretProvider
// pg is the plugingetter, which allows us to access the Docker Engine's
// plugin store.
pg mobyplugin.Getter
// newPlugin is a function which returns an object implementing the Plugin
// interface. It allows us to swap out the implementation of plugins while
// unit-testing the Manager.
newPlugin func(p mobyplugin.AddrPlugin, provider SecretProvider) Plugin
// synchronization for starting and stopping the Manager
startOnce sync.Once
stopChan chan struct{}
stopOnce sync.Once
doneChan chan struct{}
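// plugins maps plugin driver names to lazily instantiated Plugin objects.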
plugins map[string]Plugin
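// pendingVolumes is the queue of volumes which still have work to be
// performed.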
pendingVolumes *volumequeue.VolumeQueue
}

func NewManager(s *store.MemoryStore, pg mobyplugin.Getter) *Manager {
return &Manager{
store: s,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
newPlugin: NewPlugin,
pg: pg,
plugins: map[string]Plugin{},
provider: NewSecretProvider(s),
pendingVolumes: volumequeue.NewVolumeQueue(),
}
}

// Run runs the manager. The provided context is used as the parent for all RPC
// calls made to the CSI plugins. Canceling this context will cancel those RPC
// calls by the nature of contexts, but this is not the preferred way to stop
// the Manager. Instead, Stop should be called, which causes all RPC calls to be
// canceled anyway. The context is also used to get the logging context for the
// Manager.
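//
// A minimal usage sketch, assuming hypothetical s (*store.MemoryStore) and
// pg (mobyplugin.Getter) values, runs the Manager in its own goroutine and
// stops it at shutdown:
//
//    vm := NewManager(s, pg)
//    go vm.Run(context.Background())
//    // ... later, at shutdown:
//    vm.Stop()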
func (vm *Manager) Run(ctx context.Context) {
vm.startOnce.Do(func() {
vm.run(ctx)
})
}

// run performs the actual meat of the run operation.
//
// the argument is called pctx because it's the parent context, from which we
// immediately derive a new child context.
func (vm *Manager) run(pctx context.Context) {
defer close(vm.doneChan)
ctx, ctxCancel := context.WithCancel(
log.WithModule(pctx, "csi/manager"),
)
defer ctxCancel()
watch, cancel, err := store.ViewAndWatch(vm.store, func(tx store.ReadTx) error {
// TODO(dperny): change this from ViewAndWatch to one that's just
// Watch.
return nil
})
if err != nil {
log.G(ctx).WithError(err).Error("error in store view and watch")
return
}
defer cancel()
vm.init(ctx)
// run a goroutine which processes pending volumes. the event handler
// triggers processing every time new events come in by enqueueing volume
// IDs into pendingVolumes.
doneProc := make(chan struct{})
go func() {
for {
id, attempt := vm.pendingVolumes.Wait()
// this case occurs when the stop method has been called on
// pendingVolumes. stop is called on pendingVolumes when Stop is
// called on the CSI manager.
if id == "" && attempt == 0 {
break
}
// TODO(dperny): we can launch some number of workers and process
// more than one volume at a time, if desired.
vm.processVolume(ctx, id, attempt)
}
// closing doneProc signals that this routine has exited, and allows
// the main Run routine to exit.
close(doneProc)
}()
// defer read from doneProc. doneProc is closed in the goroutine above,
// and this defer will block until then. Because defers are executed as a
// stack, this in turn blocks the final defer (closing doneChan) from
// running. Ultimately, this prevents Stop from returning until the above
// goroutine is closed.
defer func() {
<-doneProc
}()
for {
select {
case ev := <-watch:
vm.handleEvent(ev)
case <-vm.stopChan:
vm.pendingVolumes.Stop()
return
}
}
}

// processVolume encapsulates the logic for processing a pending Volume.
func (vm *Manager) processVolume(ctx context.Context, id string, attempt uint) {
// set up log fields for a derived context to pass to handleVolume.
logCtx := log.WithFields(ctx, log.Fields{
"volume.id": id,
"attempt": attempt,
})
// Set a client-side timeout. Without one, a single long-running
// server-side call could block the processing of all other volumes until
// it completes or fails.
dctx, cancel := context.WithTimeout(logCtx, CSIRPCTimeout)
// always gotta call the WithTimeout cancel
defer cancel()
err := vm.handleVolume(dctx, id)
// TODO(dperny): differentiate between retryable and non-retryable
// errors.
if err != nil {
log.G(dctx).WithError(err).Info("error handling volume")
vm.pendingVolumes.Enqueue(id, attempt+1)
}
}

// init does one-time setup work for the Manager, like creating all of
// the Plugins and initializing the local state of the component.
func (vm *Manager) init(ctx context.Context) {
var (
nodes []*api.Node
volumes []*api.Volume
)
vm.store.View(func(tx store.ReadTx) {
var err error
nodes, err = store.FindNodes(tx, store.All)
if err != nil {
// this should *never* happen; Find only returns an error if the By
// selector is invalid.
log.G(ctx).WithError(err).Error("error finding nodes")
}
volumes, err = store.FindVolumes(tx, store.All)
if err != nil {
// likewise, should never happen.
log.G(ctx).WithError(err).Error("error finding volumes")
}
})
for _, node := range nodes {
vm.handleNode(node)
}
// on initialization, we enqueue all of the Volumes. The easiest way to
// know if a Volume needs some work performed is to just pass it through
// the VolumeManager. If it doesn't need any work, then we will quickly
// skip by it. Otherwise, the needed work will be performed.
for _, volume := range volumes {
vm.enqueueVolume(volume.ID)
}
}

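// Stop stops the Manager and blocks until both the run loop and the
// volume-processing goroutine have exited. It is safe to call Stop more
// than once.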
func (vm *Manager) Stop() {
vm.stopOnce.Do(func() {
close(vm.stopChan)
})
<-vm.doneChan
}

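// handleEvent dispatches a single store event to the appropriate handler.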
func (vm *Manager) handleEvent(ev events.Event) {
switch e := ev.(type) {
case api.EventCreateVolume:
vm.enqueueVolume(e.Volume.ID)
case api.EventUpdateVolume:
vm.enqueueVolume(e.Volume.ID)
case api.EventCreateNode:
vm.handleNode(e.Node)
case api.EventUpdateNode:
// for updates, we're only adding the node to every plugin. if the node
// no longer reports CSIInfo for a specific plugin, we will just leave
// the stale data in the plugin. this should not have any adverse
// effect, because the memory impact is small, and this operation
// should not be frequent. this may change as the code for volumes
// becomes more polished.
vm.handleNode(e.Node)
case api.EventDeleteNode:
vm.handleNodeRemove(e.Node.ID)
}
}

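// createVolume handles a volume which does not yet have a VolumeInfo by
// calling the plugin's CreateVolume RPC and committing the resulting info
// to the store.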
func (vm *Manager) createVolume(ctx context.Context, v *api.Volume) error {
l := log.G(ctx).WithField("volume.id", v.ID).WithField("driver", v.Spec.Driver.Name)
l.Info("creating volume")
p, err := vm.getPlugin(v.Spec.Driver.Name)
if err != nil {
l.WithError(err).Error("volume creation failed")
return err
}
info, err := p.CreateVolume(ctx, v)
if err != nil {
l.WithError(err).Error("volume create failed")
return err
}
err = vm.store.Update(func(tx store.Tx) error {
v2 := store.GetVolume(tx, v.ID)
// the volume should never be missing here, and no known race condition
// could cause it to be. nevertheless, it's better to skip the update than
// to panic on a nil pointer.
if v2 == nil {
return nil
}
v2.VolumeInfo = info
return store.UpdateVolume(tx, v2)
})
if err != nil {
l.WithError(err).Error("committing created volume to store failed")
}
return err
}

// enqueueVolume enqueues a new volume event, placing the Volume ID into
// pendingVolumes to be processed. Because enqueueVolume is only called in
// response to a new Volume update event, not for a retry, the retry number is
// always reset to 0.
func (vm *Manager) enqueueVolume(id string) {
vm.pendingVolumes.Enqueue(id, 0)
}

// handleVolume processes a Volume. It determines if any relevant update has
// occurred, and does the required work to handle that update if so.
//
// returns an error if handling the volume failed and needs to be retried.
//
// even if an error is returned, the store may still be updated.
func (vm *Manager) handleVolume(ctx context.Context, id string) error {
var volume *api.Volume
vm.store.View(func(tx store.ReadTx) {
volume = store.GetVolume(tx, id)
})
if volume == nil {
// if the volume no longer exists, there is nothing to do, nothing to
// retry, and no relevant error.
return nil
}
if volume.VolumeInfo == nil {
return vm.createVolume(ctx, volume)
}
if volume.PendingDelete {
return vm.deleteVolume(ctx, volume)
}
updated := false
// TODO(dperny): it's just pointers, but copying the entire PublishStatus
// on each update might be intensive.
// we take a copy of the PublishStatus slice, because a successful
// unpublish deletes that status from volume.PublishStatus, and we cannot
// safely delete from the slice we are iterating over. note that the copy
// shares the underlying pointers, so mutating a status below also updates
// the entry still referenced by volume.PublishStatus.
statuses := make([]*api.VolumePublishStatus, len(volume.PublishStatus))
copy(statuses, volume.PublishStatus)
// failedPublishOrUnpublish is a slice of node IDs for which a publish or
// unpublish operation failed. Publishing or unpublishing a volume can
// succeed or fail in part. If any failure occurs, we will add the node ID
// of the failed operation to this slice. Then, at the end of this
// function, after we update the store, if the slice is non-empty, we will
// still return an error.
failedPublishOrUnpublish := []string{}
// adjustIndex is the number of entries deleted from volume.PublishStatus.
// when we're deleting entries from volume.PublishStatus, the index of the
// entry in statuses will no longer match the index of the same entry in
// volume.PublishStatus. we subtract adjustIndex from i to get the index
// where the entry is found after taking into account the deleted entries.
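// for example, with an adjustIndex of 1 (one entry already deleted), the
// entry at statuses index 2 is found at volume.PublishStatus index 1.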
adjustIndex := 0
for i, status := range statuses {
switch status.State {
case api.VolumePublishStatus_PENDING_PUBLISH:
plug, err := vm.getPlugin(volume.Spec.Driver.Name)
if err != nil {
status.Message = fmt.Sprintf("error publishing volume: %v", err)
failedPublishOrUnpublish = append(failedPublishOrUnpublish, status.NodeID)
} else {
publishContext, err := plug.PublishVolume(ctx, volume, status.NodeID)
if err == nil {
status.State = api.VolumePublishStatus_PUBLISHED
status.PublishContext = publishContext
status.Message = ""
} else {
status.Message = fmt.Sprintf("error publishing volume: %v", err)
failedPublishOrUnpublish = append(failedPublishOrUnpublish, status.NodeID)
}
}
updated = true
case api.VolumePublishStatus_PENDING_UNPUBLISH:
plug, err := vm.getPlugin(volume.Spec.Driver.Name)
if err != nil {
status.Message = fmt.Sprintf("error unpublishing volume: %v", err)
failedPublishOrUnpublish = append(failedPublishOrUnpublish, status.NodeID)
} else {
err := plug.UnpublishVolume(ctx, volume, status.NodeID)
if err == nil {
// if there is no error with unpublishing, then we delete the
// status from the statuses slice.
j := i - adjustIndex
volume.PublishStatus = append(volume.PublishStatus[:j], volume.PublishStatus[j+1:]...)
adjustIndex++
} else {
status.Message = fmt.Sprintf("error unpublishing volume: %v", err)
failedPublishOrUnpublish = append(failedPublishOrUnpublish, status.NodeID)
}
}
updated = true
}
}
if updated {
if err := vm.store.Update(func(tx store.Tx) error {
// the publish status is now authoritative. read-update-write the
// volume object.
v := store.GetVolume(tx, volume.ID)
if v == nil {
// volume should never be deleted with pending publishes. if
// this does occur somehow, then we will just ignore it, rather
// than crashing.
return nil
}
v.PublishStatus = volume.PublishStatus
return store.UpdateVolume(tx, v)
}); err != nil {
return err
}
}
if len(failedPublishOrUnpublish) > 0 {
return fmt.Errorf("error publishing or unpublishing to some nodes: %v", failedPublishOrUnpublish)
}
return nil
}

// handleNode handles one node event
func (vm *Manager) handleNode(n *api.Node) {
if n.Description == nil {
return
}
// we just call AddNode on every update. Because it's just a map
// assignment, this is probably faster than checking if something changed.
for _, info := range n.Description.CSIInfo {
p, err := vm.getPlugin(info.PluginName)
if err != nil {
log.L.Warnf("error handling node: %v", err)
continue
}
p.AddNode(n.ID, info.NodeID)
}
}

// handleNodeRemove handles a node delete event
func (vm *Manager) handleNodeRemove(nodeID string) {
// we just call RemoveNode on every plugin, because it's probably quicker
// than checking if the node was using that plugin.
//
// we don't need to worry about lazy-loading here, because if we don't have
// the plugin loaded, there's no need to call remove.
for _, plugin := range vm.plugins {
plugin.RemoveNode(nodeID)
}
}

func (vm *Manager) deleteVolume(ctx context.Context, v *api.Volume) error {
// TODO(dperny): handle missing plugin
plug, err := vm.getPlugin(v.Spec.Driver.Name)
if err != nil {
return err
}
err = plug.DeleteVolume(ctx, v)
if err != nil {
return err
}
// TODO(dperny): handle update error
return vm.store.Update(func(tx store.Tx) error {
return store.DeleteVolume(tx, v.ID)
})
}

// getPlugin returns the plugin with the given name.
//
// In a previous iteration of the architecture of this component, plugins were
// added to the manager through an update to the Cluster object, which
// triggered an event. In other words, they were eagerly loaded.
//
// When rearchitecting to use the plugingetter.PluginGetter interface, that
// eager loading is no longer practical, because the method for getting events
// about new plugins would be difficult to plumb this deep into swarm.
//
// Instead, what were previously raw map lookups are now calls to this
// method, which lazy-loads plugins as needed. This is fine, because the
// Plugin object itself also makes its network connection lazily.
//
// TODO(dperny): There is no way to unload a plugin. Unloading plugins will
// happen as part of a leadership change, but otherwise, on especially
// long-lived managers with especially high plugin churn, this is a memory
// leak. It's acceptable for now because we expect neither exceptionally long
// lived managers nor exceptionally high plugin churn.
func (vm *Manager) getPlugin(name string) (Plugin, error) {
// if the plugin already exists, we can just return it.
if p, ok := vm.plugins[name]; ok {
return p, nil
}
// otherwise, we need to load the plugin.
pc, err := vm.pg.Get(name, DockerCSIPluginCap)
if err != nil {
return nil, err
}
if pc == nil {
return nil, fmt.Errorf("driver %q not found", name)
}
pa, ok := pc.(mobyplugin.AddrPlugin)
if !ok {
return nil, fmt.Errorf("plugin for driver %q does not implement AddrPlugin", name)
}
p := vm.newPlugin(pa, vm.provider)
vm.plugins[name] = p
return p, nil
}