package csi

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/moby/swarmkit/v2/agent/csi/plugin"
	"github.com/moby/swarmkit/v2/agent/exec"
	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/log"
	mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
	"github.com/moby/swarmkit/v2/volumequeue"
)
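
// csiCallTimeout is the timeout applied to each individual publish or
// unpublish attempt in tryVolume, so that one slow or hung plugin call
// cannot stall the single-threaded retry loop indefinitely.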
const csiCallTimeout = 15 * time.Second

// volumeState keeps track of the state of a volume on this node.
type volumeState struct {
	// volume is the actual VolumeAssignment for this volume
	volume *api.VolumeAssignment
	// remove is true if the volume is to be removed, or false if it should be
	// active.
	remove bool
	// removeCallback is called when the volume is successfully removed.
	removeCallback func(id string)
}

// volumes tracks all of the volumes currently assigned to this agent, mapped
// by volume ID.
type volumes struct {
	// mu guards access to the volumes map.
	mu sync.RWMutex

	// volumes is a mapping of volume ID to volumeState
	volumes map[string]volumeState

	// plugins is the Manager, which provides translation to the CSI RPCs
	plugins plugin.Manager

	// pendingVolumes is a VolumeQueue which manages which volumes are
	// processed and when.
	pendingVolumes *volumequeue.VolumeQueue
}

// NewManager returns a VolumesManager that tracks the volumes assigned to
// this node and services them in a background goroutine.
func NewManager(pg mobyplugin.Getter, secrets exec.SecretGetter) exec.VolumesManager {
	r := &volumes{
		volumes:        map[string]volumeState{},
		plugins:        plugin.NewManager(pg, secrets),
		pendingVolumes: volumequeue.NewVolumeQueue(),
	}
	go r.retryVolumes()

	return r
}

// retryVolumes runs in a goroutine to retry failing volumes.
func (r *volumes) retryVolumes() {
	ctx := log.WithModule(context.Background(), "node/agent/csi")
	for {
		vid, attempt := r.pendingVolumes.Wait()

		dctx := log.WithFields(ctx, log.Fields{
			"volume.id": vid,
			"attempt":   fmt.Sprintf("%d", attempt),
		})

		// this case occurs when the Stop method has been called on
		// pendingVolumes, and means that we should pack up and exit.
		if vid == "" && attempt == 0 {
			break
		}
		r.tryVolume(dctx, vid, attempt)
	}
}

// tryVolume synchronously tries one volume. It puts the volume back into the
// queue if the attempt fails.
func (r *volumes) tryVolume(ctx context.Context, id string, attempt uint) {
	r.mu.RLock()
	vs, ok := r.volumes[id]
	r.mu.RUnlock()

	if !ok {
		return
	}

	// create a sub-context with a timeout. because we can only process one
	// volume at a time, if we rely on the server-side or default timeout, we
	// may be waiting a very long time for a particular volume to fail.
	//
	// TODO(dperny): there is almost certainly a more intelligent way to do
	// this. For example, we could:
	//
	//   * Change code such that we can service volumes managed by different
	//     plugins at the same time.
	//   * Take longer timeouts when we don't have any other volumes in the
	//     queue.
	//   * Have interruptible attempts, so that if we're taking longer
	//     timeouts, we can abort them to service new volumes.
	//
	// These are too complicated to be worth the engineering effort at this
	// time.
	timeoutCtx, cancel := context.WithTimeout(ctx, csiCallTimeout)
	// always gotta call the WithTimeout cancel
	defer cancel()

	if !vs.remove {
		if err := r.publishVolume(timeoutCtx, vs.volume); err != nil {
			log.G(timeoutCtx).WithError(err).Info("publishing volume failed")
			r.pendingVolumes.Enqueue(id, attempt+1)
		}
	} else {
		if err := r.unpublishVolume(timeoutCtx, vs.volume); err != nil {
			log.G(timeoutCtx).WithError(err).Info("unpublishing volume failed")
			r.pendingVolumes.Enqueue(id, attempt+1)
		} else {
			// if unpublishing was successful, then call the callback
			vs.removeCallback(id)
		}
	}
}

// Get returns the published path for the volume with the given ID. If the
// volume does not exist, is being removed, or has not yet been published, it
// returns an empty string and an error.
func (r *volumes) Get(volumeID string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if vs, ok := r.volumes[volumeID]; ok {
		if vs.remove {
			// TODO(dperny): use a structured error
			return "", fmt.Errorf("volume being removed")
		}

		if p, err := r.plugins.Get(vs.volume.Driver.Name); err == nil {
			path := p.GetPublishedPath(volumeID)
			if path != "" {
				return path, nil
			}
			// don't put this line here, it spams like crazy.
			// log.L.WithField("method", "(*volumes).Get").Debugf("Path not published for volume:%v", volumeID)
		} else {
			return "", err
		}
	}

	return "", fmt.Errorf("%w: published path is unavailable", exec.ErrDependencyNotReady)
}

// Add adds one or more volumes to the volume map.
func (r *volumes) Add(volumes ...api.VolumeAssignment) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, volume := range volumes {
		// if we get an Add operation, then we will always restart the retries.
		v := volume.Copy()
		r.volumes[volume.ID] = volumeState{
			volume: v,
		}

		// enqueue the volume so that we process it
		r.pendingVolumes.Enqueue(volume.ID, 0)
		log.L.WithField("method", "(*volumes).Add").Debugf("Add Volume: %v", volume.VolumeID)
	}
}

// Remove removes one or more volumes from this manager. callback is called
// whenever the removal is successful.
func (r *volumes) Remove(volumes []api.VolumeAssignment, callback func(id string)) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, volume := range volumes {
		// if we get a Remove call, then we always restart the retries and
		// attempt removal.
		v := volume.Copy()
		r.volumes[volume.ID] = volumeState{
			volume:         v,
			remove:         true,
			removeCallback: callback,
		}
		r.pendingVolumes.Enqueue(volume.ID, 0)
	}
}
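
// publishVolume stages and then publishes the given volume on this node
// through its CSI plugin.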
func (r *volumes) publishVolume(ctx context.Context, assignment *api.VolumeAssignment) error {
	log.G(ctx).Info("attempting to publish volume")
	p, err := r.plugins.Get(assignment.Driver.Name)
	if err != nil {
		return err
	}

	// even though this may have succeeded already, the call to NodeStageVolume
	// is idempotent, so we can retry it every time.
	if err := p.NodeStageVolume(ctx, assignment); err != nil {
		return err
	}

	log.G(ctx).Debug("staging volume succeeded, attempting to publish volume")

	return p.NodePublishVolume(ctx, assignment)
}
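
// unpublishVolume unpublishes and then unstages the given volume from this
// node, undoing the work of publishVolume.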
func (r *volumes) unpublishVolume(ctx context.Context, assignment *api.VolumeAssignment) error {
	log.G(ctx).Info("attempting to unpublish volume")
	p, err := r.plugins.Get(assignment.Driver.Name)
	if err != nil {
		return err
	}

	if err := p.NodeUnpublishVolume(ctx, assignment); err != nil {
		return err
	}

	return p.NodeUnstageVolume(ctx, assignment)
}
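
// Plugins returns the plugin manager, which provides access to the CSI
// plugins servicing these volumes.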
func (r *volumes) Plugins() exec.VolumePluginManager {
	return r.plugins
}

// taskRestrictedVolumesProvider wraps a VolumeGetter and restricts access to
// the set of volume IDs referenced by a particular task.
type taskRestrictedVolumesProvider struct {
	volumes   exec.VolumeGetter
	volumeIDs map[string]struct{}
}
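
// Get returns the published path for volumeID, but only if the task is
// authorized to access that volume.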
func (sp *taskRestrictedVolumesProvider) Get(volumeID string) (string, error) {
	if _, ok := sp.volumeIDs[volumeID]; !ok {
		return "", fmt.Errorf("task not authorized to access volume %s", volumeID)
	}

	return sp.volumes.Get(volumeID)
}
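
// For illustration only: a hypothetical call site, where volumesManager is
// the node's exec.VolumesManager and task is the api.Task being prepared,
// might look like:
//
//	getter := csi.Restrict(volumesManager.Plugins().(exec.VolumeGetter), task)
//	path, err := getter.Get(volumeID) // errors unless task references volumeID
//
// (The exact wiring of the VolumeGetter is up to the executor; the names
// above are placeholders.)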

// Restrict provides a getter that only allows access to the volumes
// referenced by the task.
func Restrict(volumes exec.VolumeGetter, t *api.Task) exec.VolumeGetter {
	vids := map[string]struct{}{}

	for _, v := range t.Volumes {
		vids[v.ID] = struct{}{}
	}

	return &taskRestrictedVolumesProvider{volumes: volumes, volumeIDs: vids}
}