1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
|
package csi
import (
"context"
"errors"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/internal/csi/capability"
"github.com/moby/swarmkit/v2/log"
mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
)
// Plugin is the interface for a CSI controller plugin.
//
// In this package, the word "plugin" is unfortunately overused. This
// particular "Plugin" is the interface used by volume Manager to interact with
// CSI controller plugins. It should not be confused with the "plugin" returned
// from the plugingetter interface, which is the interface that gives us the
// information we need to create this Plugin.
type Plugin interface {
CreateVolume(context.Context, *api.Volume) (*api.VolumeInfo, error)
DeleteVolume(context.Context, *api.Volume) error
PublishVolume(context.Context, *api.Volume, string) (map[string]string, error)
UnpublishVolume(context.Context, *api.Volume, string) error
AddNode(swarmID, csiID string)
RemoveNode(swarmID string)
}
// plugin represents an individual CSI controller plugin
type plugin struct {
// name is the name of the plugin, which is also the name used as the
// Driver.Name field
name string
// socket is the unix socket to connect to this plugin at.
socket string
// provider is the SecretProvider, which allows retrieving secrets for CSI
// calls.
provider SecretProvider
// cc is the grpc client connection
// TODO(dperny): the client is never closed. it may be closed when it goes
// out of scope, but this should be verified.
cc *grpc.ClientConn
// idClient is the identity service client
idClient csi.IdentityClient
// controllerClient is the controller service client
controllerClient csi.ControllerClient
// controller indicates that the plugin has controller capabilities.
controller bool
// publisher indicates that the controller plugin has
// PUBLISH_UNPUBLISH_VOLUME capability.
publisher bool
// swarmToCSI maps a swarm node ID to the corresponding CSI node ID
swarmToCSI map[string]string
// csiToSwarm maps a CSI node ID back to the swarm node ID.
csiToSwarm map[string]string
}
// NewPlugin creates a new Plugin object.
//
// NewPlugin takes both the CompatPlugin and the PluginAddr. These should be
// the same object. By taking both parts here, we can push off the work of
// assuring that the given plugin implements the PluginAddr interface without
// having to typecast in this constructor.
func NewPlugin(p mobyplugin.AddrPlugin, provider SecretProvider) Plugin {
return &plugin{
name: p.Name(),
// TODO(dperny): verify that we do not need to include the Network()
// portion of the Addr.
socket: fmt.Sprintf("%s://%s", p.Addr().Network(), p.Addr().String()),
provider: provider,
swarmToCSI: map[string]string{},
csiToSwarm: map[string]string{},
}
}
// connect is a private method that initializes a gRPC ClientConn and creates
// the IdentityClient and ControllerClient.
func (p *plugin) connect(ctx context.Context) error {
cc, err := grpc.DialContext(ctx, p.socket, grpc.WithInsecure())
if err != nil {
return err
}
p.cc = cc
// first, probe the plugin, to ensure that it exists and is ready to go
idc := csi.NewIdentityClient(cc)
p.idClient = idc
// controllerClient may not do anything if the plugin does not support
// the controller service, but it should not be an error to create it now
// anyway
p.controllerClient = csi.NewControllerClient(cc)
return p.init(ctx)
}
// init checks uses the identity service to check the properties of the plugin,
// most importantly, its capabilities.
func (p *plugin) init(ctx context.Context) error {
probe, err := p.idClient.Probe(ctx, &csi.ProbeRequest{})
if err != nil {
return err
}
if probe.Ready != nil && !probe.Ready.Value {
return errors.New("plugin not ready")
}
resp, err := p.idClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
if err != nil {
return err
}
if resp == nil {
return nil
}
for _, c := range resp.Capabilities {
if sc := c.GetService(); sc != nil {
switch sc.Type {
case csi.PluginCapability_Service_CONTROLLER_SERVICE:
p.controller = true
}
}
}
if p.controller {
cCapResp, err := p.controllerClient.ControllerGetCapabilities(
ctx, &csi.ControllerGetCapabilitiesRequest{},
)
if err != nil {
return err
}
for _, c := range cCapResp.Capabilities {
rpc := c.GetRpc()
if rpc.Type == csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME {
p.publisher = true
}
}
}
return nil
}
// CreateVolume wraps and abstracts the CSI CreateVolume logic and returns
// the volume info, or an error.
func (p *plugin) CreateVolume(ctx context.Context, v *api.Volume) (*api.VolumeInfo, error) {
c, err := p.Client(ctx)
if err != nil {
return nil, err
}
if !p.controller {
// TODO(dperny): come up with a scheme to handle headless plugins
// TODO(dperny): handle plugins without create volume capabilities
return &api.VolumeInfo{VolumeID: v.Spec.Annotations.Name}, nil
}
createVolumeRequest := p.makeCreateVolume(v)
resp, err := c.CreateVolume(ctx, createVolumeRequest)
if err != nil {
return nil, err
}
return makeVolumeInfo(resp.Volume), nil
}
func (p *plugin) DeleteVolume(ctx context.Context, v *api.Volume) error {
if v.VolumeInfo == nil {
return errors.New("VolumeInfo must not be nil")
}
// we won't use a fancy createDeleteVolumeRequest method because the
// request is simple enough to not bother with it
secrets := p.makeSecrets(v)
req := &csi.DeleteVolumeRequest{
VolumeId: v.VolumeInfo.VolumeID,
Secrets: secrets,
}
c, err := p.Client(ctx)
if err != nil {
return err
}
// response from RPC intentionally left blank
_, err = c.DeleteVolume(ctx, req)
return err
}
// PublishVolume calls ControllerPublishVolume to publish the given Volume to
// the Node with the given swarmkit ID. It returns a map, which is the
// PublishContext for this Volume on this Node.
func (p *plugin) PublishVolume(ctx context.Context, v *api.Volume, nodeID string) (map[string]string, error) {
if !p.publisher {
return nil, nil
}
csiNodeID := p.swarmToCSI[nodeID]
if csiNodeID == "" {
log.L.Errorf("CSI node ID not found for given Swarm node ID. Plugin: %s , Swarm node ID: %s", p.name, nodeID)
return nil, status.Error(codes.FailedPrecondition, "CSI node ID not found for given Swarm node ID")
}
req := p.makeControllerPublishVolumeRequest(v, nodeID)
c, err := p.Client(ctx)
if err != nil {
return nil, err
}
resp, err := c.ControllerPublishVolume(ctx, req)
if err != nil {
return nil, err
}
return resp.PublishContext, nil
}
// UnpublishVolume calls ControllerUnpublishVolume to unpublish the given
// Volume from the Node with the given swarmkit ID. It returns an error if the
// unpublish does not succeed
func (p *plugin) UnpublishVolume(ctx context.Context, v *api.Volume, nodeID string) error {
if !p.publisher {
return nil
}
req := p.makeControllerUnpublishVolumeRequest(v, nodeID)
c, err := p.Client(ctx)
if err != nil {
return err
}
// response of the RPC intentionally left blank
_, err = c.ControllerUnpublishVolume(ctx, req)
return err
}
// AddNode adds a mapping for a node's swarm ID to the ID provided by this CSI
// plugin. This allows future calls to the plugin to be done entirely in terms
// of the swarm node ID.
//
// The CSI node ID is provided by the node as part of the NodeDescription.
func (p *plugin) AddNode(swarmID, csiID string) {
p.swarmToCSI[swarmID] = csiID
p.csiToSwarm[csiID] = swarmID
}
// RemoveNode removes a node from this plugin's node mappings.
func (p *plugin) RemoveNode(swarmID string) {
csiID := p.swarmToCSI[swarmID]
delete(p.swarmToCSI, swarmID)
delete(p.csiToSwarm, csiID)
}
// Client retrieves a csi.ControllerClient for this plugin
//
// If this is the first time client has been called and no client yet exists,
// it will initialize the gRPC connection to the remote plugin and create a new
// ControllerClient.
func (p *plugin) Client(ctx context.Context) (csi.ControllerClient, error) {
if p.controllerClient == nil {
if err := p.connect(ctx); err != nil {
return nil, err
}
}
return p.controllerClient, nil
}
// makeCreateVolume makes a csi.CreateVolumeRequest from the volume object and
// spec. it uses the Plugin's SecretProvider to retrieve relevant secrets.
func (p *plugin) makeCreateVolume(v *api.Volume) *csi.CreateVolumeRequest {
secrets := p.makeSecrets(v)
return &csi.CreateVolumeRequest{
Name: v.Spec.Annotations.Name,
Parameters: v.Spec.Driver.Options,
VolumeCapabilities: []*csi.VolumeCapability{
capability.MakeCapability(v.Spec.AccessMode),
},
Secrets: secrets,
AccessibilityRequirements: makeTopologyRequirement(v.Spec.AccessibilityRequirements),
CapacityRange: makeCapacityRange(v.Spec.CapacityRange),
}
}
// makeSecrets uses the plugin's SecretProvider to make the secrets map to pass
// to CSI RPCs.
func (p *plugin) makeSecrets(v *api.Volume) map[string]string {
secrets := map[string]string{}
for _, vs := range v.Spec.Secrets {
// a secret should never be nil, but check just to be sure
if vs != nil {
secret := p.provider.GetSecret(vs.Secret)
if secret != nil {
// TODO(dperny): return an error, but this should never happen,
// as secrets should be validated at volume creation time
secrets[vs.Key] = string(secret.Spec.Data)
}
}
}
return secrets
}
func (p *plugin) makeControllerPublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerPublishVolumeRequest {
if v.VolumeInfo == nil {
return nil
}
secrets := p.makeSecrets(v)
capability := capability.MakeCapability(v.Spec.AccessMode)
capability.AccessType = &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
}
return &csi.ControllerPublishVolumeRequest{
VolumeId: v.VolumeInfo.VolumeID,
NodeId: p.swarmToCSI[nodeID],
Secrets: secrets,
VolumeCapability: capability,
VolumeContext: v.VolumeInfo.VolumeContext,
}
}
func (p *plugin) makeControllerUnpublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerUnpublishVolumeRequest {
if v.VolumeInfo == nil {
return nil
}
secrets := p.makeSecrets(v)
return &csi.ControllerUnpublishVolumeRequest{
VolumeId: v.VolumeInfo.VolumeID,
NodeId: p.swarmToCSI[nodeID],
Secrets: secrets,
}
}
|