1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
|
package plugin
import (
"context"
"fmt"
"path/filepath"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/internal/csi/capability"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/node/plugin"
)
// SecretGetter is a reimplementation of the exec.SecretGetter interface in the
// scope of the plugin package. This avoids the needing to import exec into the
// plugin package.
type SecretGetter interface {
Get(secretID string) (*api.Secret, error)
}
type NodePlugin interface {
GetPublishedPath(volumeID string) string
NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error)
NodeStageVolume(ctx context.Context, req *api.VolumeAssignment) error
NodeUnstageVolume(ctx context.Context, req *api.VolumeAssignment) error
NodePublishVolume(ctx context.Context, req *api.VolumeAssignment) error
NodeUnpublishVolume(ctx context.Context, req *api.VolumeAssignment) error
}
type volumePublishStatus struct {
// stagingPath is staging path of volume
stagingPath string
// isPublished keeps track if the volume is published.
isPublished bool
// publishedPath is published path of volume
publishedPath string
}
type nodePlugin struct {
// name is the name of the plugin, which is used in the Driver.Name field.
name string
// socket is the path of the unix socket to connect to this plugin at
socket string
// scopePath gets the provided path relative to the plugin directory.
scopePath func(s string) string
// secrets is the SecretGetter to get volume secret data
secrets SecretGetter
// volumeMap is the map from volume ID to Volume. Will place a volume once it is staged,
// remove it from the map for unstage.
// TODO: Make this map persistent if the swarm node goes down
volumeMap map[string]*volumePublishStatus
// mu for volumeMap
mu sync.RWMutex
// staging indicates that the plugin has staging capabilities.
staging bool
// cc is the gRPC client connection
cc *grpc.ClientConn
// idClient is the CSI Identity Service client
idClient csi.IdentityClient
// nodeClient is the CSI Node Service client
nodeClient csi.NodeClient
}
const (
// TargetStagePath is the path within the plugin's scope that the volume is
// to be staged. This does not need to be accessible or propagated outside
// of the plugin rootfs.
TargetStagePath string = "/data/staged"
// TargetPublishPath is the path within the plugin's scope that the volume
// is to be published. This needs to be the plugin's PropagatedMount.
TargetPublishPath string = "/data/published"
)
func NewNodePlugin(name string, p plugin.AddrPlugin, secrets SecretGetter) NodePlugin {
return newNodePlugin(name, p, secrets)
}
// newNodePlugin returns a raw nodePlugin object, not behind an interface. this
// is useful for testing.
func newNodePlugin(name string, p plugin.AddrPlugin, secrets SecretGetter) *nodePlugin {
return &nodePlugin{
name: name,
socket: fmt.Sprintf("%s://%s", p.Addr().Network(), p.Addr().String()),
scopePath: p.ScopedPath,
secrets: secrets,
volumeMap: map[string]*volumePublishStatus{},
}
}
// connect is a private method that sets up the identity client and node
// client from a grpc client. it exists separately so that testing code can
// substitute in fake clients without a grpc connection
func (np *nodePlugin) connect(ctx context.Context) error {
// even though this is a unix socket, we must set WithInsecure or the
// connection will not be allowed.
cc, err := grpc.DialContext(ctx, np.socket, grpc.WithInsecure())
if err != nil {
return err
}
np.cc = cc
// first, probe the plugin, to ensure that it exists and is ready to go
idc := csi.NewIdentityClient(cc)
np.idClient = idc
np.nodeClient = csi.NewNodeClient(cc)
return np.init(ctx)
}
func (np *nodePlugin) Client(ctx context.Context) (csi.NodeClient, error) {
if np.nodeClient == nil {
if err := np.connect(ctx); err != nil {
return nil, err
}
}
return np.nodeClient, nil
}
func (np *nodePlugin) init(ctx context.Context) error {
probe, err := np.idClient.Probe(ctx, &csi.ProbeRequest{})
if err != nil {
return err
}
if probe.Ready != nil && !probe.Ready.Value {
return status.Error(codes.FailedPrecondition, "Plugin is not Ready")
}
c, err := np.Client(ctx)
if err != nil {
return err
}
resp, err := c.NodeGetCapabilities(ctx, &csi.NodeGetCapabilitiesRequest{})
if err != nil {
// TODO(ameyag): handle
return err
}
if resp == nil {
return nil
}
log.G(ctx).Debugf("plugin advertises %d capabilities", len(resp.Capabilities))
for _, c := range resp.Capabilities {
if rpc := c.GetRpc(); rpc != nil {
log.G(ctx).Debugf("plugin has capability %s", rpc)
switch rpc.Type {
case csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME:
np.staging = true
}
}
}
return nil
}
// GetPublishedPath returns the path at which the provided volume ID is
// published. This path is provided in terms of absolute location on the host,
// not the location in the plugins' scope.
//
// Returns an empty string if the volume does not exist.
func (np *nodePlugin) GetPublishedPath(volumeID string) string {
np.mu.RLock()
defer np.mu.RUnlock()
if volInfo, ok := np.volumeMap[volumeID]; ok {
if volInfo.isPublished {
return np.scopePath(volInfo.publishedPath)
}
}
return ""
}
func (np *nodePlugin) NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error) {
c, err := np.Client(ctx)
if err != nil {
return nil, err
}
resp, err := c.NodeGetInfo(ctx, &csi.NodeGetInfoRequest{})
if err != nil {
return nil, err
}
i := makeNodeInfo(resp)
i.PluginName = np.name
return i, nil
}
func (np *nodePlugin) NodeStageVolume(ctx context.Context, req *api.VolumeAssignment) error {
np.mu.Lock()
defer np.mu.Unlock()
if !np.staging {
return nil
}
stagingTarget := stagePath(req)
err := capability.CheckArguments(req)
if err != nil {
return err
}
c, err := np.Client(ctx)
if err != nil {
return err
}
_, err = c.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
VolumeId: req.VolumeID,
StagingTargetPath: stagingTarget,
Secrets: np.makeSecrets(req),
VolumeCapability: capability.MakeCapability(req.AccessMode),
VolumeContext: req.VolumeContext,
PublishContext: req.PublishContext,
})
if err != nil {
return err
}
v := &volumePublishStatus{
stagingPath: stagingTarget,
}
np.volumeMap[req.ID] = v
log.G(ctx).Infof("volume staged to path %s", stagingTarget)
return nil
}
func (np *nodePlugin) NodeUnstageVolume(ctx context.Context, req *api.VolumeAssignment) error {
np.mu.Lock()
defer np.mu.Unlock()
if !np.staging {
return nil
}
stagingTarget := stagePath(req)
// Check arguments
if len(req.VolumeID) == 0 {
return status.Error(codes.FailedPrecondition, "VolumeID missing in request")
}
c, err := np.Client(ctx)
if err != nil {
return err
}
// we must unpublish before we unstage. verify here that the volume is not
// published.
if v, ok := np.volumeMap[req.ID]; ok {
if v.isPublished {
return status.Errorf(codes.FailedPrecondition, "Volume %s is not unpublished", req.ID)
}
return nil
}
_, err = c.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
VolumeId: req.VolumeID,
StagingTargetPath: stagingTarget,
})
if err != nil {
return err
}
// if the volume doesn't exist in the volumeMap, deleting has no effect.
delete(np.volumeMap, req.ID)
log.G(ctx).Info("volume unstaged")
return nil
}
func (np *nodePlugin) NodePublishVolume(ctx context.Context, req *api.VolumeAssignment) error {
err := capability.CheckArguments(req)
if err != nil {
return err
}
np.mu.Lock()
defer np.mu.Unlock()
publishTarget := publishPath(req)
// Some volumes plugins require staging; we track this with a boolean, which
// also implies a staging path in the path map. If the plugin is marked as
// requiring staging but does not have a staging path in the map, that is an
// error.
var stagingPath string
if vs, ok := np.volumeMap[req.ID]; ok {
stagingPath = vs.stagingPath
} else if np.staging {
return status.Error(codes.FailedPrecondition, "volume requires staging but was not staged")
}
c, err := np.Client(ctx)
if err != nil {
return err
}
_, err = c.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
VolumeId: req.VolumeID,
TargetPath: publishTarget,
StagingTargetPath: stagingPath,
VolumeCapability: capability.MakeCapability(req.AccessMode),
Secrets: np.makeSecrets(req),
VolumeContext: req.VolumeContext,
PublishContext: req.PublishContext,
})
if err != nil {
return err
}
status, ok := np.volumeMap[req.ID]
if !ok {
status = &volumePublishStatus{}
np.volumeMap[req.ID] = status
}
status.isPublished = true
status.publishedPath = publishTarget
log.G(ctx).Infof("volume published to path %s", publishTarget)
return nil
}
func (np *nodePlugin) NodeUnpublishVolume(ctx context.Context, req *api.VolumeAssignment) error {
// Check arguments
if len(req.VolumeID) == 0 {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
np.mu.Lock()
defer np.mu.Unlock()
publishTarget := publishPath(req)
c, err := np.Client(ctx)
if err != nil {
return err
}
_, err = c.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
VolumeId: req.VolumeID,
TargetPath: publishTarget,
})
if err != nil {
return err
}
if v, ok := np.volumeMap[req.ID]; ok {
v.publishedPath = ""
v.isPublished = false
return nil
}
log.G(ctx).Info("volume unpublished")
return nil
}
func (np *nodePlugin) makeSecrets(v *api.VolumeAssignment) map[string]string {
// this should never happen, but program defensively.
if v == nil {
return nil
}
secrets := make(map[string]string, len(v.Secrets))
for _, secret := range v.Secrets {
// TODO(dperny): handle error from Get
value, _ := np.secrets.Get(secret.Secret)
if value != nil {
secrets[secret.Key] = string(value.Spec.Data)
}
}
return secrets
}
// makeNodeInfo converts a csi.NodeGetInfoResponse object into a swarmkit NodeCSIInfo
// object.
func makeNodeInfo(csiNodeInfo *csi.NodeGetInfoResponse) *api.NodeCSIInfo {
return &api.NodeCSIInfo{
NodeID: csiNodeInfo.NodeId,
MaxVolumesPerNode: csiNodeInfo.MaxVolumesPerNode,
}
}
// stagePath returns the staging path for a given volume assignment
func stagePath(v *api.VolumeAssignment) string {
// this really just exists so we use the same trick to determine staging
// path across multiple methods and can't forget to change it in one place
// but not another
return filepath.Join(TargetStagePath, v.ID)
}
// publishPath returns the publishing path for a given volume assignment
func publishPath(v *api.VolumeAssignment) string {
// ditto as stagePath
return filepath.Join(TargetPublishPath, v.ID)
}
|